From ab8d1639bad5ddec31368a3448d56f78027b0902 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Fri, 11 May 2018 08:08:38 +0000 Subject: [PATCH] Update: Made listensocket_* perfect thread safe. --- src/connection.c | 7 +- src/listensocket.c | 213 +++++++++++++++++++++++++++++++++++++++------ src/listensocket.h | 1 + 3 files changed, 195 insertions(+), 26 deletions(-) diff --git a/src/connection.c b/src/connection.c index 973c7a07..2da136f6 100644 --- a/src/connection.c +++ b/src/connection.c @@ -623,6 +623,8 @@ static client_queue_t *create_client_node(client_t *client) node->shoutcast_mount = strdup(listener->shoutcast_mount); } + listensocket_release_listener(client->con->listensocket_effective); + return node; } @@ -1216,6 +1218,7 @@ static int _handle_resources(client_t *client, char **uri) break; } + listensocket_release_listener(client->con->listensocket_effective); config_release_config(); if (new_uri) { @@ -1405,15 +1408,17 @@ static void __prepare_shoutcast_admin_cgi_request(client_t *client) return; } - listener = listensocket_get_listener(client->con->listensocket_effective); global_lock(); config = config_get_config(); sc_mount = config->shoutcast_mount; + listener = listensocket_get_listener(client->con->listensocket_effective); if (listener && listener->shoutcast_mount) sc_mount = listener->shoutcast_mount; httpp_set_query_param(client->parser, "mount", sc_mount); + listensocket_release_listener(client->con->listensocket_effective); + httpp_setvar(client->parser, HTTPP_VAR_PROTOCOL, "ICY"); client->password = strdup(pass); config_release_config(); diff --git a/src/listensocket.c b/src/listensocket.c index bddbee00..9ff1fde6 100644 --- a/src/listensocket.c +++ b/src/listensocket.c @@ -23,6 +23,7 @@ #include #include "common/net/sock.h" +#include "common/thread/thread.h" #include "listensocket.h" #include "global.h" @@ -35,6 +36,7 @@ struct listensocket_container_tag { refobject_base_t __base; + mutex_t lock; listensocket_t **sock; int *sockref; size_t sock_len; @@ -44,13 +46,19 @@ struct listensocket_container_tag { struct listensocket_tag { refobject_base_t __base; size_t sockrefc; + mutex_t lock; + rwlock_t listener_rwlock; listener_t *listener; listener_t *listener_update; sock_t sock; }; +static int listensocket_container_configure__unlocked(listensocket_container_t *self, const ice_config_t *config); +static int listensocket_container_setup__unlocked(listensocket_container_t *self); +static ssize_t listensocket_container_sockcount__unlocked(listensocket_container_t *self); static listensocket_t * listensocket_new(const listener_t *listener); static int listensocket_apply_config(listensocket_t *self); +static int listensocket_apply_config__unlocked(listensocket_t *self); static int listensocket_set_update(listensocket_t *self, const listener_t *listener); #ifdef HAVE_POLL static inline int listensocket__poll_fill(listensocket_t *self, struct pollfd *p); @@ -88,10 +96,10 @@ static inline void __call_sockcount_cb(listensocket_container_t *self) if (self->sockcount_cb == NULL) return; - self->sockcount_cb(listensocket_container_sockcount(self), self->sockcount_userdata); + self->sockcount_cb(listensocket_container_sockcount__unlocked(self), self->sockcount_userdata); } -static void listensocket_container_clear_sockets(listensocket_container_t *self) +static void __listensocket_container_clear_sockets(listensocket_container_t *self) { size_t i; @@ -121,7 +129,10 @@ static void listensocket_container_clear_sockets(listensocket_container_t *self) static void __listensocket_container_free(refobject_t self, void **userdata) { listensocket_container_t *container = REFOBJECT_TO_TYPE(self, listensocket_container_t *); - listensocket_container_clear_sockets(container); + thread_mutex_lock(&container->lock); + __listensocket_container_clear_sockets(container); + thread_mutex_unlock(&container->lock); + thread_mutex_destroy(&container->lock); } listensocket_container_t * listensocket_container_new(void) @@ -130,25 +141,29 @@ listensocket_container_t * listensocket_container_new(void) if (!self) return NULL; - self->sock = NULL; self->sock_len = 0; self->sockcount_cb = NULL; self->sockcount_userdata = NULL; + thread_mutex_create(&self->lock); + return self; } static inline void __find_matching_entry(listensocket_container_t *self, const listener_t *listener, listensocket_t ***found, int **ref) { const listener_t *b; + int test; size_t i; for (i = 0; i < self->sock_len; i++) { if (self->sock[i] != NULL) { if (self->sockref[i]) { b = listensocket_get_listener(self->sock[i]); - if (__listener_cmp(listener, b) == 1) { + test = __listener_cmp(listener, b); + listensocket_release_listener(self->sock[i]); + if (test == 1) { *found = &(self->sock[i]); *ref = &(self->sockref[i]); return; @@ -162,6 +177,20 @@ static inline void __find_matching_entry(listensocket_container_t *self, const l } int listensocket_container_configure(listensocket_container_t *self, const ice_config_t *config) +{ + int ret; + + if (!self) + return -1; + + thread_mutex_lock(&self->lock); + ret = listensocket_container_configure__unlocked(self, config); + thread_mutex_unlock(&self->lock); + + return ret; +} + +static int listensocket_container_configure__unlocked(listensocket_container_t *self, const ice_config_t *config) { listensocket_t **n; listensocket_t **match; @@ -174,7 +203,7 @@ int listensocket_container_configure(listensocket_contai return -1; if (!config->listen_sock_count) { - listensocket_container_clear_sockets(self); + __listensocket_container_clear_sockets(self); return 0; } @@ -212,7 +241,7 @@ int listensocket_container_configure(listensocket_contai cur = cur->next; } - listensocket_container_clear_sockets(self); + __listensocket_container_clear_sockets(self); self->sock = n; self->sockref = r; @@ -229,28 +258,41 @@ int listensocket_container_configure_and_setup(listensoc if (!self) return -1; + thread_mutex_lock(&self->lock); cb = self->sockcount_cb; self->sockcount_cb = NULL; - if (listensocket_container_configure(self, config) == 0) { - ret = listensocket_container_setup(self); + if (listensocket_container_configure__unlocked(self, config) == 0) { + ret = listensocket_container_setup__unlocked(self); } else { ret = -1; } self->sockcount_cb = cb; __call_sockcount_cb(self); + thread_mutex_unlock(&self->lock); return ret; } -int listensocket_container_setup(listensocket_container_t *self) { - size_t i; - int ret = 0; +int listensocket_container_setup(listensocket_container_t *self) +{ + int ret; if (!self) return -1; + thread_mutex_lock(&self->lock); + ret = listensocket_container_setup__unlocked(self); + thread_mutex_unlock(&self->lock); + + return ret; +} +static int listensocket_container_setup__unlocked(listensocket_container_t *self) +{ + size_t i; + int ret = 0; + for (i = 0; i < self->sock_len; i++) { if (self->sockref[i]) { listensocket_apply_config(self->sock[i]); @@ -360,10 +402,15 @@ static connection_t * listensocket_container_accept__inner(listensocket_co } connection_t * listensocket_container_accept(listensocket_container_t *self, int timeout) { + connection_t *ret; + if (!self) return NULL; - return listensocket_container_accept__inner(self, timeout); + thread_mutex_lock(&self->lock); + ret = listensocket_container_accept__inner(self, timeout); + thread_mutex_unlock(&self->lock); + return ret; } int listensocket_container_set_sockcount_cb(listensocket_container_t *self, void (*cb)(size_t count, void *userdata), void *userdata) @@ -371,20 +418,33 @@ int listensocket_container_set_sockcount_cb(listensocket if (!self) return -1; + thread_mutex_lock(&self->lock); self->sockcount_cb = cb; self->sockcount_userdata = userdata; + thread_mutex_unlock(&self->lock); return 0; } ssize_t listensocket_container_sockcount(listensocket_container_t *self) { - ssize_t count = 0; - size_t i; + ssize_t ret; if (!self) return -1; + thread_mutex_lock(&self->lock); + ret = listensocket_container_sockcount__unlocked(self); + thread_mutex_unlock(&self->lock); + + return ret; +} + +static ssize_t listensocket_container_sockcount__unlocked(listensocket_container_t *self) +{ + ssize_t count = 0; + size_t i; + for (i = 0; i < self->sock_len; i++) { if (self->sockref[i]) { count++; @@ -394,18 +454,27 @@ ssize_t listensocket_container_sockcount(listensocket_contai return count; } +/* ---------------------------------------------------------------------------- */ + static void __listensocket_free(refobject_t self, void **userdata) { listensocket_t *listensocket = REFOBJECT_TO_TYPE(self, listensocket_t *); + thread_mutex_lock(&listensocket->lock); + if (listensocket->sockrefc) { ICECAST_LOG_ERROR("BUG: listensocket->sockrefc == 0 && listensocket->sockrefc == %zu", listensocket->sockrefc); listensocket->sockrefc = 1; listensocket_unrefsock(listensocket); } - while ((listensocket->listener = config_clear_listener(listensocket->listener))); while ((listensocket->listener_update = config_clear_listener(listensocket->listener_update))); + thread_rwlock_wlock(&listensocket->listener_rwlock); + while ((listensocket->listener = config_clear_listener(listensocket->listener))); + thread_rwlock_unlock(&listensocket->listener_rwlock); + thread_rwlock_destroy(&listensocket->listener_rwlock); + thread_mutex_unlock(&listensocket->lock); + thread_mutex_destroy(&listensocket->lock); } static listensocket_t * listensocket_new(const listener_t *listener) { @@ -420,6 +489,9 @@ static listensocket_t * listensocket_new(const listener_t *listener) { self->sock = SOCK_ERROR; + thread_mutex_create(&self->lock); + thread_rwlock_create(&self->listener_rwlock); + self->listener = config_copy_listener_one(listener); if (self->listener == NULL) { refobject_unref(self); @@ -430,12 +502,27 @@ static listensocket_t * listensocket_new(const listener_t *listener) { } static int listensocket_apply_config(listensocket_t *self) +{ + int ret; + + if (!self) + return -1; + + thread_mutex_lock(&self->lock); + ret = listensocket_apply_config__unlocked(self); + thread_mutex_unlock(&self->lock); + + return ret; +} + +static int listensocket_apply_config__unlocked(listensocket_t *self) { const listener_t *listener; if (!self) return -1; + thread_rwlock_wlock(&self->listener_rwlock); if (self->listener_update) { if (__listener_cmp(self->listener, self->listener_update) != 1) { ICECAST_LOG_ERROR("Tried to apply incomplete configuration to listensocket: bind address missmatch: have %s:%i, got %s:%i", @@ -444,6 +531,7 @@ static int listensocket_apply_config(listensocket_t *self) __string_default(self->listener_update->bind_address, ""), self->listener_update->port ); + thread_rwlock_unlock(&self->listener_rwlock); return -1; } @@ -463,6 +551,8 @@ static int listensocket_apply_config(listensocket_t *self) self->listener_update = NULL; } + thread_rwlock_unlock(&self->listener_rwlock); + return 0; } @@ -477,8 +567,10 @@ static int listensocket_set_update(listensocket_t *self, const list if (n == NULL) return -1; + thread_mutex_lock(&self->lock); while ((self->listener_update = config_clear_listener(self->listener_update))); self->listener_update = n; + thread_mutex_unlock(&self->lock); return 0; } @@ -487,26 +579,38 @@ int listensocket_refsock(listensocket_t *self) if (!self) return -1; + thread_mutex_lock(&self->lock); if (self->sockrefc) { self->sockrefc++; + thread_mutex_unlock(&self->lock); return 0; } + thread_rwlock_rlock(&self->listener_rwlock); self->sock = sock_get_server_socket(self->listener->port, self->listener->bind_address); - if (self->sock == SOCK_ERROR) + thread_rwlock_unlock(&self->listener_rwlock); + if (self->sock == SOCK_ERROR) { + thread_mutex_unlock(&self->lock); return -1; + } if (sock_listen(self->sock, ICECAST_LISTEN_QUEUE) == 0) { sock_close(self->sock); self->sock = SOCK_ERROR; + thread_rwlock_rlock(&self->listener_rwlock); ICECAST_LOG_ERROR("Can not listen on socket: %s port %i", __string_default(self->listener->bind_address, ""), self->listener->port); + thread_rwlock_unlock(&self->listener_rwlock); + thread_mutex_unlock(&self->lock); return -1; } - if (listensocket_apply_config(self) == -1) + if (listensocket_apply_config__unlocked(self) == -1) { + thread_mutex_unlock(&self->lock); return -1; + } self->sockrefc++; + thread_mutex_unlock(&self->lock); return 0; } @@ -516,15 +620,21 @@ int listensocket_unrefsock(listensocket_t *self) if (!self) return -1; + thread_mutex_lock(&self->lock); self->sockrefc--; - if (self->sockrefc) + if (self->sockrefc) { + thread_mutex_unlock(&self->lock); return 0; + } - if (self->sock == SOCK_ERROR) + if (self->sock == SOCK_ERROR) { + thread_mutex_unlock(&self->lock); return 0; + } sock_close(self->sock); self->sock = SOCK_ERROR; + thread_mutex_unlock(&self->lock); return 0; } @@ -542,7 +652,9 @@ connection_t * listensocket_accept(listensocket_t *self) if (!ip) return NULL; + thread_mutex_lock(&self->lock); sock = sock_accept(self->sock, ip, MAX_ADDR_LEN); + thread_mutex_unlock(&self->lock); if (sock == SOCK_ERROR) { free(ip); return NULL; @@ -564,41 +676,92 @@ connection_t * listensocket_accept(listensocket_t *self) const listener_t * listensocket_get_listener(listensocket_t *self) { + const listener_t *ret; + if (!self) return NULL; - return self->listener; + thread_mutex_lock(&self->lock); + thread_rwlock_rlock(&self->listener_rwlock); + ret = self->listener; + thread_mutex_unlock(&self->lock); + + return ret; +} + +int listensocket_release_listener(listensocket_t *self) +{ + if (!self) + return -1; + + /* This is safe with no self->lock holding as unref requires a wlock. + * A wlock can not be acquired when someone still holds the rlock. + * In fact this must be done in unlocked state as otherwise we could end up in a + * dead lock with some 3rd party holding the self->lock for an unrelated operation + * waiting for a wlock to be come available. + * -- ph3-der-loewe, 2018-05-11 + */ + thread_rwlock_unlock(&self->listener_rwlock); + + return 0; } #ifdef HAVE_POLL static inline int listensocket__poll_fill(listensocket_t *self, struct pollfd *p) { - if (!self || self->sock == SOCK_ERROR) + if (!self) return -1; + thread_mutex_lock(&self->lock); + if (self->sock == SOCK_ERROR) { + thread_mutex_unlock(&self->lock); + return -1; + } + memset(p, 0, sizeof(*p)); p->fd = self->sock; p->events = POLLIN; p->revents = 0; + thread_mutex_unlock(&self->lock); + return 0; } #else static inline int listensocket__select_set(listensocket_t *self, fd_set *set, int *max) { - if (!self || self->sock == SOCK_ERROR) + if (!self) return -1; + thread_mutex_lock(&self->lock); + if (self->sock == SOCK_ERROR) { + thread_mutex_unlock(&self->lock); + return -1; + } + if (*max < self->sock) *max = self->sock; FD_SET(self->sock, set); + thread_mutex_unlock(&self->lock); + return 0; } static inline int listensocket__select_isset(listensocket_t *self, fd_set *set) { - if (!self || self->sock == SOCK_ERROR) + int ret; + + if (!self) return -1; - return FD_ISSET(self->sock, set); + + thread_mutex_lock(&self->lock); + if (self->sock == SOCK_ERROR) { + thread_mutex_unlock(&self->lock); + return -1; + } + ret = FD_ISSET(self->sock, set); + thread_mutex_unlock(&self->lock); + return ret; + } #endif diff --git a/src/listensocket.h b/src/listensocket.h index b3a03e6a..43c18c9b 100644 --- a/src/listensocket.h +++ b/src/listensocket.h @@ -24,5 +24,6 @@ int listensocket_refsock(listensocket_t *self); int listensocket_unrefsock(listensocket_t *self); connection_t * listensocket_accept(listensocket_t *self); const listener_t * listensocket_get_listener(listensocket_t *self); +int listensocket_release_listener(listensocket_t *self); #endif