From 2cb0e3180d94e18fbf1c1fc586ab272fd0684cb6 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 17 Jul 2012 14:03:37 +0000 Subject: [PATCH] race condition patch as submitted by lds and remi, slightly motified by me. closes #1810 svn path=/icecast/trunk/icecast/; revision=18454 --- src/connection.c | 9 +++++++-- src/sighandler.c | 3 +++ src/slave.c | 21 +++++++++++++++++++++ src/source.c | 11 ++++++++++- src/source.h | 2 ++ src/stats.c | 6 +++++- 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/connection.c b/src/connection.c index bd2115ae..a93bc33f 100644 --- a/src/connection.c +++ b/src/connection.c @@ -105,7 +105,7 @@ typedef struct avl_tree *contents; } cache_file_contents; -static spin_t _connection_lock; +static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail static volatile unsigned long _current_id = 0; static int _initialized = 0; @@ -568,8 +568,10 @@ static connection_t *_accept_connection(int duration) */ static void _add_connection (client_queue_t *node) { + thread_spin_lock (&_connection_lock); *_con_queue_tail = node; _con_queue_tail = (volatile client_queue_t **)&node->next; + thread_spin_unlock (&_connection_lock); } @@ -580,7 +582,8 @@ static client_queue_t *_get_connection(void) { client_queue_t *node = NULL; - /* common case, no new connections so don't bother taking locks */ + thread_spin_lock (&_connection_lock); + if (_con_queue) { node = (client_queue_t *)_con_queue; @@ -589,6 +592,8 @@ static client_queue_t *_get_connection(void) _con_queue_tail = &_con_queue; node->next = NULL; } + + thread_spin_unlock (&_connection_lock); return node; } diff --git a/src/sighandler.c b/src/sighandler.c index 59349373..540fbc64 100644 --- a/src/sighandler.c +++ b/src/sighandler.c @@ -54,7 +54,10 @@ void _sig_ignore(int signo) void _sig_hup(int signo) { + global_lock(); global . schedule_config_reread = 1; + global_unlock(); + /* some OSes require us to reattach the signal handler */ signal(SIGHUP, _sig_hup); } diff --git a/src/slave.c b/src/slave.c index 697c3448..486711de 100644 --- a/src/slave.c +++ b/src/slave.c @@ -64,6 +64,7 @@ static int slave_running = 0; static volatile int update_settings = 0; static volatile int update_all_mounts = 0; static volatile unsigned int max_interval = 0; +static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval relay_server *relay_free (relay_server *relay) { @@ -109,9 +110,11 @@ relay_server *relay_copy (relay_server *r) */ void slave_update_all_mounts (void) { + thread_mutex_lock(&_slave_mutex); max_interval = 0; update_all_mounts = 1; update_settings = 1; + thread_mutex_unlock(&_slave_mutex); } @@ -120,7 +123,9 @@ void slave_update_all_mounts (void) */ void slave_rebuild_mounts (void) { + thread_mutex_lock(&_slave_mutex); update_settings = 1; + thread_mutex_unlock(&_slave_mutex); } @@ -131,6 +136,7 @@ void slave_initialize(void) slave_running = 1; max_interval = 0; + thread_mutex_create (&_slave_mutex); _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED); } @@ -369,9 +375,13 @@ static void *start_relay_stream (void *arg) source_clear_source (relay->source); /* cleanup relay, but prevent this relay from starting up again too soon */ + thread_mutex_lock(&_slave_mutex); + thread_mutex_lock(&(config_locks()->relay_lock)); relay->source->on_demand = 0; relay->start = time(NULL) + max_interval; relay->cleanup = 1; + thread_mutex_unlock(&(config_locks()->relay_lock)); + thread_mutex_unlock(&_slave_mutex); return NULL; } @@ -697,8 +707,10 @@ static void *_slave_thread(void *arg) ice_config_t *config; unsigned int interval = 0; + thread_mutex_lock(&_slave_mutex); update_settings = 0; update_all_mounts = 0; + thread_mutex_unlock(&_slave_mutex); config = config_get_config(); stats_global (config); @@ -711,11 +723,13 @@ static void *_slave_thread(void *arg) int skip_timer = 0; /* re-read xml file if requested */ + global_lock(); if (global . schedule_config_reread) { event_config_read (NULL); global . schedule_config_reread = 0; } + global_unlock(); thread_sleep (1000000); if (slave_running == 0) @@ -724,6 +738,7 @@ static void *_slave_thread(void *arg) ++interval; /* only update relays lists when required */ + thread_mutex_lock(&_slave_mutex); if (max_interval <= interval) { DEBUG0 ("checking master stream list"); @@ -733,6 +748,7 @@ static void *_slave_thread(void *arg) skip_timer = 1; interval = 0; max_interval = config->master_update_interval; + thread_mutex_unlock(&_slave_mutex); /* the connection could take some time, so the lock can drop */ if (update_from_master (config)) @@ -745,18 +761,23 @@ static void *_slave_thread(void *arg) config_release_config(); } else + { + thread_mutex_unlock(&_slave_mutex); thread_mutex_lock (&(config_locks()->relay_lock)); + } relay_check_streams (global.relays, cleanup_relays, skip_timer); relay_check_streams (global.master_relays, NULL, skip_timer); thread_mutex_unlock (&(config_locks()->relay_lock)); + thread_mutex_lock(&_slave_mutex); if (update_settings) { source_recheck_mounts (update_all_mounts); update_settings = 0; update_all_mounts = 0; } + thread_mutex_unlock(&_slave_mutex); } INFO0 ("shutting down current relays"); relay_check_streams (NULL, global.relays, 0); diff --git a/src/source.c b/src/source.c index 2b86a1c1..22a24e7c 100644 --- a/src/source.c +++ b/src/source.c @@ -102,6 +102,7 @@ source_t *source_reserve (const char *mount) /* make duplicates for strings or similar */ src->mount = strdup (mount); src->max_listeners = -1; + thread_mutex_create(&src->lock); avl_insert (global.source_tree, src); @@ -492,13 +493,15 @@ static refbuf_t *get_next_buffer (source_t *source) } if (fds == 0) { - if (source->last_read + (time_t)source->timeout < current) + thread_mutex_lock(&source->lock); + if ((source->last_read + (time_t)source->timeout) < current) { DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read, source->timeout, (long)current); WARN0 ("Disconnecting source due to socket timeout"); source->running = 0; } + thread_mutex_unlock(&source->lock); break; } source->last_read = current; @@ -718,8 +721,10 @@ void source_main (source_t *source) source->format->write_buf_to_file (source, refbuf); } /* lets see if we have too much data in the queue, but don't remove it until later */ + thread_mutex_lock(&source->lock); if (source->queue_size > source->queue_size_limit) remove_from_q = 1; + thread_mutex_unlock(&source->lock); /* acquire write lock on pending_tree */ avl_tree_wlock(source->pending_tree); @@ -1169,13 +1174,16 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) /* update the specified source with details from the config or mount. * mountinfo can be NULL in which case default settings should be taken + * This function is called by the Slave thread */ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo) { + thread_mutex_lock(&source->lock); /* skip if source is a fallback to file */ if (source->running && source->client == NULL) { stats_event_hidden (source->mount, NULL, 1); + thread_mutex_unlock(&source->lock); return; } /* set global settings first */ @@ -1229,6 +1237,7 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy DEBUG1 ("burst size to %u", source->burst_size); DEBUG1 ("source timeout to %u", source->timeout); DEBUG1 ("fallback_when_full to %u", source->fallback_when_full); + thread_mutex_unlock(&source->lock); } diff --git a/src/source.h b/src/source.h index 8f665e21..0f2f29f9 100644 --- a/src/source.h +++ b/src/source.h @@ -17,11 +17,13 @@ #include "yp.h" #include "util.h" #include "format.h" +#include "thread/thread.h" #include typedef struct source_tag { + mutex_t lock; client_t *client; connection_t *con; http_parser_t *parser; diff --git a/src/stats.c b/src/stats.c index 4567e078..3ca8da63 100644 --- a/src/stats.c +++ b/src/stats.c @@ -631,9 +631,9 @@ static void *_stats_thread(void *arg) INFO0 ("stats thread started"); while (_stats_running) { + thread_mutex_lock(&_global_event_mutex); if (_global_event_queue.head != NULL) { /* grab the next event from the queue */ - thread_mutex_lock(&_global_event_mutex); event = _get_event_from_queue (&_global_event_queue); thread_mutex_unlock(&_global_event_mutex); @@ -667,6 +667,10 @@ static void *_stats_thread(void *arg) thread_mutex_unlock(&_stats_mutex); continue; } + else + { + thread_mutex_unlock(&_global_event_mutex); + } thread_sleep(300000); }