diff --git a/src/auth.c b/src/auth.c index 34841529..de0a9a84 100644 --- a/src/auth.c +++ b/src/auth.c @@ -318,7 +318,6 @@ static int add_client_to_source (source_t *source, client_t *client) /* enable on-demand relay to start, wake up the slave thread */ DEBUG0("kicking off on-demand relay"); source->on_demand_req = 1; - slave_rescan (); } DEBUG1 ("Added client to %s", source->mount); return 0; diff --git a/src/auth.h b/src/auth.h index 12c6916a..c481bb0d 100644 --- a/src/auth.h +++ b/src/auth.h @@ -35,7 +35,7 @@ typedef enum AUTH_FORBIDDEN, AUTH_USERADDED, AUTH_USEREXISTS, - AUTH_USERDELETED, + AUTH_USERDELETED } auth_result; typedef struct auth_client_tag diff --git a/src/slave.c b/src/slave.c index de1f8eb6..65b308a5 100644 --- a/src/slave.c +++ b/src/slave.c @@ -65,7 +65,6 @@ static thread_type *_slave_thread_id; static int slave_running = 0; static int update_settings = 0; static volatile unsigned int max_interval = 0; -static volatile int rescan_relays = 0; relay_server *relay_free (relay_server *relay) { @@ -116,22 +115,12 @@ void slave_recheck_mounts (void) } -/* Request slave thread to rescan the existing relays to see if any need - * starting up, eg on-demand relays - */ -void slave_rescan (void) -{ - rescan_relays = 1; -} - - /* Request slave thread to check the relay list for changes and to * update the stats for the current streams. */ void slave_rebuild_mounts (void) { update_settings = 1; - rescan_relays = 1; } @@ -249,6 +238,7 @@ static void *start_relay_stream (void *arg) break; } global_unlock (); + sock_set_blocking (streamsock, SOCK_NONBLOCK); con = NULL; parser = NULL; client_set_queue (src->client, NULL); @@ -268,11 +258,11 @@ static void *start_relay_stream (void *arg) /* only keep refreshing YP entries for inactive on-demand relays */ yp_remove (relay->localmount); relay->source->yp_public = -1; + relay->start = time(NULL) + 10; /* prevent busy looping if failing */ } - /* initiate an immediate relay cleanup run */ + /* we've finished, now get cleaned up */ relay->cleanup = 1; - rescan_relays = 1; return NULL; } while (0); @@ -299,9 +289,9 @@ static void *start_relay_stream (void *arg) src->parser = NULL; source_clear_source (relay->source); - /* initiate an immediate relay cleanup run */ + /* cleanup relay, but prevent this relay from starting up again too soon */ + relay->start = time(NULL) + max_interval; relay->cleanup = 1; - rescan_relays = 1; return NULL; } @@ -321,25 +311,21 @@ static void check_relay_stream (relay_server *relay) /* new relay, reserve the name */ relay->source = source_reserve (relay->localmount); if (relay->source) + { DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount); + slave_rebuild_mounts(); + } else WARN1 ("new relay but source \"%s\" already exists", relay->localmount); } do { source_t *source = relay->source; - if (relay->source == NULL || relay->running) + /* skip relay if active, not configured or just not time yet */ + if (relay->source == NULL || relay->running || relay->start > time(NULL)) break; - if (relay->on_demand) + if (relay->on_demand && source->on_demand_req == 0) { - ice_config_t *config = config_get_config (); - mount_proxy *mountinfo = config_find_mount (config, relay->localmount); - - if (mountinfo == NULL) - source_update_settings (config, relay->source, mountinfo); - config_release_config (); - slave_rebuild_mounts(); - stats_event (relay->localmount, "listeners", "0"); relay->source->on_demand = relay->on_demand; if (source->fallback_mount && source->fallback_override) @@ -359,17 +345,21 @@ static void check_relay_stream (relay_server *relay) break; } + relay->start = time(NULL) + 5; relay->thread = thread_create ("Relay Thread", start_relay_stream, relay, THREAD_ATTACHED); return; - } while(0); + } while (0); /* the relay thread may of shut down itself */ - if (relay->cleanup && relay->thread) + if (relay->cleanup) { - DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount); - thread_join (relay->thread); - relay->thread = NULL; + if (relay->thread) + { + DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount); + thread_join (relay->thread); + relay->thread = NULL; + } relay->cleanup = 0; relay->running = 0; @@ -469,7 +459,8 @@ update_relays (relay_server **relay_list, relay_server *new_relay_list) } -static void relay_check_streams (relay_server *to_start, relay_server *to_free) +static void relay_check_streams (relay_server *to_start, + relay_server *to_free, int skip_timer) { relay_server *relay; @@ -494,6 +485,8 @@ static void relay_check_streams (relay_server *to_start, relay_server *to_free) relay = to_start; while (relay) { + if (skip_timer) + relay->start = 0; check_relay_stream (relay); relay = relay->next; } @@ -584,8 +577,8 @@ static int update_from_master(ice_config_t *config) thread_mutex_lock (&(config_locks()->relay_lock)); cleanup_relays = update_relays (&global.master_relays, new_relays); - relay_check_streams (global.master_relays, cleanup_relays); - relay_check_streams (NULL, new_relays); + relay_check_streams (global.master_relays, cleanup_relays, 0); + relay_check_streams (NULL, new_relays, 0); thread_mutex_unlock (&(config_locks()->relay_lock)); @@ -611,7 +604,8 @@ static void *_slave_thread(void *arg) while (1) { - relay_server *cleanup_relays; + relay_server *cleanup_relays = NULL; + int skip_timer = 0; /* re-read xml file if requested */ if (global . schedule_config_reread) @@ -623,8 +617,8 @@ static void *_slave_thread(void *arg) thread_sleep (1000000); if (slave_running == 0) break; - if (rescan_relays == 0 && max_interval > ++interval) - continue; + + ++interval; /* only update relays lists when required */ if (max_interval <= interval) @@ -632,6 +626,8 @@ static void *_slave_thread(void *arg) DEBUG0 ("checking master stream list"); config = config_get_config(); + if (max_interval == 0) + skip_timer = 1; interval = 0; max_interval = config->master_update_interval; @@ -644,19 +640,14 @@ static void *_slave_thread(void *arg) cleanup_relays = update_relays (&global.relays, config->relay); config_release_config(); - - relay_check_streams (global.relays, cleanup_relays); - thread_mutex_unlock (&(config_locks()->relay_lock)); } else - { - DEBUG0 ("rescanning relay lists"); thread_mutex_lock (&(config_locks()->relay_lock)); - relay_check_streams (global.master_relays, NULL); - relay_check_streams (global.relays, NULL); - thread_mutex_unlock (&(config_locks()->relay_lock)); - } - rescan_relays = 0; + + relay_check_streams (global.relays, cleanup_relays, skip_timer); + relay_check_streams (global.master_relays, NULL, skip_timer); + thread_mutex_unlock (&(config_locks()->relay_lock)); + if (update_settings) { update_settings = 0; @@ -664,8 +655,8 @@ static void *_slave_thread(void *arg) } } DEBUG0 ("shutting down current relays"); - relay_check_streams (NULL, global.relays); - relay_check_streams (NULL, global.master_relays); + relay_check_streams (NULL, global.relays, 0); + relay_check_streams (NULL, global.master_relays, 0); INFO0 ("Slave thread shutdown complete"); diff --git a/src/slave.h b/src/slave.h index 34c28cbc..67eb1e82 100644 --- a/src/slave.h +++ b/src/slave.h @@ -27,6 +27,7 @@ typedef struct _relay_server { int on_demand; int running; int cleanup; + time_t start; thread_type *thread; struct _relay_server *next; } relay_server; @@ -36,7 +37,6 @@ void slave_initialize(void); void slave_shutdown(void); void slave_recheck_mounts (void); void slave_rebuild_mounts (void); -void slave_rescan (void); relay_server *relay_free (relay_server *relay); #endif /* __SLAVE_H__ */