From 4cc76466ac8688960fd4d65f32ff3f56fbd06e5f Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 21:50:42 +0000 Subject: [PATCH] Update: Check if clients are ready before interacting with them --- src/connection.c | 239 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 171 insertions(+), 68 deletions(-) diff --git a/src/connection.c b/src/connection.c index 30409fec..9f70dadb 100644 --- a/src/connection.c +++ b/src/connection.c @@ -90,6 +90,7 @@ typedef struct client_queue_tag { char *bodybuffer; size_t bodybufferlen; int tried_body; + bool ready; struct client_queue_tag *next; } client_queue_entry_t; @@ -97,6 +98,10 @@ typedef struct { client_queue_entry_t *head; client_queue_entry_t **tail; mutex_t mutex; +#ifdef HAVE_POLL + struct pollfd *pollfds; + size_t pollfds_len; +#endif } client_queue_t; static spin_t _connection_lock; // protects _current_id @@ -128,6 +133,9 @@ static void client_queue_init(client_queue_t *queue) static void client_queue_destroy(client_queue_t *queue) { thread_mutex_destroy(&(queue->mutex)); +#ifdef HAVE_POLL + free(queue->pollfds); +#endif } static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry) @@ -160,6 +168,104 @@ static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_q return ret; } +static bool client_queue_check_ready(client_queue_t *queue, int timeout) +{ + if (!queue->head) + return false; + +#ifdef HAVE_POLL + if (true) { + size_t count = 0; + size_t i; + client_queue_entry_t *cur; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur; cur = cur->next) { + count++; + cur->ready = false; + } + + if (queue->pollfds_len < count) { + free(queue->pollfds); + queue->pollfds = calloc(count, sizeof(*queue->pollfds)); + if (queue->pollfds) { + queue->pollfds_len = count; + } else { + ICECAST_LOG_ERROR("Allocation of queue->pollfds failed. BAD."); + queue->pollfds_len = 0; + thread_mutex_unlock(&(queue->mutex)); + return false; + } + } else { + memset(queue->pollfds, 0, sizeof(*queue->pollfds)*count); + } + + for (cur = queue->head, i = 0; cur && i < count; cur = cur->next, i++) { + queue->pollfds[i].fd = cur->client->con->sock; + queue->pollfds[i].events = POLLIN; + } + thread_mutex_unlock(&(queue->mutex)); + + if (poll(queue->pollfds, count, timeout) < 1) + return false; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur; cur = cur->next) { + for (i = 0; i < count; i++) { + if (queue->pollfds[i].fd == cur->client->con->sock) { + if (queue->pollfds[i].revents) { + cur->ready = true; + } + } + } + } + thread_mutex_unlock(&(queue->mutex)); + } +#endif + + return true; +} + +static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, client_queue_entry_t *stop) +{ +#ifdef HAVE_POLL + client_queue_entry_t *cur; + client_queue_entry_t *last = NULL; + + if (!queue->head) + return NULL; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur && cur != stop; cur = cur->next) { + if (cur->ready) { + // use this one. + if (last == NULL) { + /* we are the head */ + queue->head = cur->next; + if (!queue->head) { + queue->tail = &(queue->head); + } + } else { + last->next = cur->next; + if (queue->tail == &(cur->next)) { + queue->tail = &(last->next); + } + } + + cur->next = NULL; + thread_mutex_unlock(&(queue->mutex)); + return cur; + } + last = cur; + } + thread_mutex_unlock(&(queue->mutex)); + return NULL; +#else + /* just return any */ + return client_queue_shift(queue, stop); +#endif +} + static bool client_queue_empty(client_queue_t *queue) { /* No need to lock here as this is a point-in-time thing anyway */ @@ -547,7 +653,9 @@ static void process_request_queue (void) timeout = config->header_timeout; config_release_config(); - while ((node = client_queue_shift(&_request_queue, stop))) { + client_queue_check_ready(&_request_queue, 100); + + while ((node = client_queue_shift_ready(&_request_queue, stop))) { if (!process_request_queue_one(node, timeout)) { client_queue_add(&_request_queue, node); if (!stop) @@ -941,75 +1049,70 @@ static void _handle_connection(void) const char *rawuri; client_queue_entry_t *node; - while (1) { - node = client_queue_shift(&_connection_queue, NULL); - if (node) { - client_t *client = node->client; - int already_parsed = 0; + while ((node = client_queue_shift(&_connection_queue, NULL))) { + client_t *client = node->client; + int already_parsed = 0; - /* Check for special shoutcast compatability processing */ - if (node->shoutcast) { - _handle_shoutcast_compatible (node); - if (node->shoutcast) - continue; - } - - /* process normal HTTP headers */ - if (client->parser) { - already_parsed = 1; - parser = client->parser; - } else { - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - client->parser = parser; - } - if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { - client->refbuf->len = 0; - - /* early check if we need more data */ - client_complete(client); - if (_need_body(node)) { - /* Just calling client_queue_add(&_body_queue, node) would do the job. - * However, if the client only has a small body this might work without moving it between queues. - * -> much faster. - */ - client_slurp_result_t res; - ice_config_t *config; - time_t timeout; - size_t body_size_limit; - - config = config_get_config(); - timeout = time(NULL) - config->body_timeout; - body_size_limit = config->body_size_limit; - config_release_config(); - - res = process_request_body_queue_one(node, timeout, body_size_limit); - if (res != CLIENT_SLURP_SUCCESS) { - ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); - client_queue_add(&_body_queue, node); - continue; - } else { - ICECAST_LOG_DEBUG("Success on fast lane"); - } - } - - rawuri = httpp_getvar(parser, HTTPP_VAR_URI); - - /* assign a port-based shoutcast mountpoint if required */ - if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) - httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); - - free_client_node(node); - - connection_handle_client(client); - } else { - free (node); - ICECAST_LOG_ERROR("HTTP request parsing failed"); - client_destroy (client); - } - continue; + /* Check for special shoutcast compatability processing */ + if (node->shoutcast) { + _handle_shoutcast_compatible (node); + if (node->shoutcast) + continue; + } + + /* process normal HTTP headers */ + if (client->parser) { + already_parsed = 1; + parser = client->parser; + } else { + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + client->parser = parser; + } + if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { + client->refbuf->len = 0; + + /* early check if we need more data */ + client_complete(client); + if (_need_body(node)) { + /* Just calling client_queue_add(&_body_queue, node) would do the job. + * However, if the client only has a small body this might work without moving it between queues. + * -> much faster. + */ + client_slurp_result_t res; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; + + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); + + res = process_request_body_queue_one(node, timeout, body_size_limit); + if (res != CLIENT_SLURP_SUCCESS) { + ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); + client_queue_add(&_body_queue, node); + continue; + } else { + ICECAST_LOG_DEBUG("Success on fast lane"); + } + } + + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + + /* assign a port-based shoutcast mountpoint if required */ + if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) + httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); + + free_client_node(node); + + connection_handle_client(client); + } else { + free_client_node(node); + ICECAST_LOG_ERROR("HTTP request parsing failed"); + client_destroy (client); } - break; } }