1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-23 06:25:24 +00:00

Update: Reworked connection queues

This commit is contained in:
Philipp Schafft 2022-03-19 19:05:53 +00:00
parent b10e302c2a
commit c34f715f0d

View File

@ -93,13 +93,19 @@ typedef struct client_queue_tag {
struct client_queue_tag *next;
} client_queue_entry_t;
static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail
typedef struct {
client_queue_entry_t *head;
client_queue_entry_t **tail;
mutex_t mutex;
} client_queue_t;
static spin_t _connection_lock; // protects _current_id
static volatile connection_id_t _current_id = 0;
static int _initialized = 0;
static volatile client_queue_entry_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
static volatile client_queue_entry_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
static volatile client_queue_entry_t *_body_queue = NULL, **_body_queue_tail = &_body_queue;
static client_queue_t _request_queue;
static client_queue_t _connection_queue;
static client_queue_t _body_queue;
static bool tls_ok = false;
static tls_ctx_t *tls_ctx;
@ -111,6 +117,54 @@ rwlock_t _source_shutdown_rwlock;
static void _handle_connection(void);
static void get_tls_certificate(ice_config_t *config);
static void client_queue_init(client_queue_t *queue)
{
memset(queue, 0, sizeof(*queue));
queue->tail = &(queue->head);
thread_mutex_create(&(queue->mutex));
}
static void client_queue_destroy(client_queue_t *queue)
{
thread_mutex_destroy(&(queue->mutex));
}
static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry)
{
thread_mutex_lock(&(queue->mutex));
*(queue->tail) = entry;
queue->tail = &(entry->next);
thread_mutex_unlock(&(queue->mutex));
}
static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop)
{
client_queue_entry_t *ret;
thread_mutex_lock(&(queue->mutex));
ret = queue->head;
if (ret) {
if (ret == stop) {
ret = NULL;
} else {
queue->head = ret->next;
if (!queue->head) {
queue->tail = &(queue->head);
}
ret->next = NULL;
}
}
thread_mutex_unlock(&(queue->mutex));
return ret;
}
static bool client_queue_empty(client_queue_t *queue)
{
/* No need to lock here as this is a point-in-time thing anyway */
return queue->head == NULL ? true : false;
}
void connection_initialize(void)
{
if (_initialized)
@ -119,12 +173,9 @@ void connection_initialize(void)
thread_spin_create (&_connection_lock);
thread_mutex_create(&move_clients_mutex);
thread_rwlock_create(&_source_shutdown_rwlock);
_req_queue = NULL;
_req_queue_tail = &_req_queue;
_con_queue = NULL;
_con_queue_tail = &_con_queue;
_body_queue = NULL;
_body_queue_tail = &_body_queue;
client_queue_init(&_request_queue);
client_queue_init(&_connection_queue);
client_queue_init(&_body_queue);
_initialized = 1;
}
@ -141,6 +192,9 @@ void connection_shutdown(void)
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_spin_destroy (&_connection_lock);
thread_mutex_destroy(&move_clients_mutex);
client_queue_destroy(&_request_queue);
client_queue_destroy(&_connection_queue);
client_queue_destroy(&_body_queue);
_initialized = 0;
}
@ -391,161 +445,118 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len)
}
}
/* add client to connection queue. At this point some header information
* has been collected, so we now pass it onto the connection thread for
* further processing
*/
static void _add_connection(client_queue_entry_t *node)
/* run along queue checking for any data that has come in or a timeout */
static bool process_request_queue_one (client_queue_entry_t *node, int timeout)
{
thread_spin_lock(&_connection_lock);
*_con_queue_tail = node;
_con_queue_tail = (volatile client_queue_entry_t **) &node->next;
thread_spin_unlock(&_connection_lock);
}
client_t *client = node->client;
int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
char *buf = client->refbuf->data + node->offset;
ICECAST_LOG_DDEBUG("Checking on client %p", client);
/* this returns queued clients for the connection thread. headers are
* already provided, but need to be parsed.
*/
static client_queue_entry_t *_get_connection(void)
{
client_queue_entry_t *node = NULL;
thread_spin_lock(&_connection_lock);
if (_con_queue){
node = (client_queue_entry_t *)_con_queue;
_con_queue = node->next;
if (_con_queue == NULL)
_con_queue_tail = &_con_queue;
node->next = NULL;
if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) {
char peak;
if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) {
if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */
connection_uses_tls(client->con);
}
}
}
thread_spin_unlock(&_connection_lock);
return node;
if (len > 0) {
if (client->con->con_time + timeout <= time(NULL)) {
len = 0;
} else {
len = client_read_bytes(client, buf, len);
}
}
if (len > 0 || node->shoutcast > 1) {
ssize_t stream_offset = -1;
int pass_it = 1;
char *ptr;
if (len < 0 && node->shoutcast > 1)
len = 0;
/* handle \n, \r\n and nsvcap which for some strange reason has
* EOL as \r\r\n */
node->offset += len;
client->refbuf->data[node->offset] = '\000';
do {
if (node->shoutcast == 1) {
/* password line */
if (strstr (client->refbuf->data, "\r\r\n") != NULL)
break;
if (strstr (client->refbuf->data, "\r\n") != NULL)
break;
if (strstr (client->refbuf->data, "\n") != NULL)
break;
}
/* stream_offset refers to the start of any data sent after the
* http style headers, we don't want to lose those */
ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n");
if (ptr) {
stream_offset = (ptr+6) - client->refbuf->data;
break;
}
ptr = strstr(client->refbuf->data, "\r\n\r\n");
if (ptr) {
stream_offset = (ptr+4) - client->refbuf->data;
break;
}
ptr = strstr(client->refbuf->data, "\n\n");
if (ptr) {
stream_offset = (ptr+2) - client->refbuf->data;
break;
}
pass_it = 0;
} while (0);
ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len);
ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data);
if (pass_it) {
if (stream_offset != -1) {
connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset);
node->offset = stream_offset;
}
client_queue_add(&_connection_queue, node);
return true;
}
} else {
if (len == 0 || client->con->error) {
client_destroy(client);
free(node);
return true;
}
}
return false;
}
/* run along queue checking for any data that has come in or a timeout */
static void process_request_queue (void)
{
client_queue_entry_t **node_ref = (client_queue_entry_t **)&_req_queue;
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
int timeout;
char peak;
config = config_get_config();
timeout = config->header_timeout;
config_release_config();
while (*node_ref) {
client_queue_entry_t *node = *node_ref;
client_t *client = node->client;
int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
char *buf = client->refbuf->data + node->offset;
ICECAST_LOG_DDEBUG("Checking on client %p", client);
if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) {
if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) {
if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */
connection_uses_tls(client->con);
}
}
while ((node = client_queue_shift(&_request_queue, stop))) {
if (!process_request_queue_one(node, timeout)) {
client_queue_add(&_request_queue, node);
if (!stop)
stop = node;
}
if (len > 0) {
if (client->con->con_time + timeout <= time(NULL)) {
len = 0;
} else {
len = client_read_bytes(client, buf, len);
}
}
if (len > 0 || node->shoutcast > 1) {
ssize_t stream_offset = -1;
int pass_it = 1;
char *ptr;
if (len < 0 && node->shoutcast > 1)
len = 0;
/* handle \n, \r\n and nsvcap which for some strange reason has
* EOL as \r\r\n */
node->offset += len;
client->refbuf->data[node->offset] = '\000';
do {
if (node->shoutcast == 1) {
/* password line */
if (strstr (client->refbuf->data, "\r\r\n") != NULL)
break;
if (strstr (client->refbuf->data, "\r\n") != NULL)
break;
if (strstr (client->refbuf->data, "\n") != NULL)
break;
}
/* stream_offset refers to the start of any data sent after the
* http style headers, we don't want to lose those */
ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n");
if (ptr) {
stream_offset = (ptr+6) - client->refbuf->data;
break;
}
ptr = strstr(client->refbuf->data, "\r\n\r\n");
if (ptr) {
stream_offset = (ptr+4) - client->refbuf->data;
break;
}
ptr = strstr(client->refbuf->data, "\n\n");
if (ptr) {
stream_offset = (ptr+2) - client->refbuf->data;
break;
}
pass_it = 0;
} while (0);
ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len);
ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data);
if (pass_it) {
if (stream_offset != -1) {
connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset);
node->offset = stream_offset;
}
if ((client_queue_entry_t **)_req_queue_tail == &(node->next))
_req_queue_tail = (volatile client_queue_entry_t **)node_ref;
*node_ref = node->next;
node->next = NULL;
_add_connection(node);
continue;
}
} else {
if (len == 0 || client->con->error) {
if ((client_queue_entry_t **)_req_queue_tail == &node->next)
_req_queue_tail = (volatile client_queue_entry_t **)node_ref;
*node_ref = node->next;
client_destroy(client);
free(node);
continue;
}
}
node_ref = &node->next;
}
_handle_connection();
}
/* add client to body queue.
*/
static void _add_body_client(client_queue_entry_t *node)
{
ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client);
thread_spin_lock(&_connection_lock);
*_body_queue_tail = node;
_body_queue_tail = (volatile client_queue_entry_t **) &node->next;
thread_spin_unlock(&_connection_lock);
}
static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit)
{
client_t *client = node->client;
@ -586,22 +597,20 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t
/* This queue reads data from the body of clients. */
static void process_request_body_queue (void)
{
client_queue_entry_t **node_ref = (client_queue_entry_t **)&_body_queue;
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
time_t timeout;
size_t body_size_limit;
ICECAST_LOG_DDEBUG("Processing body queue.");
ICECAST_LOG_DDEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail);
config = config_get_config();
timeout = time(NULL) - config->body_timeout;
body_size_limit = config->body_size_limit;
config_release_config();
while (*node_ref) {
client_queue_entry_t *node = *node_ref;
while ((node = client_queue_shift(&_body_queue, stop))) {
client_t *client = node->client;
client_slurp_result_t res;
@ -611,29 +620,19 @@ static void process_request_body_queue (void)
res = process_request_body_queue_one(node, timeout, body_size_limit);
if (res != CLIENT_SLURP_NEEDS_MORE_DATA) {
if (res == CLIENT_SLURP_NEEDS_MORE_DATA) {
client_queue_add(&_body_queue, node);
if (!stop)
stop = node;
} else {
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
if ((client_queue_entry_t **)_body_queue_tail == &(node->next))
_body_queue_tail = (volatile client_queue_entry_t **)node_ref;
*node_ref = node->next;
node->next = NULL;
_add_connection(node);
client_queue_add(&_connection_queue, node);
continue;
}
node_ref = &node->next;
}
}
/* add node to the queue of requests. This is where the clients are when
* initial http details are read.
*/
static void _add_request_queue(client_queue_entry_t *node)
{
*_req_queue_tail = node;
_req_queue_tail = (volatile client_queue_entry_t **)&node->next;
}
static client_queue_entry_t *create_client_node(client_t *client)
{
client_queue_entry_t *node = calloc (1, sizeof (client_queue_entry_t));
@ -692,7 +691,7 @@ void connection_queue(connection_t *con)
return;
}
_add_request_queue(node);
client_queue_add(&_request_queue, node);
stats_event_inc(NULL, "connections");
}
@ -713,7 +712,7 @@ void connection_accept_loop(void)
connection_queue(con);
duration = 5;
} else {
if (_req_queue == NULL)
if (client_queue_empty(&_request_queue) && client_queue_empty(&_body_queue))
duration = 300; /* use longer timeouts when nothing waiting */
}
process_request_queue();
@ -863,7 +862,7 @@ static void _handle_shoutcast_compatible(client_queue_entry_t *node)
memmove(client->refbuf->data, headers, node->offset+1);
node->shoutcast = 2;
/* we've checked the password, now send it back for reading headers */
_add_request_queue(node);
client_queue_add(&_request_queue, node);
ICECAST_LOG_DDEBUG("Client %p re-added to request queue", client);
return;
}
@ -937,7 +936,7 @@ static void _handle_connection(void)
client_queue_entry_t *node;
while (1) {
node = _get_connection();
node = client_queue_shift(&_connection_queue, NULL);
if (node) {
client_t *client = node->client;
int already_parsed = 0;
@ -964,7 +963,7 @@ static void _handle_connection(void)
/* early check if we need more data */
client_complete(client);
if (_need_body(node)) {
/* Just calling _add_body_client() would do the job.
/* Just calling client_queue_add(&_body_queue, node) would do the job.
* However, if the client only has a small body this might work without moving it between queues.
* -> much faster.
*/
@ -980,7 +979,8 @@ static void _handle_connection(void)
res = process_request_body_queue_one(node, timeout, body_size_limit);
if (res != CLIENT_SLURP_SUCCESS) {
_add_body_client(node);
ICECAST_LOG_DEBUG("Putting client %p in body queue.", client);
client_queue_add(&_body_queue, node);
continue;
} else {
ICECAST_LOG_DEBUG("Success on fast lane");
@ -1080,5 +1080,5 @@ void connection_close(connection_t *con)
void connection_queue_client(client_t *client)
{
client_queue_entry_t *node = create_client_node(client);
_add_connection(node);
client_queue_add(&_connection_queue, node);
}