diff --git a/src/cfgfile.h b/src/cfgfile.h index 4bb97898..05609bc3 100644 --- a/src/cfgfile.h +++ b/src/cfgfile.h @@ -32,15 +32,6 @@ typedef struct ice_config_dir_tag struct ice_config_dir_tag *next; } ice_config_dir_t; -typedef struct _relay_server { - char *server; - int port; - char *mount; - char *localmount; - int mp3metadata; - struct _relay_server *next; -} relay_server; - typedef struct _config_options { char *name; char *value; diff --git a/src/connection.c b/src/connection.c index c3b55b37..1d3dd50c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -479,6 +479,7 @@ int connection_complete_source (source_t *source) global.sources++; global_unlock(); + stats_event_inc(NULL, "sources"); /* for relays, we don't yet have a client, however we do require one * to retrieve the stream from. This is created here, quite late, @@ -578,10 +579,28 @@ int connection_create_source(client_t *client, connection_t *con, http_parser_t } config_release_config(); + /* we need to add this source into the tree but fail if this mountpoint + * already exists + */ + avl_tree_wlock(global.source_tree); + if (source_find_mount_raw (mount) != NULL) + { + avl_tree_unlock(global.source_tree); + global_lock(); + global.sources--; + global_unlock(); + stats_event_dec(NULL, "sources"); + INFO1("source \"%s\" already in use", mount); + client_send_404 (client, "Mountpoint in use"); + return 0; + } + avl_insert(global.source_tree, (void *)source); + avl_tree_unlock(global.source_tree); + source->send_return = 1; source->shutdown_rwlock = &_source_shutdown_rwlock; sock_set_blocking(con->sock, SOCK_NONBLOCK); - thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED); + thread_create("Source Thread", source_client_thread, (void *)source, THREAD_DETACHED); return 1; fail: @@ -775,9 +794,7 @@ static void _handle_source_request(connection_t *con, } avl_tree_unlock(global.source_tree); - if (!connection_create_source(client, con, parser, uri)) { - client_send_404(client, "Mountpoint in use"); - } + connection_create_source(client, con, parser, uri); } static void _handle_stats_request(connection_t *con, diff --git a/src/global.c b/src/global.c index 405e8b57..bc48beda 100644 --- a/src/global.c +++ b/src/global.c @@ -37,6 +37,8 @@ void global_initialize(void) { memset(global.serversock, 0, sizeof(int)*MAX_LISTEN_SOCKETS); global.server_sockets = 0; + global.relays = NULL; + global.master_relays = NULL; global.running = 0; global.clients = 0; global.sources = 0; @@ -47,7 +49,7 @@ void global_initialize(void) void global_shutdown(void) { thread_mutex_destroy(&_global_mutex); - avl_tree_free(global.source_tree, source_free_source); + avl_tree_free(global.source_tree, NULL); } void global_lock(void) diff --git a/src/global.h b/src/global.h index 235e9352..e646b0f7 100644 --- a/src/global.h +++ b/src/global.h @@ -23,6 +23,7 @@ #define MAX_LISTEN_SOCKETS 10 #include "thread/thread.h" +#include "slave.h" typedef struct ice_global_tag { @@ -36,6 +37,10 @@ typedef struct ice_global_tag int schedule_config_reread; avl_tree *source_tree; + /* for locally defined relays */ + struct _relay_server *relays; + /* relays retrieved from master */ + struct _relay_server *master_relays; cond_t shutdown_cond; } ice_global_t; diff --git a/src/slave.c b/src/slave.c index 7ddfcd37..6ee094e4 100644 --- a/src/slave.c +++ b/src/slave.c @@ -61,8 +61,53 @@ static void *_slave_thread(void *arg); thread_type *_slave_thread_id; -static int _initialized = 0; -static unsigned max_interval = 0; +static int slave_running = 0; +static int max_interval = 0; + +relay_server *relay_free (relay_server *relay) +{ + relay_server *next = relay->next; + DEBUG1("freeing relay %s", relay->localmount); + if (relay->source) + source_free_source (relay->source); + xmlFree (relay->server); + xmlFree (relay->mount); + xmlFree (relay->localmount); + xmlFree (relay); + return next; +} + + +relay_server *relay_copy (relay_server *r) +{ + relay_server *copy = calloc (1, sizeof (relay_server)); + + if (copy) + { + copy->server = xmlStrdup (r->server); + copy->mount = xmlStrdup (r->mount); + copy->localmount = xmlStrdup (r->localmount); + copy->port = r->port; + copy->mp3metadata = r->mp3metadata; + } + return copy; +} + + +static void *_relay_thread (void *arg) +{ + relay_server *relay = arg; + + relay->running = 1; + + source_main (relay->source); + + relay->running = 0; + if (relay->cleanup) + relay_free (relay); + + return NULL; +} void slave_recheck (void) @@ -71,189 +116,333 @@ void slave_recheck (void) } -void slave_initialize(void) { - ice_config_t *config; - if (_initialized) return; - - config = config_get_config(); - /* Don't create a slave thread if it isn't configured */ - if (config->master_server == NULL && - config->relay == NULL) - { - config_release_config(); +void slave_initialize(void) +{ + if (slave_running) return; - } - config_release_config(); - _initialized = 1; + slave_running = 1; _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED); } -void slave_shutdown(void) { - if (!_initialized) return; - _initialized = 0; - thread_join(_slave_thread_id); -} -static void create_relay_stream(char *server, int port, - char *remotemount, char *localmount, int mp3) +void slave_shutdown(void) { - sock_t streamsock; - char header[4096]; - connection_t *con; - http_parser_t *parser; - client_t *client; + relay_server *relay; - if(!localmount) - localmount = remotemount; - - DEBUG1("Adding source at mountpoint \"%s\"", localmount); - - streamsock = sock_connect_wto(server, port, 0); - if (streamsock == SOCK_ERROR) { - WARN2("Failed to relay stream from master server, couldn't connect to http://%s:%d", server, port); + if (!slave_running) return; - } - con = create_connection(streamsock, -1, NULL); - /* At this point we may not know if we are relaying a mp3 or vorbis stream, - * so lets send in the icy-metadata header just in case, it's harmless in - * the vorbis case. If we don't send in this header then relay will not - * have mp3 metadata. - */ - sock_write(streamsock, "GET %s HTTP/1.0\r\n" - "User-Agent: " ICECAST_VERSION_STRING "\r\n" - "Icy-MetaData: 1\r\n" - "\r\n", - remotemount); - memset(header, 0, sizeof(header)); - if (util_read_header(con->sock, header, 4096) == 0) { - WARN0("Header read failed"); - connection_close(con); - return; - } - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - if(!httpp_parse_response(parser, header, strlen(header), localmount)) { - if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) { - ERROR1("Error parsing relay request: %s", - httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); - } - else - ERROR0("Error parsing relay request"); - connection_close(con); - httpp_destroy(parser); - return; - } + slave_running = 0; + thread_join (_slave_thread_id); - client = client_create(con, parser); - if (!connection_create_source(client, con, parser, - httpp_getvar(parser, HTTPP_VAR_URI))) { - DEBUG0("Failed to create source"); - client_destroy(client); - } + relay = global.relays; + while (relay) + relay = relay_free (relay); + global.relays = NULL; - return; + relay = global.master_relays; + while (relay) + relay = relay_free (relay); + global.master_relays = NULL; } -static void *_slave_thread(void *arg) { - sock_t mastersock; - char buf[256]; - unsigned interval = 0; - char *authheader, *data; - int len; - char *username = "relay"; - relay_server *relay; - ice_config_t *config; - while (_initialized) { - if (max_interval > ++interval) { - thread_sleep(1000000); - continue; +/* This does the actual connection for a relay. A thread is + * started off if a connection can be acquired + */ +static void start_relay_stream (relay_server *relay) +{ + sock_t streamsock = SOCK_ERROR; + source_t *src = relay->source; + http_parser_t *parser = NULL; + connection_t *con=NULL; + char header[4096]; + + INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount); + do + { + streamsock = sock_connect_wto (relay->server, relay->port, 30); + if (streamsock == SOCK_ERROR) + { + WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s", + relay->server, relay->port, relay->mount); + break; } - else { - /* In case it's been reconfigured */ - config = config_get_config(); - max_interval = config->master_update_interval; + con = create_connection (streamsock, -1, NULL); - interval = 0; + /* At this point we may not know if we are relaying an mp3 or vorbis + * stream, but only send the icy-metadata header if the relay details + * state so (the typical case). It's harmless in the vorbis case. If + * we don't send in this header then relay will not have mp3 metadata. + */ + sock_write(streamsock, "GET %s HTTP/1.0\r\n" + "User-Agent: " ICECAST_VERSION_STRING "\r\n" + "%s" + "\r\n", + relay->mount, relay->mp3metadata?"Icy-MetaData: 1\r\n":""); + memset (header, 0, sizeof(header)); + if (util_read_header (con->sock, header, 4096) == 0) + { + WARN0("Header read failed"); + break; } - - if(config->master_server != NULL) { - char *server = strdup (config->master_server); - int port = config->master_server_port; - char *password = NULL; - if (config->master_password != NULL) - password = strdup (config->master_password); - else - password = strdup (config->source_password); - config_release_config(); - - mastersock = sock_connect_wto(server, port, 0); - - if (mastersock == SOCK_ERROR) { - WARN0("Relay slave failed to contact master server to fetch stream list"); - free (server); - free (password); - continue; - } - - len = strlen(username) + strlen(password) + 1; - authheader = malloc(len+1); - strcpy(authheader, username); - strcat(authheader, ":"); - strcat(authheader, password); - data = util_base64_encode(authheader); - sock_write(mastersock, - "GET /admin/streamlist.txt HTTP/1.0\r\n" - "Authorization: Basic %s\r\n" - "\r\n", data); - free(authheader); - free(data); - while (sock_read_line(mastersock, buf, sizeof(buf))) { - if(!strlen(buf)) - break; - } - - while (sock_read_line(mastersock, buf, sizeof(buf))) { - avl_tree_rlock(global.source_tree); - if (!source_find_mount(buf)) { - avl_tree_unlock(global.source_tree); - - create_relay_stream(server, port, buf, NULL, 0); - } - else - avl_tree_unlock(global.source_tree); - } - free (server); - free (password); - sock_close(mastersock); + parser = httpp_create_parser(); + httpp_initialize (parser, NULL); + if (! httpp_parse_response (parser, header, strlen(header), relay->localmount)) + { + ERROR0("Error parsing relay request"); + break; } - else { - config_release_config(); + if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) + { + ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); + break; } + src->parser = parser; + src->con = con; + if (connection_complete_source (src) < 0) + { + DEBUG0("Failed to complete source initialisation"); + break; + } + thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED); - /* And now, we process the individual mounts... */ - config = config_get_config(); - relay = config->relay; - thread_mutex_lock(&(config_locks()->relay_lock)); - config_release_config(); + return; + } while (0); - while(relay) { - avl_tree_rlock(global.source_tree); - if(!source_find_mount_raw(relay->localmount)) { - avl_tree_unlock(global.source_tree); + if (con == NULL && streamsock != SOCK_ERROR) + sock_close (streamsock); + if (con) + connection_close (con); + src->con = NULL; + if (parser) + httpp_destroy (parser); + src->parser = NULL; +} - create_relay_stream(relay->server, relay->port, relay->mount, - relay->localmount, relay->mp3metadata); - } - else - avl_tree_unlock(global.source_tree); + +/* wrapper for starting the provided relay stream */ +static void check_relay_stream (relay_server *relay) +{ + if (relay->source == NULL) + { + /* new relay, reserve the name */ + DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount); + relay->source = source_reserve (relay->localmount); + } + if (relay->source && !relay->running) + { + start_relay_stream (relay); + } +} + + +/* go through updated looking for relays that are different configured. The + * returned list contains relays that should be kept running, current contains + * the list of relays to shutdown + */ +static relay_server * +update_relay_set (relay_server **current, relay_server *updated) +{ + relay_server *relay = updated; + relay_server *existing_relay, **existing_p; + relay_server *new_list = NULL; + + while (relay) + { + existing_relay = *current; + existing_p = current; + + while (existing_relay) + { + if (strcmp (relay->localmount, existing_relay->localmount) == 0) + break; + existing_p = &existing_relay->next; + existing_relay = existing_relay->next; + } + if (existing_relay == NULL) + { + /* new one, copy and insert */ + existing_relay = relay_copy (relay); + } + else + { + *existing_p = existing_relay->next; + } + existing_relay->next = new_list; + new_list = existing_relay; + relay = relay->next; + } + return new_list; +} + + +/* update the relay_list with entries from new_relay_list. Any new relays + * are added to the list, and any not listed in the provided new_relay_list + * get marked for shutting down, just in case they are not shutting down by + * themselves + */ +static void +update_relays (relay_server **relay_list, relay_server *new_relay_list) +{ + relay_server *relay, *current; + + current = update_relay_set (relay_list, new_relay_list); + + /* ok whats left, lets make sure they shut down */ + relay = *relay_list; + while (relay) + { + relay->cleanup = 1; + if (relay->source) + { + if (relay->source->running) + DEBUG1 ("requested %s to shut down", relay->source->mount); + relay->source->running = 0; relay = relay->next; } - - thread_mutex_unlock(&(config_locks()->relay_lock)); + else + relay = relay_free (relay); } - INFO0 ("Slave thread shutting down"); + /* re-assign new set */ + *relay_list = current; +} + + +static int update_from_master(ice_config_t *config) +{ + char *master = NULL, *password = NULL, *username= NULL; + int port; + sock_t mastersock; + int ret = 0; + char buf[256]; + do + { + char *authheader, *data; + relay_server *relays = NULL, *relay; + int len, count = 1; + + username = strdup ("relay"); + if (config->master_password) + password = strdup (config->master_password); + + if (config->master_server) + master = strdup (config->master_server); + + port = config->master_server_port; + + if (password == NULL || master == NULL || port == 0) + break; + ret = 1; + config_release_config(); + mastersock = sock_connect_wto (master, port, 0); + + if (mastersock == SOCK_ERROR) + { + WARN0("Relay slave failed to contact master server to fetch stream list"); + break; + } + + len = strlen(username) + strlen(password) + 1; + authheader = malloc(len+1); + strcpy(authheader, username); + strcat(authheader, ":"); + strcat(authheader, password); + data = util_base64_encode(authheader); + sock_write (mastersock, + "GET /admin/streamlist.txt HTTP/1.0\r\n" + "Authorization: Basic %s\r\n" + "\r\n", data); + free(authheader); + free(data); + + while (sock_read_line(mastersock, buf, sizeof(buf))) + { + if (!strlen(buf)) + break; + } + while (sock_read_line(mastersock, buf, sizeof(buf))) + { + relay_server *r; + if (!strlen(buf)) + continue; + DEBUG2 ("read %d from master \"%s\"", count++, buf); + r = calloc (1, sizeof (relay_server)); + if (r) + { + r->server = xmlStrdup (master); + r->port = port; + r->mount = xmlStrdup (buf); + r->localmount = xmlStrdup (buf); + r->mp3metadata = 1; + r->next = relays; + relays = r; + } + } + sock_close (mastersock); + + update_relays (&global.master_relays, relays); + /* start any inactive relays */ + relay = global.master_relays; + while (relay) + { + check_relay_stream (relay); + relay = relay->next; + } + relay = relays; + while (relay) + relay = relay_free (relay); + } while(0); + + if (master) + free (master); + if (username) + free (username); + if (password) + free (password); + + return ret; +} + + +static void *_slave_thread(void *arg) +{ + ice_config_t *config; + relay_server *relay; + unsigned interval = 0; + + while (slave_running) + { + thread_sleep (1000000); + if (max_interval > ++interval) + continue; + + interval = 0; + config = config_get_config(); + + max_interval = config->master_update_interval; + + /* the connection could time some time, so the lock can drop */ + if (update_from_master (config)) + config = config_get_config(); + + thread_mutex_lock (&(config_locks()->relay_lock)); + + update_relays (&global.relays, config->relay); + + config_release_config(); + + /* start any inactive relays */ + relay = global.relays; + while (relay) + { + check_relay_stream (relay); + relay = relay->next; + } + thread_mutex_unlock (&(config_locks()->relay_lock)); + } + INFO0 ("Slave thread shutdown complete"); + return NULL; } diff --git a/src/slave.h b/src/slave.h index a07705c8..b0078551 100644 --- a/src/slave.h +++ b/src/slave.h @@ -13,8 +13,22 @@ #ifndef __SLAVE_H__ #define __SLAVE_H__ +typedef struct _relay_server { + char *server; + int port; + char *mount; + char *localmount; + struct source_tag *source; + int mp3metadata; + int running; + int cleanup; + struct _relay_server *next; +} relay_server; + + void slave_initialize(void); void slave_shutdown(void); void slave_recheck (void); +relay_server *relay_free (relay_server *relay); #endif /* __SLAVE_H__ */ diff --git a/src/source.c b/src/source.c index 4f4fc988..f1e1eb8b 100644 --- a/src/source.c +++ b/src/source.c @@ -246,11 +246,13 @@ int source_compare_sources(void *arg, void *a, void *b) void source_clear_source (source_t *source) { #ifdef USE_YP - int i; + int i; #endif DEBUG1 ("clearing source \"%s\"", source->mount); client_destroy(source->client); source->client = NULL; + source->parser = NULL; + source->con = NULL; /* lets kick off any clients that are left on here */ avl_tree_rlock (source->client_tree); @@ -281,15 +283,15 @@ void source_clear_source (source_t *source) source->ypdata[i] = NULL; } source->num_yp_directories = 0; + + util_dict_free (source->audio_info); + source->audio_info = NULL; #endif source->listeners = 0; source->no_mount = 0; source->max_listeners = -1; source->yp_public = 0; - util_dict_free(source->audio_info); - source->audio_info = NULL; - free(source->fallback_mount); source->fallback_mount = NULL; @@ -298,33 +300,26 @@ void source_clear_source (source_t *source) } +/* Remove the provided source from the global tree and free it */ int source_free_source(void *key) { source_t *source = key; -#ifdef USE_YP - int i; -#endif - free(source->mount); - free(source->fallback_mount); - free(source->dumpfilename); - client_destroy(source->client); + DEBUG1 ("freeing source \"%s\"", source->mount); + avl_tree_wlock (global.source_tree); + avl_delete (global.source_tree, source, NULL); + avl_tree_unlock (global.source_tree); + avl_tree_free(source->pending_tree, _free_client); avl_tree_free(source->client_tree, _free_client); - source->format->free_plugin(source->format); -#ifdef USE_YP - for (i=0; inum_yp_directories; i++) - { - yp_destroy_ypdata(source->ypdata[i]); - source->ypdata[i] = NULL; - } -#endif - util_dict_free(source->audio_info); - free(source); + + free (source->mount); + free (source); return 1; } + client_t *source_find_client(source_t *source, int id) { client_t fakeclient; @@ -455,27 +450,6 @@ void *source_main(void *arg) /* grab a read lock, to make sure we get a chance to cleanup */ thread_rwlock_rlock(source->shutdown_rwlock); - avl_tree_wlock(global.source_tree); - /* Now, we must do a final check with write lock taken out that the - * mountpoint is available.. - */ - if (source_find_mount_raw(source->mount) != NULL) { - avl_tree_unlock(global.source_tree); - if(source->send_return) { - client_send_404(source->client, "Mountpoint in use"); - } - global_lock(); - global.sources--; - global_unlock(); - thread_rwlock_unlock(source->shutdown_rwlock); - thread_exit(0); - return NULL; - } - /* insert source onto source tree */ - avl_insert(global.source_tree, (void *)source); - /* release write lock on global source tree */ - avl_tree_unlock(global.source_tree); - /* If we connected successfully, we can send the message (if requested) * back */ @@ -491,6 +465,7 @@ void *source_main(void *arg) stats_event(source->mount, "listeners", "0"); stats_event(source->mount, "type", source->format->format_description); #ifdef USE_YP + source->audio_info = util_dict_new(); /* ice-* is icecast, icy-* is shoutcast */ if ((s = httpp_getvar(source->parser, "ice-url"))) { add_yp_info(source, "server_url", s, YP_SERVER_URL); @@ -847,6 +822,7 @@ void *source_main(void *arg) done: + source->running = 0; INFO1("Source \"%s\" exiting", source->mount); #ifdef USE_YP @@ -855,13 +831,11 @@ done: } #endif - /* Now, we must remove this source from the source tree before - * removing the clients, otherwise new clients can sneak into the pending - * tree after we've cleared it + /* we have de-activated the source now, so no more clients will be + * added, now move the listeners we have to the fallback (if any) */ - avl_tree_wlock(global.source_tree); + avl_tree_rlock(global.source_tree); fallback_source = source_find_mount (source->fallback_mount); - avl_delete (global.source_tree, source, NULL); if (fallback_source != NULL) source_move_clients (source, fallback_source); @@ -882,10 +856,10 @@ done: /* release our hold on the lock so the main thread can continue cleaning up */ thread_rwlock_unlock(source->shutdown_rwlock); - source_free_source(source); + /* we don't remove the source from the tree here, it may be a relay and + therefore reserved */ + source_clear_source (source); - thread_exit(0); - return NULL; }