diff --git a/src/connection.c b/src/connection.c index ee1f9c2c..e3bb9890 100644 --- a/src/connection.c +++ b/src/connection.c @@ -91,6 +91,7 @@ static int _initialized = 0; static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; +static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; static int tls_ok; static tls_ctx_t *tls_ctx; @@ -115,6 +116,8 @@ void connection_initialize(void) _req_queue_tail = &_req_queue; _con_queue = NULL; _con_queue_tail = &_con_queue; + _body_queue = NULL; + _body_queue_tail = &_body_queue; _initialized = 1; } @@ -291,6 +294,7 @@ ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len) if (len >= con->readbufferlen) { memcpy(buf, con->readbuffer, con->readbufferlen); free(con->readbuffer); + ICECAST_LOG_DEBUG("New fill in buffer="); if (len == con->readbufferlen) { con->readbufferlen = 0; return len; @@ -590,6 +594,50 @@ static void process_request_queue (void) _handle_connection(); } +/* add client to body queue. + */ +static void _add_body_client(client_queue_t *node) +{ + ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client); + + thread_spin_lock(&_connection_lock); + *_body_queue_tail = node; + _body_queue_tail = (volatile client_queue_t **) &node->next; + thread_spin_unlock(&_connection_lock); +} + + +/* This queue reads data from the body of clients. */ +static void process_request_body_queue (void) +{ + client_queue_t **node_ref = (client_queue_t **)&_body_queue; + + ICECAST_LOG_DEBUG("Processing body queue."); + + ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail); + + while (*node_ref) { + client_queue_t *node = *node_ref; + client_t *client = node->client; + client_slurp_result_t res; + + ICECAST_LOG_DEBUG("Got client %p in body queue.", client); + + res = client_body_skip(client); + + if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { + ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); + + if ((client_queue_t **)_body_queue_tail == &(node->next)) + _body_queue_tail = (volatile client_queue_t **)node_ref; + *node_ref = node->next; + node->next = NULL; + _add_connection(node); + continue; + } + node_ref = &node->next; + } +} /* add node to the queue of requests. This is where the clients are when * initial http details are read. @@ -685,6 +733,7 @@ void connection_accept_loop(void) duration = 300; /* use longer timeouts when nothing waiting */ } process_request_queue(); + process_request_body_queue(); } /* Give all the other threads notification to shut down */ @@ -1109,6 +1158,7 @@ static void _handle_shoutcast_compatible(client_queue_t *node) /* we may have more than just headers, so prepare for it */ if (node->stream_offset != node->offset) { connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); + node->offset = node->stream_offset; } client->refbuf->len = 0; client->parser = parser; @@ -1470,6 +1520,26 @@ static void _update_client_request_body_length(client_t *client) ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length); } +/* Check if we need body of client */ +static int _need_body(client_t *client) +{ + if (client->parser->req_type == httpp_req_source) { + /* SOURCE connection. */ + return 0; + } else if (client->parser->req_type == httpp_req_put) { + /* PUT connection. + * TODO: We may need body for /admin/ but we do not know if it's an admin request yet. + */ + return 0; + } else if (client->request_body_length != -1 && (size_t)client->request_body_length != client->request_body_read) { + return 1; + } else if (client->request_body_length == -1 && client_body_eof(client) == 0) { + return 1; + } + + return 0; +} + /* Connection thread. Here we take clients off the connection queue and check * the contents provided. We set up the parser then hand off to the specific * request handler. @@ -1509,9 +1579,17 @@ static void _handle_connection(void) /* we may have more than just headers, so prepare for it */ if (node->stream_offset != node->offset) { connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); + node->offset = node->stream_offset; } client->refbuf->len = 0; + /* early check if we need more data */ + _update_client_request_body_length(client); + if (_need_body(client)) { + _add_body_client(node); + continue; + } + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); /* assign a port-based shoutcast mountpoint if required */ @@ -1528,8 +1606,6 @@ static void _handle_connection(void) continue; } - _update_client_request_body_length(client); - upgrade = httpp_getvar(parser, "upgrade"); connection = httpp_getvar(parser, "connection"); if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) {