diff --git a/src/connection.c b/src/connection.c index d5641c0c..27c2021c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -93,13 +93,19 @@ typedef struct client_queue_tag { struct client_queue_tag *next; } client_queue_entry_t; -static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail +typedef struct { + client_queue_entry_t *head; + client_queue_entry_t **tail; + mutex_t mutex; +} client_queue_t; + +static spin_t _connection_lock; // protects _current_id static volatile connection_id_t _current_id = 0; static int _initialized = 0; -static volatile client_queue_entry_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; -static volatile client_queue_entry_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; -static volatile client_queue_entry_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; +static client_queue_t _request_queue; +static client_queue_t _connection_queue; +static client_queue_t _body_queue; static bool tls_ok = false; static tls_ctx_t *tls_ctx; @@ -111,6 +117,54 @@ rwlock_t _source_shutdown_rwlock; static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); +static void client_queue_init(client_queue_t *queue) +{ + memset(queue, 0, sizeof(*queue)); + queue->tail = &(queue->head); + thread_mutex_create(&(queue->mutex)); +} + +static void client_queue_destroy(client_queue_t *queue) +{ + thread_mutex_destroy(&(queue->mutex)); +} + +static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry) +{ + thread_mutex_lock(&(queue->mutex)); + *(queue->tail) = entry; + queue->tail = &(entry->next); + thread_mutex_unlock(&(queue->mutex)); +} + +static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop) +{ + client_queue_entry_t *ret; + + thread_mutex_lock(&(queue->mutex)); + ret = queue->head; + if (ret) { + if (ret == stop) { + ret = NULL; + } else { + queue->head = ret->next; + if (!queue->head) { + queue->tail = &(queue->head); + } + ret->next = NULL; + } + } + thread_mutex_unlock(&(queue->mutex)); + + return ret; +} + +static bool client_queue_empty(client_queue_t *queue) +{ + /* No need to lock here as this is a point-in-time thing anyway */ + return queue->head == NULL ? true : false; +} + void connection_initialize(void) { if (_initialized) @@ -119,12 +173,9 @@ void connection_initialize(void) thread_spin_create (&_connection_lock); thread_mutex_create(&move_clients_mutex); thread_rwlock_create(&_source_shutdown_rwlock); - _req_queue = NULL; - _req_queue_tail = &_req_queue; - _con_queue = NULL; - _con_queue_tail = &_con_queue; - _body_queue = NULL; - _body_queue_tail = &_body_queue; + client_queue_init(&_request_queue); + client_queue_init(&_connection_queue); + client_queue_init(&_body_queue); _initialized = 1; } @@ -141,6 +192,9 @@ void connection_shutdown(void) thread_rwlock_destroy(&_source_shutdown_rwlock); thread_spin_destroy (&_connection_lock); thread_mutex_destroy(&move_clients_mutex); + client_queue_destroy(&_request_queue); + client_queue_destroy(&_connection_queue); + client_queue_destroy(&_body_queue); _initialized = 0; } @@ -391,161 +445,118 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len) } } -/* add client to connection queue. At this point some header information - * has been collected, so we now pass it onto the connection thread for - * further processing - */ -static void _add_connection(client_queue_entry_t *node) +/* run along queue checking for any data that has come in or a timeout */ +static bool process_request_queue_one (client_queue_entry_t *node, int timeout) { - thread_spin_lock(&_connection_lock); - *_con_queue_tail = node; - _con_queue_tail = (volatile client_queue_entry_t **) &node->next; - thread_spin_unlock(&_connection_lock); -} + client_t *client = node->client; + int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; + char *buf = client->refbuf->data + node->offset; + ICECAST_LOG_DDEBUG("Checking on client %p", client); -/* this returns queued clients for the connection thread. headers are - * already provided, but need to be parsed. - */ -static client_queue_entry_t *_get_connection(void) -{ - client_queue_entry_t *node = NULL; - - thread_spin_lock(&_connection_lock); - - if (_con_queue){ - node = (client_queue_entry_t *)_con_queue; - _con_queue = node->next; - if (_con_queue == NULL) - _con_queue_tail = &_con_queue; - node->next = NULL; + if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) { + char peak; + if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) { + if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */ + connection_uses_tls(client->con); + } + } } - thread_spin_unlock(&_connection_lock); - return node; + if (len > 0) { + if (client->con->con_time + timeout <= time(NULL)) { + len = 0; + } else { + len = client_read_bytes(client, buf, len); + } + } + + if (len > 0 || node->shoutcast > 1) { + ssize_t stream_offset = -1; + int pass_it = 1; + char *ptr; + + if (len < 0 && node->shoutcast > 1) + len = 0; + + /* handle \n, \r\n and nsvcap which for some strange reason has + * EOL as \r\r\n */ + node->offset += len; + client->refbuf->data[node->offset] = '\000'; + do { + if (node->shoutcast == 1) { + /* password line */ + if (strstr (client->refbuf->data, "\r\r\n") != NULL) + break; + if (strstr (client->refbuf->data, "\r\n") != NULL) + break; + if (strstr (client->refbuf->data, "\n") != NULL) + break; + } + /* stream_offset refers to the start of any data sent after the + * http style headers, we don't want to lose those */ + ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n"); + if (ptr) { + stream_offset = (ptr+6) - client->refbuf->data; + break; + } + ptr = strstr(client->refbuf->data, "\r\n\r\n"); + if (ptr) { + stream_offset = (ptr+4) - client->refbuf->data; + break; + } + ptr = strstr(client->refbuf->data, "\n\n"); + if (ptr) { + stream_offset = (ptr+2) - client->refbuf->data; + break; + } + pass_it = 0; + } while (0); + + ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len); + ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data); + + if (pass_it) { + if (stream_offset != -1) { + connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); + node->offset = stream_offset; + } + client_queue_add(&_connection_queue, node); + return true; + } + } else { + if (len == 0 || client->con->error) { + client_destroy(client); + free(node); + return true; + } + } + + return false; } - -/* run along queue checking for any data that has come in or a timeout */ static void process_request_queue (void) { - client_queue_entry_t **node_ref = (client_queue_entry_t **)&_req_queue; + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; ice_config_t *config; int timeout; - char peak; config = config_get_config(); timeout = config->header_timeout; config_release_config(); - while (*node_ref) { - client_queue_entry_t *node = *node_ref; - client_t *client = node->client; - int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; - char *buf = client->refbuf->data + node->offset; - - ICECAST_LOG_DDEBUG("Checking on client %p", client); - - if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) { - if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) { - if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */ - connection_uses_tls(client->con); - } - } + while ((node = client_queue_shift(&_request_queue, stop))) { + if (!process_request_queue_one(node, timeout)) { + client_queue_add(&_request_queue, node); + if (!stop) + stop = node; } - - if (len > 0) { - if (client->con->con_time + timeout <= time(NULL)) { - len = 0; - } else { - len = client_read_bytes(client, buf, len); - } - } - - if (len > 0 || node->shoutcast > 1) { - ssize_t stream_offset = -1; - int pass_it = 1; - char *ptr; - - if (len < 0 && node->shoutcast > 1) - len = 0; - - /* handle \n, \r\n and nsvcap which for some strange reason has - * EOL as \r\r\n */ - node->offset += len; - client->refbuf->data[node->offset] = '\000'; - do { - if (node->shoutcast == 1) { - /* password line */ - if (strstr (client->refbuf->data, "\r\r\n") != NULL) - break; - if (strstr (client->refbuf->data, "\r\n") != NULL) - break; - if (strstr (client->refbuf->data, "\n") != NULL) - break; - } - /* stream_offset refers to the start of any data sent after the - * http style headers, we don't want to lose those */ - ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n"); - if (ptr) { - stream_offset = (ptr+6) - client->refbuf->data; - break; - } - ptr = strstr(client->refbuf->data, "\r\n\r\n"); - if (ptr) { - stream_offset = (ptr+4) - client->refbuf->data; - break; - } - ptr = strstr(client->refbuf->data, "\n\n"); - if (ptr) { - stream_offset = (ptr+2) - client->refbuf->data; - break; - } - pass_it = 0; - } while (0); - - ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len); - ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data); - - if (pass_it) { - if (stream_offset != -1) { - connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); - node->offset = stream_offset; - } - if ((client_queue_entry_t **)_req_queue_tail == &(node->next)) - _req_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - node->next = NULL; - _add_connection(node); - continue; - } - } else { - if (len == 0 || client->con->error) { - if ((client_queue_entry_t **)_req_queue_tail == &node->next) - _req_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - client_destroy(client); - free(node); - continue; - } - } - node_ref = &node->next; } + _handle_connection(); } -/* add client to body queue. - */ -static void _add_body_client(client_queue_entry_t *node) -{ - ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client); - - thread_spin_lock(&_connection_lock); - *_body_queue_tail = node; - _body_queue_tail = (volatile client_queue_entry_t **) &node->next; - thread_spin_unlock(&_connection_lock); -} - static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit) { client_t *client = node->client; @@ -586,22 +597,20 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t /* This queue reads data from the body of clients. */ static void process_request_body_queue (void) { - client_queue_entry_t **node_ref = (client_queue_entry_t **)&_body_queue; + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; ice_config_t *config; time_t timeout; size_t body_size_limit; ICECAST_LOG_DDEBUG("Processing body queue."); - ICECAST_LOG_DDEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail); - config = config_get_config(); timeout = time(NULL) - config->body_timeout; body_size_limit = config->body_size_limit; config_release_config(); - while (*node_ref) { - client_queue_entry_t *node = *node_ref; + while ((node = client_queue_shift(&_body_queue, stop))) { client_t *client = node->client; client_slurp_result_t res; @@ -611,29 +620,19 @@ static void process_request_body_queue (void) res = process_request_body_queue_one(node, timeout, body_size_limit); - if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { + if (res == CLIENT_SLURP_NEEDS_MORE_DATA) { + client_queue_add(&_body_queue, node); + if (!stop) + stop = node; + } else { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); - if ((client_queue_entry_t **)_body_queue_tail == &(node->next)) - _body_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - node->next = NULL; - _add_connection(node); + client_queue_add(&_connection_queue, node); continue; } - node_ref = &node->next; } } -/* add node to the queue of requests. This is where the clients are when - * initial http details are read. - */ -static void _add_request_queue(client_queue_entry_t *node) -{ - *_req_queue_tail = node; - _req_queue_tail = (volatile client_queue_entry_t **)&node->next; -} - static client_queue_entry_t *create_client_node(client_t *client) { client_queue_entry_t *node = calloc (1, sizeof (client_queue_entry_t)); @@ -692,7 +691,7 @@ void connection_queue(connection_t *con) return; } - _add_request_queue(node); + client_queue_add(&_request_queue, node); stats_event_inc(NULL, "connections"); } @@ -713,7 +712,7 @@ void connection_accept_loop(void) connection_queue(con); duration = 5; } else { - if (_req_queue == NULL) + if (client_queue_empty(&_request_queue) && client_queue_empty(&_body_queue)) duration = 300; /* use longer timeouts when nothing waiting */ } process_request_queue(); @@ -863,7 +862,7 @@ static void _handle_shoutcast_compatible(client_queue_entry_t *node) memmove(client->refbuf->data, headers, node->offset+1); node->shoutcast = 2; /* we've checked the password, now send it back for reading headers */ - _add_request_queue(node); + client_queue_add(&_request_queue, node); ICECAST_LOG_DDEBUG("Client %p re-added to request queue", client); return; } @@ -937,7 +936,7 @@ static void _handle_connection(void) client_queue_entry_t *node; while (1) { - node = _get_connection(); + node = client_queue_shift(&_connection_queue, NULL); if (node) { client_t *client = node->client; int already_parsed = 0; @@ -964,7 +963,7 @@ static void _handle_connection(void) /* early check if we need more data */ client_complete(client); if (_need_body(node)) { - /* Just calling _add_body_client() would do the job. + /* Just calling client_queue_add(&_body_queue, node) would do the job. * However, if the client only has a small body this might work without moving it between queues. * -> much faster. */ @@ -980,7 +979,8 @@ static void _handle_connection(void) res = process_request_body_queue_one(node, timeout, body_size_limit); if (res != CLIENT_SLURP_SUCCESS) { - _add_body_client(node); + ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); + client_queue_add(&_body_queue, node); continue; } else { ICECAST_LOG_DEBUG("Success on fast lane"); @@ -1080,5 +1080,5 @@ void connection_close(connection_t *con) void connection_queue_client(client_t *client) { client_queue_entry_t *node = create_client_node(client); - _add_connection(node); + client_queue_add(&_connection_queue, node); }