diff --git a/src/slave.c b/src/slave.c index 6cf836c7..bfef2db4 100644 --- a/src/slave.c +++ b/src/slave.c @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -114,7 +115,7 @@ relay_t *relay_free (relay_t *relay) ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount); if (relay->source) - source_free_source (relay->source); + source_free_source(relay->source); relay_config_free(relay->config); @@ -140,7 +141,7 @@ static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, cons static inline relay_config_t *relay_config_copy (relay_config_t *r) { - relay_config_t *copy = calloc (1, sizeof (relay_config_t)); + relay_config_t *copy = calloc(1, sizeof(relay_config_t)); relay_config_upstream_t *u = NULL; size_t i; @@ -219,7 +220,7 @@ void slave_initialize(void) slave_running = 1; max_interval = 0; - thread_mutex_create (&_slave_mutex); + thread_mutex_create(&_slave_mutex); _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED); } @@ -235,7 +236,7 @@ void slave_shutdown(void) thread_mutex_unlock(&_slave_mutex); ICECAST_LOG_DEBUG("waiting for slave thread"); - thread_join (_slave_thread_id); + thread_join(_slave_thread_id); } @@ -250,44 +251,41 @@ static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t ice_config_t *config; http_parser_t *parser = NULL; connection_t *con=NULL; - char *server = strdup (_GET_UPSTREAM_SETTING(server)); - char *mount = strdup (_GET_UPSTREAM_SETTING(mount)); + char *server = strdup(_GET_UPSTREAM_SETTING(server)); + char *mount = strdup(_GET_UPSTREAM_SETTING(mount)); int port = _GET_UPSTREAM_SETTING(port); char *auth_header; char header[4096]; - config = config_get_config (); - server_id = strdup (config->server_id); - config_release_config (); + config = config_get_config(); + server_id = strdup(config->server_id); + config_release_config(); /* build any authentication header before connecting */ - if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password)) - { + if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password)) { char *esc_authorisation; unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2; - auth_header = malloc (len); - snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password)); + auth_header = malloc(len); + snprintf(auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password)); esc_authorisation = util_base64_encode(auth_header, len); free(auth_header); - len = strlen (esc_authorisation) + 24; - auth_header = malloc (len); - snprintf (auth_header, len, + len = strlen(esc_authorisation) + 24; + auth_header = malloc(len); + snprintf(auth_header, len, "Authorization: Basic %s\r\n", esc_authorisation); free(esc_authorisation); + } else { + auth_header = strdup(""); } - else - auth_header = strdup (""); - while (redirects < 10) - { + while (redirects < 10) { sock_t streamsock; ICECAST_LOG_INFO("connecting to %s:%d", server, port); - streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10); - if (streamsock == SOCK_ERROR) - { + streamsock = sock_connect_wto_bind(server, port, _GET_UPSTREAM_SETTING(bind), 10); + if (streamsock == SOCK_ERROR) { ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port); break; } @@ -309,93 +307,86 @@ static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t server, _GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "", auth_header); - memset (header, 0, sizeof(header)); - if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0) - { + memset(header, 0, sizeof(header)); + if (util_read_header(con->sock, header, 4096, READ_ENTIRE_HEADER) == 0) { ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount); break; } igloo_prng_write(igloo_instance, header, strlen(header), -1, igloo_PRNG_FLAG_NONE); parser = httpp_create_parser(); - httpp_initialize (parser, NULL); - if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount)) - { + httpp_initialize(parser, NULL); + if (! httpp_parse_response(parser, header, strlen(header), relay->config->localmount)) { ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount, server, port, mount); break; } - if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0) - { + if (strcmp(httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0) { /* better retry the connection again but with different details */ const char *uri, *mountpoint; int len; - uri = httpp_getvar (parser, "location"); + uri = httpp_getvar(parser, "location"); ICECAST_LOG_INFO("redirect received %s", uri); - if (strncmp (uri, "http://", 7) != 0) + if (strncmp(uri, "http://", 7) != 0) break; uri += 7; - mountpoint = strchr (uri, '/'); - free (mount); + mountpoint = strchr(uri, '/'); + free(mount); if (mountpoint) - mount = strdup (mountpoint); + mount = strdup(mountpoint); else - mount = strdup ("/"); + mount = strdup("/"); - len = strcspn (uri, ":/"); + len = strcspn(uri, ":/"); port = 80; - if (uri [len] == ':') - port = atoi (uri+len+1); - free (server); - server = calloc (1, len+1); - strncpy (server, uri, len); - connection_close (con); - httpp_destroy (parser); + if (uri[len] == ':') + port = atoi(uri+len+1); + free(server); + server = calloc(1, len+1); + strncpy(server, uri, len); + connection_close(con); + httpp_destroy(parser); con = NULL; parser = NULL; - } - else - { + } else { client_t *client = NULL; - if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) - { + if (httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) { ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount, httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); break; } - global_lock (); - if (client_create (&client, con, parser) < 0) - { - global_unlock (); + global_lock(); + if (client_create(&client, con, parser) < 0) { + global_unlock(); /* make sure only the client_destroy frees these */ con = NULL; parser = NULL; - client_destroy (client); + client_destroy(client); break; } - global_unlock (); - sock_set_blocking (streamsock, 0); - client_set_queue (client, NULL); + global_unlock(); + sock_set_blocking(streamsock, 0); + client_set_queue(client, NULL); client_complete(client); - free (server); - free (mount); - free (server_id); - free (auth_header); + free(server); + free(mount); + free(server_id); + free(auth_header); return client; } redirects++; } /* failed, better clean up */ - free (server); - free (mount); - free (server_id); - free (auth_header); + free(server); + free(mount); + free(server_id); + free(auth_header); if (con) - connection_close (con); + connection_close(con); if (parser) - httpp_destroy (parser); + httpp_destroy(parser); return NULL; } @@ -410,8 +401,7 @@ static void *start_relay_stream (void *arg) client_t *client; ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount); - do - { + do { size_t i; for (i = 0; i < relay->config->upstreams; i++) { @@ -434,22 +424,20 @@ static void *start_relay_stream (void *arg) src->parser = client->parser; src->con = client->con; - if (connection_complete_source (src, 0) < 0) - { + if (connection_complete_source (src, 0) < 0) { ICECAST_LOG_INFO("Failed to complete source initialisation"); client_destroy (client); src->client = NULL; continue; } stats_event_inc(NULL, "source_relay_connections"); - stats_event (relay->config->localmount, "source_ip", client->con->ip); + stats_event(relay->config->localmount, "source_ip", client->con->ip); - source_main (relay->source); + source_main(relay->source); - if (relay->config->on_demand == 0) - { + if (relay->config->on_demand == 0) { /* only keep refreshing YP entries for inactive on-demand relays */ - yp_remove (relay->config->localmount); + yp_remove(relay->config->localmount); relay->source->yp_public = -1; relay->start = time(NULL) + 10; /* prevent busy looping if failing */ slave_update_all_mounts(); @@ -462,8 +450,7 @@ static void *start_relay_stream (void *arg) return NULL; } while (0); /* TODO allow looping through multiple servers */ - if (relay->source->fallback_mount) - { + if (relay->source->fallback_mount) { source_t *fallback_source; ICECAST_LOG_DEBUG("failed relay, fallback to %s", relay->source->fallback_mount); @@ -494,35 +481,28 @@ static void *start_relay_stream (void *arg) /* wrapper for starting the provided relay stream */ static void check_relay_stream (relay_t *relay) { - if (relay->source == NULL) - { - if (relay->config->localmount[0] != '/') - { + if (relay->source == NULL) { + if (relay->config->localmount[0] != '/') { ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping", relay->config->localmount); return; } /* new relay, reserve the name */ - relay->source = source_reserve (relay->config->localmount); - if (relay->source) - { + relay->source = source_reserve(relay->config->localmount); + if (relay->source) { ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount); - if (relay->config->on_demand) - { - ice_config_t *config = config_get_config (); - mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL); + if (relay->config->on_demand) { + ice_config_t *config = config_get_config(); + mount_proxy *mountinfo = config_find_mount(config, relay->config->localmount, MOUNT_TYPE_NORMAL); relay->source->on_demand = relay->config->on_demand; if (mountinfo == NULL) - source_update_settings (config, relay->source, mountinfo); - config_release_config (); - stats_event (relay->config->localmount, "listeners", "0"); + source_update_settings(config, relay->source, mountinfo); + config_release_config(); + stats_event(relay->config->localmount, "listeners", "0"); slave_update_all_mounts(); } - } - else - { - if (relay->start == 0) - { + } else { + if (relay->start == 0) { ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount); relay->start = 1; } @@ -536,21 +516,18 @@ static void check_relay_stream (relay_t *relay) if (relay->source == NULL || relay->running || relay->start > time(NULL)) break; /* check if an inactive on-demand relay has a fallback that has listeners */ - if (relay->config->on_demand && source->on_demand_req == 0) - { + if (relay->config->on_demand && source->on_demand_req == 0) { relay->source->on_demand = relay->config->on_demand; - if (source->fallback_mount && source->fallback_override != FALLBACK_OVERRIDE_NONE) - { + if (source->fallback_mount && source->fallback_override != FALLBACK_OVERRIDE_NONE) { source_t *fallback; - avl_tree_rlock (global.source_tree); - fallback = source_find_mount (source->fallback_mount); - if (fallback && fallback->running && fallback->listeners) - { + avl_tree_rlock(global.source_tree); + fallback = source_find_mount(source->fallback_mount); + if (fallback && fallback->running && fallback->listeners) { ICECAST_LOG_DEBUG("fallback running %d with %lu listeners", fallback->running, fallback->listeners); source->on_demand_req = 1; } - avl_tree_unlock (global.source_tree); + avl_tree_unlock(global.source_tree); } if (source->on_demand_req == 0) break; @@ -558,30 +535,27 @@ static void check_relay_stream (relay_t *relay) relay->start = time(NULL) + 5; relay->running = 1; - relay->thread = thread_create ("Relay Thread", start_relay_stream, + relay->thread = thread_create("Relay Thread", start_relay_stream, relay, THREAD_ATTACHED); return; } while (0); /* the relay thread may of shut down itself */ - if (relay->cleanup) - { - if (relay->thread) - { + if (relay->cleanup) { + if (relay->thread) { ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount); - thread_join (relay->thread); + thread_join(relay->thread); relay->thread = NULL; } relay->cleanup = 0; relay->running = 0; - if (relay->config->on_demand && relay->source) - { - ice_config_t *config = config_get_config (); - mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL); - source_update_settings (config, relay->source, mountinfo); - config_release_config (); - stats_event (relay->config->localmount, "listeners", "0"); + if (relay->config->on_demand && relay->source) { + ice_config_t *config = config_get_config(); + mount_proxy *mountinfo = config_find_mount(config, relay->config->localmount, MOUNT_TYPE_NORMAL); + source_update_settings(config, relay->source, mountinfo); + config_release_config(); + stats_event(relay->config->localmount, "listeners", "0"); } } } @@ -657,8 +631,7 @@ update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_len existing_relay = *current; existing_p = current; - while (existing_relay) - { + while (existing_relay) { /* break out if keeping relay */ if (strcmp(relay->localmount, existing_relay->config->localmount) == 0) if (relay_has_changed(relay, existing_relay->config) == 0) @@ -669,13 +642,10 @@ update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_len } - if (existing_relay == NULL) - { + if (existing_relay == NULL) { /* new one, copy and insert */ existing_relay = relay_new(relay); - } - else - { + } else { *existing_p = existing_relay->next; } existing_relay->next = new_list; @@ -710,30 +680,26 @@ static void relay_check_streams (relay_t *to_start, { relay_t *relay; - while (to_free) - { - if (to_free->source) - { - if (to_free->running) - { + while (to_free) { + if (to_free->source) { + if (to_free->running) { /* relay has been removed from xml, shut down active relay */ ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount); to_free->running = 0; to_free->source->running = 0; - thread_join (to_free->thread); + thread_join(to_free->thread); + } else { + stats_event(to_free->config->localmount, NULL, NULL); } - else - stats_event (to_free->config->localmount, NULL, NULL); } - to_free = relay_free (to_free); + to_free = relay_free(to_free); } relay = to_start; - while (relay) - { + while (relay) { if (skip_timer) relay->start = 0; - check_relay_stream (relay); + check_relay_stream(relay); relay = relay->next; } } @@ -746,8 +712,8 @@ static int update_from_master(ice_config_t *config) sock_t mastersock; int ret = 0; char buf[256]; - do - { + + do { char *authheader, *data; relay_t *cleanup_relays; relay_config_t **new_relays = NULL; @@ -772,17 +738,16 @@ static int update_from_master(ice_config_t *config) config_release_config(); mastersock = sock_connect_wto(master, port, 10); - if (mastersock == SOCK_ERROR) - { + if (mastersock == SOCK_ERROR) { ICECAST_LOG_WARN("Relay slave failed to contact master server to fetch stream list"); break; } len = strlen(username) + strlen(password) + 2; authheader = malloc(len); - snprintf (authheader, len, "%s:%s", username, password); + snprintf(authheader, len, "%s:%s", username, password); data = util_base64_encode(authheader, len); - sock_write (mastersock, + sock_write(mastersock, "GET /admin/streamlist.txt HTTP/1.0\r\n" "Authorization: Basic %s\r\n" "\r\n", data); @@ -790,9 +755,8 @@ static int update_from_master(ice_config_t *config) free(data); if (sock_read_line(mastersock, buf, sizeof(buf)) == 0 || - ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0))) - { - sock_close (mastersock); + ((strncmp (buf, "HTTP/1.0 200", 12) != 0) && (strncmp (buf, "HTTP/1.1 200", 12) != 0))) { + sock_close(mastersock); ICECAST_LOG_WARN("Master rejected streamlist request"); break; } else { @@ -860,28 +824,28 @@ static int update_from_master(ice_config_t *config) } xmlFreeURI(parsed_uri); } - sock_close (mastersock); + sock_close(mastersock); - thread_mutex_lock (&(config_locks()->relay_lock)); - cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length); + thread_mutex_lock(&(config_locks()->relay_lock)); + cleanup_relays = update_relays(&global.master_relays, new_relays, new_relays_length); - relay_check_streams (global.master_relays, cleanup_relays, 0); + relay_check_streams(global.master_relays, cleanup_relays, 0); for (i = 0; i < new_relays_length; i++) { relay_config_free(new_relays[i]); } free(new_relays); - thread_mutex_unlock (&(config_locks()->relay_lock)); + thread_mutex_unlock(&(config_locks()->relay_lock)); } while(0); if (master) - free (master); + free(master); if (username) - free (username); + free(username); if (password) - free (password); + free(password); return ret; } @@ -904,8 +868,7 @@ static void *_slave_thread(void *arg) config_release_config(); source_recheck_mounts(1); - while (1) - { + while (true) { relay_t *cleanup_relays = NULL; int skip_timer = 0; @@ -925,12 +888,11 @@ static void *_slave_thread(void *arg) } thread_mutex_unlock(&_slave_mutex); - ++interval; + interval++; /* only update relays lists when required */ thread_mutex_lock(&_slave_mutex); - if (max_interval <= interval) - { + if (max_interval <= interval) { ICECAST_LOG_DEBUG("checking master stream list"); config = config_get_config(); @@ -941,37 +903,34 @@ static void *_slave_thread(void *arg) thread_mutex_unlock(&_slave_mutex); /* the connection could take some time, so the lock can drop */ - if (update_from_master (config)) + if (update_from_master(config)) config = config_get_config(); - thread_mutex_lock (&(config_locks()->relay_lock)); + thread_mutex_lock(&(config_locks()->relay_lock)); cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length); config_release_config(); - } - else - { + } else { thread_mutex_unlock(&_slave_mutex); - thread_mutex_lock (&(config_locks()->relay_lock)); + thread_mutex_lock(&(config_locks()->relay_lock)); } - relay_check_streams (global.relays, cleanup_relays, skip_timer); - relay_check_streams (global.master_relays, NULL, skip_timer); - thread_mutex_unlock (&(config_locks()->relay_lock)); + relay_check_streams(global.relays, cleanup_relays, skip_timer); + relay_check_streams(global.master_relays, NULL, skip_timer); + thread_mutex_unlock(&(config_locks()->relay_lock)); thread_mutex_lock(&_slave_mutex); - if (update_settings) - { - source_recheck_mounts (update_all_mounts); + if (update_settings) { + source_recheck_mounts(update_all_mounts); update_settings = 0; update_all_mounts = 0; } thread_mutex_unlock(&_slave_mutex); } ICECAST_LOG_INFO("shutting down current relays"); - relay_check_streams (NULL, global.relays, 0); - relay_check_streams (NULL, global.master_relays, 0); + relay_check_streams(NULL, global.relays, 0); + relay_check_streams(NULL, global.master_relays, 0); ICECAST_LOG_INFO("Slave thread shutdown complete");