1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-12-04 14:46:30 -05:00

Update: Added basic client body slurping

This commit is contained in:
Philipp Schafft 2018-06-18 16:57:36 +00:00
parent bde88f82ae
commit 669707d312

View File

@ -91,6 +91,7 @@ static int _initialized = 0;
static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue;
static int tls_ok; static int tls_ok;
static tls_ctx_t *tls_ctx; static tls_ctx_t *tls_ctx;
@ -115,6 +116,8 @@ void connection_initialize(void)
_req_queue_tail = &_req_queue; _req_queue_tail = &_req_queue;
_con_queue = NULL; _con_queue = NULL;
_con_queue_tail = &_con_queue; _con_queue_tail = &_con_queue;
_body_queue = NULL;
_body_queue_tail = &_body_queue;
_initialized = 1; _initialized = 1;
} }
@ -291,6 +294,7 @@ ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len)
if (len >= con->readbufferlen) { if (len >= con->readbufferlen) {
memcpy(buf, con->readbuffer, con->readbufferlen); memcpy(buf, con->readbuffer, con->readbufferlen);
free(con->readbuffer); free(con->readbuffer);
ICECAST_LOG_DEBUG("New fill in buffer=<empty>");
if (len == con->readbufferlen) { if (len == con->readbufferlen) {
con->readbufferlen = 0; con->readbufferlen = 0;
return len; return len;
@ -590,6 +594,50 @@ static void process_request_queue (void)
_handle_connection(); _handle_connection();
} }
/* add client to body queue.
*/
static void _add_body_client(client_queue_t *node)
{
ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client);
thread_spin_lock(&_connection_lock);
*_body_queue_tail = node;
_body_queue_tail = (volatile client_queue_t **) &node->next;
thread_spin_unlock(&_connection_lock);
}
/* This queue reads data from the body of clients. */
static void process_request_body_queue (void)
{
client_queue_t **node_ref = (client_queue_t **)&_body_queue;
ICECAST_LOG_DEBUG("Processing body queue.");
ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail);
while (*node_ref) {
client_queue_t *node = *node_ref;
client_t *client = node->client;
client_slurp_result_t res;
ICECAST_LOG_DEBUG("Got client %p in body queue.", client);
res = client_body_skip(client);
if (res != CLIENT_SLURP_NEEDS_MORE_DATA) {
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
if ((client_queue_t **)_body_queue_tail == &(node->next))
_body_queue_tail = (volatile client_queue_t **)node_ref;
*node_ref = node->next;
node->next = NULL;
_add_connection(node);
continue;
}
node_ref = &node->next;
}
}
/* add node to the queue of requests. This is where the clients are when /* add node to the queue of requests. This is where the clients are when
* initial http details are read. * initial http details are read.
@ -685,6 +733,7 @@ void connection_accept_loop(void)
duration = 300; /* use longer timeouts when nothing waiting */ duration = 300; /* use longer timeouts when nothing waiting */
} }
process_request_queue(); process_request_queue();
process_request_body_queue();
} }
/* Give all the other threads notification to shut down */ /* Give all the other threads notification to shut down */
@ -1109,6 +1158,7 @@ static void _handle_shoutcast_compatible(client_queue_t *node)
/* we may have more than just headers, so prepare for it */ /* we may have more than just headers, so prepare for it */
if (node->stream_offset != node->offset) { if (node->stream_offset != node->offset) {
connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset);
node->offset = node->stream_offset;
} }
client->refbuf->len = 0; client->refbuf->len = 0;
client->parser = parser; client->parser = parser;
@ -1470,6 +1520,26 @@ static void _update_client_request_body_length(client_t *client)
ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length); ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length);
} }
/* Check if we need body of client */
static int _need_body(client_t *client)
{
if (client->parser->req_type == httpp_req_source) {
/* SOURCE connection. */
return 0;
} else if (client->parser->req_type == httpp_req_put) {
/* PUT connection.
* TODO: We may need body for /admin/ but we do not know if it's an admin request yet.
*/
return 0;
} else if (client->request_body_length != -1 && (size_t)client->request_body_length != client->request_body_read) {
return 1;
} else if (client->request_body_length == -1 && client_body_eof(client) == 0) {
return 1;
}
return 0;
}
/* Connection thread. Here we take clients off the connection queue and check /* Connection thread. Here we take clients off the connection queue and check
* the contents provided. We set up the parser then hand off to the specific * the contents provided. We set up the parser then hand off to the specific
* request handler. * request handler.
@ -1509,9 +1579,17 @@ static void _handle_connection(void)
/* we may have more than just headers, so prepare for it */ /* we may have more than just headers, so prepare for it */
if (node->stream_offset != node->offset) { if (node->stream_offset != node->offset) {
connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset);
node->offset = node->stream_offset;
} }
client->refbuf->len = 0; client->refbuf->len = 0;
/* early check if we need more data */
_update_client_request_body_length(client);
if (_need_body(client)) {
_add_body_client(node);
continue;
}
rawuri = httpp_getvar(parser, HTTPP_VAR_URI); rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
/* assign a port-based shoutcast mountpoint if required */ /* assign a port-based shoutcast mountpoint if required */
@ -1528,8 +1606,6 @@ static void _handle_connection(void)
continue; continue;
} }
_update_client_request_body_length(client);
upgrade = httpp_getvar(parser, "upgrade"); upgrade = httpp_getvar(parser, "upgrade");
connection = httpp_getvar(parser, "connection"); connection = httpp_getvar(parser, "connection");
if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) { if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) {