From 0dc265583809e067e514a25cc51938998b9ff8a3 Mon Sep 17 00:00:00 2001 From: Karl Heyes Date: Wed, 15 Mar 2006 03:02:08 +0000 Subject: [PATCH] slave handler update. add timestamps to relays, allows slave thread to process them better. This simplifies various checks and sits better with relay startup and relay cleanup in certain error cases. svn path=/icecast/trunk/icecast/; revision=11008 --- src/auth.c | 1 - src/auth.h | 2 +- src/slave.c | 87 ++++++++++++++++++++++++----------------------------- src/slave.h | 2 +- 4 files changed, 41 insertions(+), 51 deletions(-) diff --git a/src/auth.c b/src/auth.c index 34841529..de0a9a84 100644 --- a/src/auth.c +++ b/src/auth.c @@ -318,7 +318,6 @@ static int add_client_to_source (source_t *source, client_t *client) /* enable on-demand relay to start, wake up the slave thread */ DEBUG0("kicking off on-demand relay"); source->on_demand_req = 1; - slave_rescan (); } DEBUG1 ("Added client to %s", source->mount); return 0; diff --git a/src/auth.h b/src/auth.h index 12c6916a..c481bb0d 100644 --- a/src/auth.h +++ b/src/auth.h @@ -35,7 +35,7 @@ typedef enum AUTH_FORBIDDEN, AUTH_USERADDED, AUTH_USEREXISTS, - AUTH_USERDELETED, + AUTH_USERDELETED } auth_result; typedef struct auth_client_tag diff --git a/src/slave.c b/src/slave.c index de1f8eb6..65b308a5 100644 --- a/src/slave.c +++ b/src/slave.c @@ -65,7 +65,6 @@ static thread_type *_slave_thread_id; static int slave_running = 0; static int update_settings = 0; static volatile unsigned int max_interval = 0; -static volatile int rescan_relays = 0; relay_server *relay_free (relay_server *relay) { @@ -116,22 +115,12 @@ void slave_recheck_mounts (void) } -/* Request slave thread to rescan the existing relays to see if any need - * starting up, eg on-demand relays - */ -void slave_rescan (void) -{ - rescan_relays = 1; -} - - /* Request slave thread to check the relay list for changes and to * update the stats for the current streams. */ void slave_rebuild_mounts (void) { update_settings = 1; - rescan_relays = 1; } @@ -249,6 +238,7 @@ static void *start_relay_stream (void *arg) break; } global_unlock (); + sock_set_blocking (streamsock, SOCK_NONBLOCK); con = NULL; parser = NULL; client_set_queue (src->client, NULL); @@ -268,11 +258,11 @@ static void *start_relay_stream (void *arg) /* only keep refreshing YP entries for inactive on-demand relays */ yp_remove (relay->localmount); relay->source->yp_public = -1; + relay->start = time(NULL) + 10; /* prevent busy looping if failing */ } - /* initiate an immediate relay cleanup run */ + /* we've finished, now get cleaned up */ relay->cleanup = 1; - rescan_relays = 1; return NULL; } while (0); @@ -299,9 +289,9 @@ static void *start_relay_stream (void *arg) src->parser = NULL; source_clear_source (relay->source); - /* initiate an immediate relay cleanup run */ + /* cleanup relay, but prevent this relay from starting up again too soon */ + relay->start = time(NULL) + max_interval; relay->cleanup = 1; - rescan_relays = 1; return NULL; } @@ -321,25 +311,21 @@ static void check_relay_stream (relay_server *relay) /* new relay, reserve the name */ relay->source = source_reserve (relay->localmount); if (relay->source) + { DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount); + slave_rebuild_mounts(); + } else WARN1 ("new relay but source \"%s\" already exists", relay->localmount); } do { source_t *source = relay->source; - if (relay->source == NULL || relay->running) + /* skip relay if active, not configured or just not time yet */ + if (relay->source == NULL || relay->running || relay->start > time(NULL)) break; - if (relay->on_demand) + if (relay->on_demand && source->on_demand_req == 0) { - ice_config_t *config = config_get_config (); - mount_proxy *mountinfo = config_find_mount (config, relay->localmount); - - if (mountinfo == NULL) - source_update_settings (config, relay->source, mountinfo); - config_release_config (); - slave_rebuild_mounts(); - stats_event (relay->localmount, "listeners", "0"); relay->source->on_demand = relay->on_demand; if (source->fallback_mount && source->fallback_override) @@ -359,17 +345,21 @@ static void check_relay_stream (relay_server *relay) break; } + relay->start = time(NULL) + 5; relay->thread = thread_create ("Relay Thread", start_relay_stream, relay, THREAD_ATTACHED); return; - } while(0); + } while (0); /* the relay thread may of shut down itself */ - if (relay->cleanup && relay->thread) + if (relay->cleanup) { - DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount); - thread_join (relay->thread); - relay->thread = NULL; + if (relay->thread) + { + DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount); + thread_join (relay->thread); + relay->thread = NULL; + } relay->cleanup = 0; relay->running = 0; @@ -469,7 +459,8 @@ update_relays (relay_server **relay_list, relay_server *new_relay_list) } -static void relay_check_streams (relay_server *to_start, relay_server *to_free) +static void relay_check_streams (relay_server *to_start, + relay_server *to_free, int skip_timer) { relay_server *relay; @@ -494,6 +485,8 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free) relay = to_start; while (relay) { + if (skip_timer) + relay->start = 0; check_relay_stream (relay); relay = relay->next; } @@ -584,8 +577,8 @@ static int update_from_master(ice_config_t *config) thread_mutex_lock (&(config_locks()->relay_lock)); cleanup_relays = update_relays (&global.master_relays, new_relays); - relay_check_streams (global.master_relays, cleanup_relays); - relay_check_streams (NULL, new_relays); + relay_check_streams (global.master_relays, cleanup_relays, 0); + relay_check_streams (NULL, new_relays, 0); thread_mutex_unlock (&(config_locks()->relay_lock)); @@ -611,7 +604,8 @@ static void *_slave_thread(void *arg) while (1) { - relay_server *cleanup_relays; + relay_server *cleanup_relays = NULL; + int skip_timer = 0; /* re-read xml file if requested */ if (global . schedule_config_reread) @@ -623,8 +617,8 @@ static void *_slave_thread(void *arg) thread_sleep (1000000); if (slave_running == 0) break; - if (rescan_relays == 0 && max_interval > ++interval) - continue; + + ++interval; /* only update relays lists when required */ if (max_interval <= interval) @@ -632,6 +626,8 @@ static void *_slave_thread(void *arg) DEBUG0 ("checking master stream list"); config = config_get_config(); + if (max_interval == 0) + skip_timer = 1; interval = 0; max_interval = config->master_update_interval; @@ -644,19 +640,14 @@ static void *_slave_thread(void *arg) cleanup_relays = update_relays (&global.relays, config->relay); config_release_config(); - - relay_check_streams (global.relays, cleanup_relays); - thread_mutex_unlock (&(config_locks()->relay_lock)); } else - { - DEBUG0 ("rescanning relay lists"); thread_mutex_lock (&(config_locks()->relay_lock)); - relay_check_streams (global.master_relays, NULL); - relay_check_streams (global.relays, NULL); - thread_mutex_unlock (&(config_locks()->relay_lock)); - } - rescan_relays = 0; + + relay_check_streams (global.relays, cleanup_relays, skip_timer); + relay_check_streams (global.master_relays, NULL, skip_timer); + thread_mutex_unlock (&(config_locks()->relay_lock)); + if (update_settings) { update_settings = 0; @@ -664,8 +655,8 @@ static void *_slave_thread(void *arg) } } DEBUG0 ("shutting down current relays"); - relay_check_streams (NULL, global.relays); - relay_check_streams (NULL, global.master_relays); + relay_check_streams (NULL, global.relays, 0); + relay_check_streams (NULL, global.master_relays, 0); INFO0 ("Slave thread shutdown complete"); diff --git a/src/slave.h b/src/slave.h index 34c28cbc..67eb1e82 100644 --- a/src/slave.h +++ b/src/slave.h @@ -27,6 +27,7 @@ typedef struct _relay_server { int on_demand; int running; int cleanup; + time_t start; thread_type *thread; struct _relay_server *next; } relay_server; @@ -36,7 +37,6 @@ void slave_initialize(void); void slave_shutdown(void); void slave_recheck_mounts (void); void slave_rebuild_mounts (void); -void slave_rescan (void); relay_server *relay_free (relay_server *relay); #endif /* __SLAVE_H__ */