From ece3786a0aab9db7a3aaef74f8feb1581a7f5214 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 17 Apr 2018 07:22:12 +0000 Subject: [PATCH 01/19] Fix: Check if we actually have source->client->con in case we want to access source->client->con->tls --- src/source.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/source.c b/src/source.c index ef8aa347..73120c0e 100644 --- a/src/source.c +++ b/src/source.c @@ -516,7 +516,7 @@ static refbuf_t *get_next_buffer (source_t *source) } source->last_read = current; refbuf = source->format->get_buffer (source); - if (source->client->con->tls && tls_got_shutdown(source->client->con->tls) > 1) + if (source->client->con && source->client->con->tls && tls_got_shutdown(source->client->con->tls) > 1) source->client->con->error = 1; if (source->client->con && source->client->con->error) { From 38436c3f6e3eabe9899593b71a244fd7fc3e346b Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 17 Apr 2018 07:29:49 +0000 Subject: [PATCH 02/19] Update: Abstract body read with client_body_read() and client_body_eof() --- src/client.c | 20 ++++++++++++++++++++ src/client.h | 2 ++ src/format_ebml.c | 2 +- src/format_mp3.c | 7 ++++--- src/format_ogg.c | 4 ++-- src/source.c | 5 +---- 6 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/client.c b/src/client.c index 44ae457e..279ac1ff 100644 --- a/src/client.c +++ b/src/client.c @@ -32,6 +32,7 @@ #include "refobject.h" #include "cfgfile.h" #include "connection.h" +#include "tls.h" #include "refbuf.h" #include "format.h" #include "stats.h" @@ -435,3 +436,22 @@ void client_set_queue(client_t *client, refbuf_t *refbuf) if (to_release) refbuf_release(to_release); } + +ssize_t client_body_read(client_t *client, void *buf, size_t len) +{ + return client_read_bytes(client, buf, len); +} + +int client_body_eof(client_t *client) +{ + if (!client->con) + return 0; + + if (client->con->tls && tls_got_shutdown(client->con->tls) > 1) + client->con->error = 1; + + if (client->con->error) + return 1; + + return 0; +} diff --git a/src/client.h b/src/client.h index d87a2465..81e78565 100644 --- a/src/client.h +++ b/src/client.h @@ -122,5 +122,7 @@ admin_format_t client_get_admin_format_by_content_negotiation(client_t *client); int client_send_bytes (client_t *client, const void *buf, unsigned len); int client_read_bytes (client_t *client, void *buf, unsigned len); void client_set_queue (client_t *client, refbuf_t *refbuf); +ssize_t client_body_read(client_t *client, void *buf, size_t len); +int client_body_eof(client_t *client); #endif /* __CLIENT_H__ */ diff --git a/src/format_ebml.c b/src/format_ebml.c index 5ac45d6b..0a9fe9bc 100644 --- a/src/format_ebml.c +++ b/src/format_ebml.c @@ -317,7 +317,7 @@ static refbuf_t *ebml_get_buffer(source_t *source) } else if(read_bytes == 0) { /* Feed more bytes into the parser */ write_buffer = ebml_get_write_buffer(ebml_source_state->ebml, &write_bytes); - read_bytes = client_read_bytes (source->client, write_buffer, write_bytes); + read_bytes = client_body_read(source->client, write_buffer, write_bytes); if (read_bytes <= 0) { ebml_wrote (ebml_source_state->ebml, 0); return NULL; diff --git a/src/format_mp3.c b/src/format_mp3.c index dae620c6..f652868d 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -465,7 +465,7 @@ static void format_mp3_free_plugin(format_plugin_t *self) */ static int complete_read(source_t *source) { - int bytes; + ssize_t bytes; format_plugin_t *format = source->format; mp3_state *source_mp3 = format->_state; char *buf; @@ -480,10 +480,11 @@ static int complete_read(source_t *source) } buf = source_mp3->read_data->data + source_mp3->read_count; - bytes = client_read_bytes (source->client, buf, REFBUF_SIZE-source_mp3->read_count); + bytes = client_body_read(source->client, buf, REFBUF_SIZE-source_mp3->read_count); if (bytes < 0) { - if (source->client->con->error) + /* Why do we do this here (not source.c)? -- ph3-der-loewe, 2018-04-17 */ + if (client_body_eof(source->client)) { refbuf_release (source_mp3->read_data); source_mp3->read_data = NULL; diff --git a/src/format_ogg.c b/src/format_ogg.c index 605279f8..c2795adb 100644 --- a/src/format_ogg.c +++ b/src/format_ogg.c @@ -402,7 +402,7 @@ static refbuf_t *ogg_get_buffer(source_t *source) ogg_state_t *ogg_info = source->format->_state; format_plugin_t *format = source->format; char *data = NULL; - int bytes = 0; + ssize_t bytes = 0; while (1) { @@ -449,7 +449,7 @@ static refbuf_t *ogg_get_buffer(source_t *source) /* we need more data to continue getting pages */ data = ogg_sync_buffer (&ogg_info->oy, 4096); - bytes = client_read_bytes (source->client, data, 4096); + bytes = client_body_read(source->client, data, 4096); if (bytes <= 0) { ogg_sync_wrote (&ogg_info->oy, 0); diff --git a/src/source.c b/src/source.c index 73120c0e..69d0f7f6 100644 --- a/src/source.c +++ b/src/source.c @@ -516,10 +516,7 @@ static refbuf_t *get_next_buffer (source_t *source) } source->last_read = current; refbuf = source->format->get_buffer (source); - if (source->client->con && source->client->con->tls && tls_got_shutdown(source->client->con->tls) > 1) - source->client->con->error = 1; - if (source->client->con && source->client->con->error) - { + if (client_body_eof(source->client)) { ICECAST_LOG_INFO("End of Stream %s", source->mount); source->running = 0; continue; From 460477230d8ce5c021e4d750c60fc777f96c79b0 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 17 Apr 2018 09:07:57 +0000 Subject: [PATCH 03/19] Feature: Consider encoding backend EOF state --- src/client.c | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/client.c b/src/client.c index 279ac1ff..6afb0f5e 100644 --- a/src/client.c +++ b/src/client.c @@ -439,11 +439,16 @@ void client_set_queue(client_t *client, refbuf_t *refbuf) ssize_t client_body_read(client_t *client, void *buf, size_t len) { + ICECAST_LOG_DEBUG("Reading from body (client=%p)", client); return client_read_bytes(client, buf, len); } -int client_body_eof(client_t *client) +/* we might un-static this if needed at some time in distant future. -- ph3-der-loewe, 2018-04-17 */ +static int client_eof(client_t *client) { + if (!client) + return -1; + if (!client->con) return 0; @@ -455,3 +460,22 @@ int client_body_eof(client_t *client) return 0; } + +int client_body_eof(client_t *client) +{ + int ret = -1; + + if (!client) + return -1; + + if (client->encoding) { + ICECAST_LOG_DEBUG("Looking for body EOF with encoding (client=%p)", client); + ret = httpp_encoding_eof(client->encoding, (int(*)(void*))client_eof, client); + } else { + ICECAST_LOG_DEBUG("Looking for body EOF without encoding (client=%p)", client); + ret = client_eof(client); + } + + ICECAST_LOG_DEBUG("... result is: %i (client=%p)", ret, client); + return ret; +} From f370e8833504dd367ec43a4baeb7dcd70f8c36c1 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 09:34:27 +0000 Subject: [PATCH 04/19] Feature: Implemented a way to put back data read from a connection --- src/connection.c | 91 +++++++++++++++++++++++++++++++++++++++++------- src/connection.h | 4 +++ 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/src/connection.c b/src/connection.c index 352a31cd..f83fca38 100644 --- a/src/connection.c +++ b/src/connection.c @@ -271,6 +271,12 @@ void connection_uses_tls(connection_t *con) if (con->tls) return; + if (con->readbufferlen) { + ICECAST_LOG_ERROR("Connection is now using TLS but has data put back. BAD. Discarding putback data."); + free(con->readbuffer); + con->readbufferlen = 0; + } + con->tlsmode = ICECAST_TLSMODE_RFC2818; con->read = connection_read_tls; con->send = connection_send_tls; @@ -282,7 +288,70 @@ void connection_uses_tls(connection_t *con) ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len) { - return con->read(con, buf, len); + ssize_t done = 0; + ssize_t ret; + + if (con->readbufferlen) { + ICECAST_LOG_DEBUG("On connection %p we read from putback buffer, filled with %zu bytes, requested are %zu bytes", con, con->readbufferlen, len); + if (len >= con->readbufferlen) { + memcpy(buf, con->readbuffer, con->readbufferlen); + free(con->readbuffer); + if (len == con->readbufferlen) { + con->readbufferlen = 0; + return len; + } else { + len -= con->readbufferlen; + buf += con->readbufferlen; + done = con->readbufferlen; + con->readbufferlen = 0; + } + } else { + memcpy(buf, con->readbuffer, len); + memmove(con->readbuffer, con->readbuffer+len, con->readbufferlen-len); + con->readbufferlen -= len; + return len; + } + } + + ret = con->read(con, buf, len); + + if (ret < 0) { + if (done == 0) { + return ret; + } else { + return done; + } + } + + return done + ret; +} + +int connection_read_put_back(connection_t *con, const void *buf, size_t len) +{ + void *n; + + if (con->readbufferlen) { + n = realloc(con->readbuffer, con->readbufferlen + len); + if (!n) + return -1; + + memcpy(n + con->readbufferlen, buf, len); + con->readbuffer = n; + con->readbufferlen += len; + + ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len); + return 0; + } else { + n = malloc(len); + if (!n) + return -1; + + memcpy(n, buf, len); + con->readbuffer = n; + con->readbufferlen = len; + ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len); + return 0; + } } static sock_t wait_for_serversock(int timeout) @@ -1043,13 +1112,10 @@ static void _handle_shoutcast_compatible(client_queue_t *node) httpp_initialize(parser, NULL); if (httpp_parse(parser, http_compliant, strlen(http_compliant))) { /* we may have more than just headers, so prepare for it */ - if (node->stream_offset == node->offset) { - client->refbuf->len = 0; - } else { - char *ptr = client->refbuf->data; - client->refbuf->len = node->offset - node->stream_offset; - memmove(ptr, ptr + node->stream_offset, client->refbuf->len); + if (node->stream_offset != node->offset) { + connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); } + client->refbuf->len = 0; client->parser = parser; client->protocol = ICECAST_PROTOCOL_SHOUTCAST; node->shoutcast = 0; @@ -1402,13 +1468,10 @@ static void _handle_connection(void) const char *upgrade, *connection; /* we may have more than just headers, so prepare for it */ - if (node->stream_offset == node->offset) { - client->refbuf->len = 0; - } else { - char *ptr = client->refbuf->data; - client->refbuf->len = node->offset - node->stream_offset; - memmove (ptr, ptr + node->stream_offset, client->refbuf->len); + if (node->stream_offset != node->offset) { + connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); } + client->refbuf->len = 0; rawuri = httpp_getvar(parser, HTTPP_VAR_URI); @@ -1579,5 +1642,7 @@ void connection_close(connection_t *con) sock_close(con->sock); if (con->ip) free(con->ip); + if (con->readbufferlen) + free(con->readbuffer); free(con); } diff --git a/src/connection.h b/src/connection.h index 3a989d7e..3782b4ae 100644 --- a/src/connection.h +++ b/src/connection.h @@ -40,6 +40,9 @@ struct connection_tag { int (*send)(connection_t *handle, const void *buf, size_t len); int (*read)(connection_t *handle, void *buf, size_t len); + void *readbuffer; + size_t readbufferlen; + char *ip; }; @@ -55,6 +58,7 @@ void connection_queue(connection_t *con); void connection_uses_tls(connection_t *con); ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len); +int connection_read_put_back(connection_t *con, const void *buf, size_t len); extern rwlock_t _source_shutdown_rwlock; From 01c35e2c4173ab0b1ca995a1945c77281197671b Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 09:48:57 +0000 Subject: [PATCH 05/19] Feature: Check if we know client's body length. If so do not allow reading more than it. --- src/client.c | 26 ++++++++++++++++++++++++-- src/client.h | 8 ++++++++ src/connection.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/client.c b/src/client.c index 6afb0f5e..f66ddaf4 100644 --- a/src/client.c +++ b/src/client.c @@ -87,6 +87,8 @@ int client_create(client_t **c_ptr, connection_t *con, http_parser_t *parser) client->con = con; client->parser = parser; client->protocol = ICECAST_PROTOCOL_HTTP; + client->request_body_length = 0; + client->request_body_read = 0; client->admin_command = ADMIN_COMMAND_ERROR; client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE); client->refbuf->len = 0; /* force reader code to ignore buffer contents */ @@ -439,8 +441,25 @@ void client_set_queue(client_t *client, refbuf_t *refbuf) ssize_t client_body_read(client_t *client, void *buf, size_t len) { + ssize_t ret; + ICECAST_LOG_DEBUG("Reading from body (client=%p)", client); - return client_read_bytes(client, buf, len); + + if (client->request_body_length != -1) { + size_t left = (size_t)client->request_body_length - client->request_body_read; + if (len > left) { + ICECAST_LOG_DEBUG("Limiting read request to left over body size: left %zu byte, requested %zu byte", left, len); + len = left; + } + } + + ret = client_read_bytes(client, buf, len); + + if (ret > 0) { + client->request_body_read += ret; + } + + return ret; } /* we might un-static this if needed at some time in distant future. -- ph3-der-loewe, 2018-04-17 */ @@ -468,7 +487,10 @@ int client_body_eof(client_t *client) if (!client) return -1; - if (client->encoding) { + if (client->request_body_length != -1 && client->request_body_read == (size_t)client->request_body_length) { + ICECAST_LOG_DEBUG("Reached given body length (client=%p)", client); + ret = 1; + } else if (client->encoding) { ICECAST_LOG_DEBUG("Looking for body EOF with encoding (client=%p)", client); ret = httpp_encoding_eof(client->encoding, (int(*)(void*))client_eof, client); } else { diff --git a/src/client.h b/src/client.h index 81e78565..c9a845ea 100644 --- a/src/client.h +++ b/src/client.h @@ -62,6 +62,14 @@ struct _client_tag { /* protocol client uses */ protocol_t protocol; + /* http request body length + * -1 for streaming (e.g. chunked), 0 for no body, >0 for NNN bytes + */ + ssize_t request_body_length; + + /* http request body length read so far */ + size_t request_body_read; + /* http response code for this client */ int respcode; diff --git a/src/connection.c b/src/connection.c index f83fca38..3acb3fa2 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1431,6 +1431,50 @@ static void __prepare_shoutcast_admin_cgi_request(client_t *client) global_unlock(); } +static void _update_client_request_body_length(client_t *client) +{ + const char *header; + long long unsigned int scannumber; + int have = 0; + + if (!have) { + if (client->parser->req_type == httpp_req_source) { + client->request_body_length = -1; /* streaming */ + have = 1; + } + } + + if (!have) { + header = httpp_getvar(client->parser, "transfer-encoding"); + if (header) { + if (strcasecmp(header, "identity") != 0) { + client->request_body_length = -1; /* streaming */ + have = 1; + } + } + } + + if (!have) { + header = httpp_getvar(client->parser, "content-length"); + if (header) { + if (sscanf(header, "%llu", &scannumber) == 1) { + client->request_body_length = scannumber; + have = 1; + } + } + } + + if (!have) { + if (client->parser->req_type == httpp_req_put) { + /* As we don't know yet, we asume this PUT is in streaming mode */ + client->request_body_length = -1; /* streaming */ + have = 1; + } + } + + ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length); +} + /* Connection thread. Here we take clients off the connection queue and check * the contents provided. We set up the parser then hand off to the specific * request handler. @@ -1489,6 +1533,8 @@ static void _handle_connection(void) continue; } + _update_client_request_body_length(client); + upgrade = httpp_getvar(parser, "upgrade"); connection = httpp_getvar(parser, "connection"); if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) { From 33dcf24d21d3bd3ae3865da9ff8493d1db433d79 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 10:27:15 +0000 Subject: [PATCH 06/19] Fix: Only reuse client IF we reached the end of it's body --- src/client.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/client.c b/src/client.c index f66ddaf4..8753e37d 100644 --- a/src/client.c +++ b/src/client.c @@ -140,8 +140,11 @@ void client_destroy(client_t *client) return; if (client->reuse != ICECAST_REUSE_CLOSE) { - client_reuseconnection(client); - return; + /* only reuse the client if we reached the body's EOF. */ + if (client_body_eof(client) == 1) { + client_reuseconnection(client); + return; + } } /* release the buffer now, as the buffer could be on the source queue From b0c7da36a09a9cba46d0c920dedebb070d8415f4 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 10:31:13 +0000 Subject: [PATCH 07/19] Fix: Fixed HTTP/1.1 pipelineing --- src/client.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/client.c b/src/client.c index 8753e37d..1eb2f289 100644 --- a/src/client.c +++ b/src/client.c @@ -124,6 +124,16 @@ static inline void client_reuseconnection(client_t *client) { client->con->send = NULL; } + if (client->con->readbufferlen) { + /* Aend... moorre paaiin. + * stealing putback buffer. + */ + con->readbuffer = client->con->readbuffer; + con->readbufferlen = client->con->readbufferlen; + client->con->readbuffer = NULL; + client->con->readbufferlen = 0; + } + client->reuse = ICECAST_REUSE_CLOSE; client_destroy(client); From 1c7329cfbf7508f446c4ab829b445f67fe61916f Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 11:05:23 +0000 Subject: [PATCH 08/19] Feature: Added some client request body slurping helpers --- src/client.c | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/client.h | 9 +++++ 2 files changed, 109 insertions(+) diff --git a/src/client.c b/src/client.c index 1eb2f289..2c6efd86 100644 --- a/src/client.c +++ b/src/client.c @@ -514,3 +514,103 @@ int client_body_eof(client_t *client) ICECAST_LOG_DEBUG("... result is: %i (client=%p)", ret, client); return ret; } + +client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len) +{ + if (!client || !buf || !len) + return CLIENT_SLURP_ERROR; + + if (client->request_body_length != -1) { + /* non-streaming mode */ + size_t left = (size_t)client->request_body_length - client->request_body_read; + size_t ret; + + if (!left) + return CLIENT_SLURP_SUCCESS; + + if (*len < client->request_body_length) + return CLIENT_SLURP_BUFFER_TO_SMALL; + + if (left > 2048) + left = 2048; + + client_body_read(client, buf + client->request_body_read, left); + + if (client->request_body_length == client->request_body_read) { + *len = client->request_body_read; + + return CLIENT_SLURP_SUCCESS; + } else { + return CLIENT_SLURP_NEEDS_MORE_DATA; + } + } else { + /* streaming mode */ + size_t left = *len - client->request_body_read; + int ret; + + if (left) { + if (left > 2048) + left = 2048; + + client_body_read(client, buf + client->request_body_read, left); + } + + ret = client_body_eof(client); + switch (ret) { + case 0: + if (*len == client->request_body_read) { + return CLIENT_SLURP_BUFFER_TO_SMALL; + } + return CLIENT_SLURP_NEEDS_MORE_DATA; + break; + case 1: + return CLIENT_SLURP_SUCCESS; + break; + default: + return CLIENT_SLURP_ERROR; + break; + } + } +} + +client_slurp_result_t client_body_skip(client_t *client) +{ + char buf[2048]; + int ret; + + if (!client) + return CLIENT_SLURP_ERROR; + + if (client->request_body_length != -1) { + size_t left = (size_t)client->request_body_length - client->request_body_read; + + if (!left) + return CLIENT_SLURP_SUCCESS; + + if (left > sizeof(buf)) + left = sizeof(buf); + + client_body_read(client, buf, left); + + if (client->request_body_length == client->request_body_read) { + return CLIENT_SLURP_SUCCESS; + } else { + return CLIENT_SLURP_NEEDS_MORE_DATA; + } + } else { + client_body_read(client, buf, sizeof(buf)); + } + + ret = client_body_eof(client); + switch (ret) { + case 0: + return CLIENT_SLURP_NEEDS_MORE_DATA; + break; + case 1: + return CLIENT_SLURP_SUCCESS; + break; + default: + return CLIENT_SLURP_ERROR; + break; + } +} diff --git a/src/client.h b/src/client.h index c9a845ea..f097fa67 100644 --- a/src/client.h +++ b/src/client.h @@ -43,6 +43,13 @@ typedef enum _reuse_tag { ICECAST_REUSE_UPGRADETLS } reuse_t; +typedef enum { + CLIENT_SLURP_ERROR, + CLIENT_SLURP_NEEDS_MORE_DATA, + CLIENT_SLURP_BUFFER_TO_SMALL, + CLIENT_SLURP_SUCCESS +} client_slurp_result_t; + struct _client_tag { /* mode of operation for this client */ operation_mode mode; @@ -132,5 +139,7 @@ int client_read_bytes (client_t *client, void *buf, unsigned len); void client_set_queue (client_t *client, refbuf_t *refbuf); ssize_t client_body_read(client_t *client, void *buf, size_t len); int client_body_eof(client_t *client); +client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len); +client_slurp_result_t client_body_skip(client_t *client); #endif /* __CLIENT_H__ */ From e9624ef52336483a5157e21cbc53706d37f2dbee Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 15:18:41 +0000 Subject: [PATCH 09/19] Cleanup: Removed unused structure --- src/connection.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/connection.c b/src/connection.c index 3acb3fa2..ee1f9c2c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -85,11 +85,6 @@ typedef struct client_queue_tag { struct client_queue_tag *next; } client_queue_t; -typedef struct _thread_queue_tag { - thread_type *thread_id; - struct _thread_queue_tag *next; -} thread_queue_t; - static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail static volatile unsigned long _current_id = 0; static int _initialized = 0; From bde88f82ae5eb95805d91295f2ffab6eef12af09 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 16:55:37 +0000 Subject: [PATCH 10/19] Update: Added debug level logging for client slurping --- src/client.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/client.c b/src/client.c index 2c6efd86..7168839a 100644 --- a/src/client.c +++ b/src/client.c @@ -578,14 +578,20 @@ client_slurp_result_t client_body_skip(client_t *client) char buf[2048]; int ret; - if (!client) + ICECAST_LOG_DEBUG("Slurping client %p"); + + if (!client) { + ICECAST_LOG_DEBUG("Slurping client %p ... failed"); return CLIENT_SLURP_ERROR; + } if (client->request_body_length != -1) { size_t left = (size_t)client->request_body_length - client->request_body_read; - if (!left) + if (!left) { + ICECAST_LOG_DEBUG("Slurping client %p ... was a success"); return CLIENT_SLURP_SUCCESS; + } if (left > sizeof(buf)) left = sizeof(buf); @@ -593,8 +599,10 @@ client_slurp_result_t client_body_skip(client_t *client) client_body_read(client, buf, left); if (client->request_body_length == client->request_body_read) { + ICECAST_LOG_DEBUG("Slurping client %p ... was a success"); return CLIENT_SLURP_SUCCESS; } else { + ICECAST_LOG_DEBUG("Slurping client %p ... needs more data"); return CLIENT_SLURP_NEEDS_MORE_DATA; } } else { @@ -604,12 +612,15 @@ client_slurp_result_t client_body_skip(client_t *client) ret = client_body_eof(client); switch (ret) { case 0: + ICECAST_LOG_DEBUG("Slurping client %p ... needs more data"); return CLIENT_SLURP_NEEDS_MORE_DATA; break; case 1: + ICECAST_LOG_DEBUG("Slurping client %p ... was a success"); return CLIENT_SLURP_SUCCESS; break; default: + ICECAST_LOG_DEBUG("Slurping client %p ... failed"); return CLIENT_SLURP_ERROR; break; } From 669707d312c97b74ade1a3a6e27324ff8bbf307e Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 16:57:36 +0000 Subject: [PATCH 11/19] Update: Added basic client body slurping --- src/connection.c | 80 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index ee1f9c2c..e3bb9890 100644 --- a/src/connection.c +++ b/src/connection.c @@ -91,6 +91,7 @@ static int _initialized = 0; static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; +static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; static int tls_ok; static tls_ctx_t *tls_ctx; @@ -115,6 +116,8 @@ void connection_initialize(void) _req_queue_tail = &_req_queue; _con_queue = NULL; _con_queue_tail = &_con_queue; + _body_queue = NULL; + _body_queue_tail = &_body_queue; _initialized = 1; } @@ -291,6 +294,7 @@ ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len) if (len >= con->readbufferlen) { memcpy(buf, con->readbuffer, con->readbufferlen); free(con->readbuffer); + ICECAST_LOG_DEBUG("New fill in buffer="); if (len == con->readbufferlen) { con->readbufferlen = 0; return len; @@ -590,6 +594,50 @@ static void process_request_queue (void) _handle_connection(); } +/* add client to body queue. + */ +static void _add_body_client(client_queue_t *node) +{ + ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client); + + thread_spin_lock(&_connection_lock); + *_body_queue_tail = node; + _body_queue_tail = (volatile client_queue_t **) &node->next; + thread_spin_unlock(&_connection_lock); +} + + +/* This queue reads data from the body of clients. */ +static void process_request_body_queue (void) +{ + client_queue_t **node_ref = (client_queue_t **)&_body_queue; + + ICECAST_LOG_DEBUG("Processing body queue."); + + ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail); + + while (*node_ref) { + client_queue_t *node = *node_ref; + client_t *client = node->client; + client_slurp_result_t res; + + ICECAST_LOG_DEBUG("Got client %p in body queue.", client); + + res = client_body_skip(client); + + if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { + ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); + + if ((client_queue_t **)_body_queue_tail == &(node->next)) + _body_queue_tail = (volatile client_queue_t **)node_ref; + *node_ref = node->next; + node->next = NULL; + _add_connection(node); + continue; + } + node_ref = &node->next; + } +} /* add node to the queue of requests. This is where the clients are when * initial http details are read. @@ -685,6 +733,7 @@ void connection_accept_loop(void) duration = 300; /* use longer timeouts when nothing waiting */ } process_request_queue(); + process_request_body_queue(); } /* Give all the other threads notification to shut down */ @@ -1109,6 +1158,7 @@ static void _handle_shoutcast_compatible(client_queue_t *node) /* we may have more than just headers, so prepare for it */ if (node->stream_offset != node->offset) { connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); + node->offset = node->stream_offset; } client->refbuf->len = 0; client->parser = parser; @@ -1470,6 +1520,26 @@ static void _update_client_request_body_length(client_t *client) ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length); } +/* Check if we need body of client */ +static int _need_body(client_t *client) +{ + if (client->parser->req_type == httpp_req_source) { + /* SOURCE connection. */ + return 0; + } else if (client->parser->req_type == httpp_req_put) { + /* PUT connection. + * TODO: We may need body for /admin/ but we do not know if it's an admin request yet. + */ + return 0; + } else if (client->request_body_length != -1 && (size_t)client->request_body_length != client->request_body_read) { + return 1; + } else if (client->request_body_length == -1 && client_body_eof(client) == 0) { + return 1; + } + + return 0; +} + /* Connection thread. Here we take clients off the connection queue and check * the contents provided. We set up the parser then hand off to the specific * request handler. @@ -1509,9 +1579,17 @@ static void _handle_connection(void) /* we may have more than just headers, so prepare for it */ if (node->stream_offset != node->offset) { connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); + node->offset = node->stream_offset; } client->refbuf->len = 0; + /* early check if we need more data */ + _update_client_request_body_length(client); + if (_need_body(client)) { + _add_body_client(node); + continue; + } + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); /* assign a port-based shoutcast mountpoint if required */ @@ -1528,8 +1606,6 @@ static void _handle_connection(void) continue; } - _update_client_request_body_length(client); - upgrade = httpp_getvar(parser, "upgrade"); connection = httpp_getvar(parser, "connection"); if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) { From 19dda79146bbd461faa1a9babd7482131ecf550a Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 17:34:18 +0000 Subject: [PATCH 12/19] Update: Added timeout and size limit to client body queue handling --- src/cfgfile.c | 10 ++++++++++ src/cfgfile.h | 3 +++ src/connection.c | 10 +++++++++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/cfgfile.c b/src/cfgfile.c index f78095ee..2dd2e547 100644 --- a/src/cfgfile.c +++ b/src/cfgfile.c @@ -49,11 +49,13 @@ #define CONFIG_DEFAULT_CLIENT_LIMIT 256 #define CONFIG_DEFAULT_SOURCE_LIMIT 16 #define CONFIG_DEFAULT_QUEUE_SIZE_LIMIT (500*1024) +#define CONFIG_DEFAULT_BODY_SIZE_LIMIT (4*1024) #define CONFIG_DEFAULT_BURST_SIZE (64*1024) #define CONFIG_DEFAULT_THREADPOOL_SIZE 4 #define CONFIG_DEFAULT_CLIENT_TIMEOUT 30 #define CONFIG_DEFAULT_HEADER_TIMEOUT 15 #define CONFIG_DEFAULT_SOURCE_TIMEOUT 10 +#define CONFIG_DEFAULT_BODY_TIMEOUT (10 + CONFIG_DEFAULT_HEADER_TIMEOUT) #define CONFIG_DEFAULT_MASTER_USERNAME "relay" #define CONFIG_DEFAULT_SHOUTCAST_MOUNT "/stream" #define CONFIG_DEFAULT_SHOUTCAST_USER "source" @@ -801,12 +803,16 @@ static void _set_defaults(ice_config_t *configuration) ->source_limit = CONFIG_DEFAULT_SOURCE_LIMIT; configuration ->queue_size_limit = CONFIG_DEFAULT_QUEUE_SIZE_LIMIT; + configuration + ->body_size_limit = CONFIG_DEFAULT_BODY_SIZE_LIMIT; configuration ->client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT; configuration ->header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT; configuration ->source_timeout = CONFIG_DEFAULT_SOURCE_TIMEOUT; + configuration + ->source_timeout = CONFIG_DEFAULT_BODY_TIMEOUT; configuration ->shoutcast_mount = (char *) xmlCharStrdup(CONFIG_DEFAULT_SHOUTCAST_MOUNT); configuration @@ -1123,6 +1129,8 @@ static void _parse_limits(xmlDocPtr doc, __read_int(doc, node, &configuration->client_limit, " must not be empty."); } else if (xmlStrcmp(node->name, XMLSTR("sources")) == 0) { __read_int(doc, node, &configuration->source_limit, " must not be empty."); + } else if (xmlStrcmp(node->name, XMLSTR("bodysize")) == 0) { + __read_int(doc, node, &configuration->body_size_limit, " must not be empty."); } else if (xmlStrcmp(node->name, XMLSTR("queue-size")) == 0) { __read_unsigned_int(doc, node, &configuration->queue_size_limit, " must not be empty."); } else if (xmlStrcmp(node->name, XMLSTR("threadpool")) == 0) { @@ -1134,6 +1142,8 @@ static void _parse_limits(xmlDocPtr doc, __read_int(doc, node, &configuration->header_timeout, " must not be empty."); } else if (xmlStrcmp(node->name, XMLSTR("source-timeout")) == 0) { __read_int(doc, node, &configuration->source_timeout, " must not be empty."); + } else if (xmlStrcmp(node->name, XMLSTR("body-timeout")) == 0) { + __read_int(doc, node, &configuration->body_timeout, " must not be empty."); } else if (xmlStrcmp(node->name, XMLSTR("burst-on-connect")) == 0) { ICECAST_LOG_WARN(" is deprecated, use instead."); tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); diff --git a/src/cfgfile.h b/src/cfgfile.h index 25fbfbbb..55ab1d98 100644 --- a/src/cfgfile.h +++ b/src/cfgfile.h @@ -26,6 +26,7 @@ #include "common/thread/thread.h" #include "common/avl/avl.h" #include "icecasttypes.h" +#include "compat.h" #define XMLSTR(str) ((xmlChar *)(str)) @@ -170,11 +171,13 @@ struct ice_config_tag { int client_limit; int source_limit; + int body_size_limit; unsigned int queue_size_limit; unsigned int burst_size; int client_timeout; int header_timeout; int source_timeout; + int body_timeout; int fileserve; int on_demand; /* global setting for all relays */ diff --git a/src/connection.c b/src/connection.c index e3bb9890..33022ebe 100644 --- a/src/connection.c +++ b/src/connection.c @@ -611,11 +611,19 @@ static void _add_body_client(client_queue_t *node) static void process_request_body_queue (void) { client_queue_t **node_ref = (client_queue_t **)&_body_queue; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; ICECAST_LOG_DEBUG("Processing body queue."); ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail); + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); + while (*node_ref) { client_queue_t *node = *node_ref; client_t *client = node->client; @@ -625,7 +633,7 @@ static void process_request_body_queue (void) res = client_body_skip(client); - if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { + if (res != CLIENT_SLURP_NEEDS_MORE_DATA || client->con->con_time <= timeout || client->request_body_read >= body_size_limit) { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); if ((client_queue_t **)_body_queue_tail == &(node->next)) From 44ebc3cf224f066984827fcd8615d9c5fa127719 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 21:08:17 +0000 Subject: [PATCH 13/19] Update: Added protection against re-queueing a client for reading body that has been queued once already --- src/connection.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index 33022ebe..2f57c55a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -82,6 +82,7 @@ typedef struct client_queue_tag { int stream_offset; int shoutcast; char *shoutcast_mount; + int tried_body; struct client_queue_tag *next; } client_queue_t; @@ -629,6 +630,8 @@ static void process_request_body_queue (void) client_t *client = node->client; client_slurp_result_t res; + node->tried_body = 1; + ICECAST_LOG_DEBUG("Got client %p in body queue.", client); res = client_body_skip(client); @@ -1529,8 +1532,13 @@ static void _update_client_request_body_length(client_t *client) } /* Check if we need body of client */ -static int _need_body(client_t *client) +static int _need_body(client_queue_t *node) { + client_t *client = node->client; + + if (node->tried_body) + return 0; + if (client->parser->req_type == httpp_req_source) { /* SOURCE connection. */ return 0; @@ -1593,7 +1601,7 @@ static void _handle_connection(void) /* early check if we need more data */ _update_client_request_body_length(client); - if (_need_body(client)) { + if (_need_body(node)) { _add_body_client(node); continue; } From a466900ae1de2a421bcb76026318da0a15d517bb Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 21:40:47 +0000 Subject: [PATCH 14/19] Feature: Allow POST for admin requests --- src/admin.c | 7 ++++--- src/common | 2 +- src/connection.c | 26 +++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/admin.c b/src/admin.c index 667ae59f..73a3b951 100644 --- a/src/admin.c +++ b/src/admin.c @@ -52,7 +52,7 @@ /* Helper macros */ #define COMMAND_REQUIRE(client,name,var) \ do { \ - (var) = httpp_get_query_param((client)->parser, (name)); \ + (var) = httpp_get_param((client)->parser, (name)); \ if((var) == NULL) { \ client_send_error_by_id(client, ICECAST_ERROR_ADMIN_MISSING_PARAMETER); \ return; \ @@ -60,7 +60,7 @@ } while(0); #define COMMAND_OPTIONAL(client,name,var) \ -(var) = httpp_get_query_param((client)->parser, (name)) +(var) = httpp_get_param((client)->parser, (name)) /* special commands */ #define COMMAND_ERROR ADMIN_COMMAND_ERROR @@ -502,7 +502,7 @@ void admin_handle_request(client_t *client, const char *uri) } } - mount = httpp_get_query_param(client->parser, "mount"); + COMMAND_OPTIONAL(client, "mount", mount); /* Find mountpoint source */ if(mount != NULL) { @@ -543,6 +543,7 @@ void admin_handle_request(client_t *client, const char *uri) switch (client->parser->req_type) { case httpp_req_get: + case httpp_req_post: handler->function(client, source, format); break; case httpp_req_options: diff --git a/src/common b/src/common index fca416b1..9bfb3a34 160000 --- a/src/common +++ b/src/common @@ -1 +1 @@ -Subproject commit fca416b126cb842034ac3468362c044895975b5a +Subproject commit 9bfb3a34fc41cc8e0075328d7d6527bd84eb40ba diff --git a/src/connection.c b/src/connection.c index 2f57c55a..8725b40d 100644 --- a/src/connection.c +++ b/src/connection.c @@ -82,6 +82,8 @@ typedef struct client_queue_tag { int stream_offset; int shoutcast; char *shoutcast_mount; + char *bodybuffer; + size_t bodybufferlen; int tried_body; struct client_queue_tag *next; } client_queue_t; @@ -634,7 +636,28 @@ static void process_request_body_queue (void) ICECAST_LOG_DEBUG("Got client %p in body queue.", client); - res = client_body_skip(client); + if (client->parser->req_type == httpp_req_post) { + if (node->bodybuffer == NULL && client->request_body_read == 0) { + if (client->request_body_length < 0) { + node->bodybufferlen = body_size_limit; + node->bodybuffer = malloc(node->bodybufferlen); + } else if (client->request_body_length <= (ssize_t)body_size_limit) { + node->bodybufferlen = client->request_body_length; + node->bodybuffer = malloc(node->bodybufferlen); + } + } + } + + if (node->bodybuffer) { + res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen)); + if (res == CLIENT_SLURP_SUCCESS) { + httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen); + free(node->bodybuffer); + node->bodybuffer = NULL; + } + } else { + res = client_body_skip(client); + } if (res != CLIENT_SLURP_NEEDS_MORE_DATA || client->con->con_time <= timeout || client->request_body_read >= body_size_limit) { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); @@ -1612,6 +1635,7 @@ static void _handle_connection(void) if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); + free (node->bodybuffer); free (node->shoutcast_mount); free (node); From c3afdff5fb7d5f837f8e438debddc2bcac983cb3 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 21:45:55 +0000 Subject: [PATCH 15/19] Feature: Allow POST for web requests --- src/connection.c | 5 +++-- src/stats.c | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/connection.c b/src/connection.c index 8725b40d..7de3ca67 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1381,6 +1381,7 @@ static void _handle_authed_client(client_t *client, void *uri, auth_result resul _handle_stats_request(client, uri); break; case httpp_req_get: + case httpp_req_post: case httpp_req_options: _handle_get_request(client, uri); break; @@ -1434,7 +1435,7 @@ static void _handle_authentication_mount_generic(client_t *client, void *uri, mo if (!mountproxy) { int command_type = admin_get_command_type(client->admin_command); if (command_type == ADMINTYPE_MOUNT || command_type == ADMINTYPE_HYBRID) { - const char *mount = httpp_get_query_param(client->parser, "mount"); + const char *mount = httpp_get_param(client->parser, "mount"); if (mount) mountproxy = __find_non_admin_mount(config, mount, type); } @@ -1673,7 +1674,7 @@ static void _handle_connection(void) continue; } - client->mode = config_str_to_omode(httpp_get_query_param(client->parser, "omode")); + client->mode = config_str_to_omode(httpp_get_param(client->parser, "omode")); if (_handle_resources(client, &uri) != 0) { client_destroy (client); diff --git a/src/stats.c b/src/stats.c index 79c82798..5cfc578c 100644 --- a/src/stats.c +++ b/src/stats.c @@ -1020,7 +1020,7 @@ void stats_transform_xslt(client_t *client, const char *uri) { xmlDocPtr doc; char *xslpath = util_get_path_from_normalised_uri(uri); - const char *mount = httpp_get_query_param(client->parser, "mount"); + const char *mount = httpp_get_param(client->parser, "mount"); doc = stats_get_xml(0, mount, client->mode); From b8ceef24257857003daae961856a920a1ace2a0f Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 18 Jun 2018 21:49:45 +0000 Subject: [PATCH 16/19] Fix: Fixed some compiler warnings --- src/client.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/client.c b/src/client.c index 7168839a..6e5f0e67 100644 --- a/src/client.c +++ b/src/client.c @@ -523,12 +523,11 @@ client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len if (client->request_body_length != -1) { /* non-streaming mode */ size_t left = (size_t)client->request_body_length - client->request_body_read; - size_t ret; if (!left) return CLIENT_SLURP_SUCCESS; - if (*len < client->request_body_length) + if (*len < (size_t)client->request_body_length) return CLIENT_SLURP_BUFFER_TO_SMALL; if (left > 2048) @@ -536,7 +535,7 @@ client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len client_body_read(client, buf + client->request_body_read, left); - if (client->request_body_length == client->request_body_read) { + if ((size_t)client->request_body_length == client->request_body_read) { *len = client->request_body_read; return CLIENT_SLURP_SUCCESS; @@ -598,7 +597,7 @@ client_slurp_result_t client_body_skip(client_t *client) client_body_read(client, buf, left); - if (client->request_body_length == client->request_body_read) { + if ((size_t)client->request_body_length == client->request_body_read) { ICECAST_LOG_DEBUG("Slurping client %p ... was a success"); return CLIENT_SLURP_SUCCESS; } else { From 59cf2ff4264b7fd15f3d934d61e26e9f200e1d3c Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 19 Jun 2018 07:42:36 +0000 Subject: [PATCH 17/19] Cleanup: Removed now useless start-of-stream passing code (as much as I found. There is more) --- src/connection.c | 25 +++++++++---------------- src/source.c | 7 +++---- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/connection.c b/src/connection.c index 7de3ca67..2dfe4b7d 100644 --- a/src/connection.c +++ b/src/connection.c @@ -79,7 +79,6 @@ typedef struct client_queue_tag { client_t *client; int offset; - int stream_offset; int shoutcast; char *shoutcast_mount; char *bodybuffer; @@ -537,6 +536,7 @@ static void process_request_queue (void) } if (len > 0) { + ssize_t stream_offset = -1; int pass_it = 1; char *ptr; @@ -558,23 +558,27 @@ static void process_request_queue (void) * http style headers, we don't want to lose those */ ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n"); if (ptr) { - node->stream_offset = (ptr+6) - client->refbuf->data; + stream_offset = (ptr+6) - client->refbuf->data; break; } ptr = strstr(client->refbuf->data, "\r\n\r\n"); if (ptr) { - node->stream_offset = (ptr+4) - client->refbuf->data; + stream_offset = (ptr+4) - client->refbuf->data; break; } ptr = strstr(client->refbuf->data, "\n\n"); if (ptr) { - node->stream_offset = (ptr+2) - client->refbuf->data; + stream_offset = (ptr+2) - client->refbuf->data; break; } pass_it = 0; } while (0); if (pass_it) { + if (stream_offset != -1) { + connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); + node->offset = stream_offset; + } if ((client_queue_t **)_req_queue_tail == &(node->next)) _req_queue_tail = (volatile client_queue_t **)node_ref; *node_ref = node->next; @@ -923,8 +927,7 @@ static inline void source_startup(client_t *client, const char *uri) ret = util_http_build_header(ok->data, PER_CLIENT_REFBUF_SIZE, 0, 0, status_to_send, NULL, NULL, NULL, NULL, NULL, client); snprintf(ok->data + ret, PER_CLIENT_REFBUF_SIZE - ret, "Content-Length: 0\r\n\r\n"); ok->len = strlen(ok->data); - /* we may have unprocessed data read in, so don't overwrite it */ - ok->associated = client->refbuf; + refbuf_release(client->refbuf); client->refbuf = ok; fserve_add_client_callback(client, source_client_callback, source); } @@ -1189,11 +1192,6 @@ static void _handle_shoutcast_compatible(client_queue_t *node) parser = httpp_create_parser(); httpp_initialize(parser, NULL); if (httpp_parse(parser, http_compliant, strlen(http_compliant))) { - /* we may have more than just headers, so prepare for it */ - if (node->stream_offset != node->offset) { - connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); - node->offset = node->stream_offset; - } client->refbuf->len = 0; client->parser = parser; client->protocol = ICECAST_PROTOCOL_SHOUTCAST; @@ -1616,11 +1614,6 @@ static void _handle_connection(void) char *uri; const char *upgrade, *connection; - /* we may have more than just headers, so prepare for it */ - if (node->stream_offset != node->offset) { - connection_read_put_back(client->con, client->refbuf->data + node->stream_offset, node->offset - node->stream_offset); - node->offset = node->stream_offset; - } client->refbuf->len = 0; /* early check if we need more data */ diff --git a/src/source.c b/src/source.c index 69d0f7f6..7a7e0a65 100644 --- a/src/source.c +++ b/src/source.c @@ -1318,7 +1318,6 @@ void source_client_callback (client_t *client, void *arg) { const char *agent; source_t *source = arg; - refbuf_t *old_data = client->refbuf; if (client->con->error) { @@ -1329,9 +1328,9 @@ void source_client_callback (client_t *client, void *arg) source_free_source (source); return; } - client->refbuf = old_data->associated; - old_data->associated = NULL; - refbuf_release (old_data); + + client->refbuf->len = 0; + stats_event (source->mount, "source_ip", source->client->con->ip); agent = httpp_getvar (source->client->parser, "user-agent"); if (agent) From 7e7698674169deb7a1c299ee245740a9616b8dc5 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 19 Jun 2018 08:08:39 +0000 Subject: [PATCH 18/19] Update: Added a fast lane for client requests with a small body --- src/connection.c | 84 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 25 deletions(-) diff --git a/src/connection.c b/src/connection.c index 2dfe4b7d..72efedf4 100644 --- a/src/connection.c +++ b/src/connection.c @@ -613,6 +613,42 @@ static void _add_body_client(client_queue_t *node) thread_spin_unlock(&_connection_lock); } +static client_slurp_result_t process_request_body_queue_one(client_queue_t *node, time_t timeout, size_t body_size_limit) +{ + client_t *client = node->client; + client_slurp_result_t res; + + if (client->parser->req_type == httpp_req_post) { + if (node->bodybuffer == NULL && client->request_body_read == 0) { + if (client->request_body_length < 0) { + node->bodybufferlen = body_size_limit; + node->bodybuffer = malloc(node->bodybufferlen); + } else if (client->request_body_length <= (ssize_t)body_size_limit) { + node->bodybufferlen = client->request_body_length; + node->bodybuffer = malloc(node->bodybufferlen); + } + } + } + + if (node->bodybuffer) { + res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen)); + if (res == CLIENT_SLURP_SUCCESS) { + httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen); + free(node->bodybuffer); + node->bodybuffer = NULL; + } + } else { + res = client_body_skip(client); + } + + if (res != CLIENT_SLURP_SUCCESS) { + if (client->con->con_time <= timeout || client->request_body_read >= body_size_limit) { + return CLIENT_SLURP_ERROR; + } + } + + return res; +} /* This queue reads data from the body of clients. */ static void process_request_body_queue (void) @@ -640,30 +676,9 @@ static void process_request_body_queue (void) ICECAST_LOG_DEBUG("Got client %p in body queue.", client); - if (client->parser->req_type == httpp_req_post) { - if (node->bodybuffer == NULL && client->request_body_read == 0) { - if (client->request_body_length < 0) { - node->bodybufferlen = body_size_limit; - node->bodybuffer = malloc(node->bodybufferlen); - } else if (client->request_body_length <= (ssize_t)body_size_limit) { - node->bodybufferlen = client->request_body_length; - node->bodybuffer = malloc(node->bodybufferlen); - } - } - } + res = process_request_body_queue_one(node, timeout, body_size_limit); - if (node->bodybuffer) { - res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen)); - if (res == CLIENT_SLURP_SUCCESS) { - httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen); - free(node->bodybuffer); - node->bodybuffer = NULL; - } - } else { - res = client_body_skip(client); - } - - if (res != CLIENT_SLURP_NEEDS_MORE_DATA || client->con->con_time <= timeout || client->request_body_read >= body_size_limit) { + if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); if ((client_queue_t **)_body_queue_tail == &(node->next)) @@ -1619,8 +1634,27 @@ static void _handle_connection(void) /* early check if we need more data */ _update_client_request_body_length(client); if (_need_body(node)) { - _add_body_client(node); - continue; + /* Just calling _add_body_client() would do the job. + * However, if the client only has a small body this might work without moving it between queues. + * -> much faster. + */ + client_slurp_result_t res; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; + + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); + + res = process_request_body_queue_one(node, timeout, body_size_limit); + if (res != CLIENT_SLURP_SUCCESS) { + _add_body_client(node); + continue; + } else { + ICECAST_LOG_DEBUG("Success on fast lane"); + } } rawuri = httpp_getvar(parser, HTTPP_VAR_URI); From a53270065bccee7c5c73aad4534eabb07c51b5b9 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Tue, 19 Jun 2018 08:20:44 +0000 Subject: [PATCH 19/19] Update: Converted
s from GET to POST --- admin/manageauth.xsl | 2 +- admin/moveclients.xsl | 2 +- admin/updatemetadata.xsl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/admin/manageauth.xsl b/admin/manageauth.xsl index b0ea79e2..dc37cdf5 100644 --- a/admin/manageauth.xsl +++ b/admin/manageauth.xsl @@ -53,7 +53,7 @@

Add User

- + diff --git a/admin/moveclients.xsl b/admin/moveclients.xsl index e923da24..5bdcacba 100644 --- a/admin/moveclients.xsl +++ b/admin/moveclients.xsl @@ -18,7 +18,7 @@

Choose the mountpoint to which you want to move the listeners to:

- + diff --git a/admin/updatemetadata.xsl b/admin/updatemetadata.xsl index 766a8f4e..7d60093c 100644 --- a/admin/updatemetadata.xsl +++ b/admin/updatemetadata.xsl @@ -15,7 +15,7 @@

Mountpoint

- +