mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2024-12-04 14:46:30 -05:00
Merge branch 'ph3-body'
This commit is contained in:
commit
4a10d7e744
@ -53,7 +53,7 @@
|
||||
<!-- Form to add Users -->
|
||||
<xsl:if test="@can-adduser = 'true'">
|
||||
<h4>Add User</h4>
|
||||
<form method="get" action="manageauth.xsl">
|
||||
<form method="post" action="manageauth.xsl">
|
||||
<label for="username" class="hidden">Username</label>
|
||||
<input type="text" id="username" name="username" value="" placeholder="Username" required="required" />
|
||||
<label for="password" class="hidden">Password</label>
|
||||
|
@ -18,7 +18,7 @@
|
||||
<xsl:choose>
|
||||
<xsl:when test="source">
|
||||
<p>Choose the mountpoint to which you want to move the listeners to:</p>
|
||||
<form method="get" action="moveclients.xsl">
|
||||
<form method="post" action="moveclients.xsl">
|
||||
<label for="moveto" class="hidden">
|
||||
Move from <code><xsl:value-of select="current_source" /></code> to
|
||||
</label>
|
||||
|
@ -15,7 +15,7 @@
|
||||
<h3>Mountpoint <xsl:value-of select="@mount" /></h3>
|
||||
<!-- Mount nav -->
|
||||
<xsl:call-template name="mountnav" />
|
||||
<form method="get" action="/admin/metadata.xsl">
|
||||
<form method="post" action="/admin/metadata.xsl">
|
||||
<label for="metadata" class="hidden">Metadata</label>
|
||||
<input type="text" id="metadata" name="song" value="" placeholder="Click to edit" required="required" />
|
||||
<input type="hidden" name="mount" value="{@mount}" />
|
||||
|
@ -52,7 +52,7 @@
|
||||
/* Helper macros */
|
||||
#define COMMAND_REQUIRE(client,name,var) \
|
||||
do { \
|
||||
(var) = httpp_get_query_param((client)->parser, (name)); \
|
||||
(var) = httpp_get_param((client)->parser, (name)); \
|
||||
if((var) == NULL) { \
|
||||
client_send_error_by_id(client, ICECAST_ERROR_ADMIN_MISSING_PARAMETER); \
|
||||
return; \
|
||||
@ -60,7 +60,7 @@
|
||||
} while(0);
|
||||
|
||||
#define COMMAND_OPTIONAL(client,name,var) \
|
||||
(var) = httpp_get_query_param((client)->parser, (name))
|
||||
(var) = httpp_get_param((client)->parser, (name))
|
||||
|
||||
/* special commands */
|
||||
#define COMMAND_ERROR ADMIN_COMMAND_ERROR
|
||||
@ -502,7 +502,7 @@ void admin_handle_request(client_t *client, const char *uri)
|
||||
}
|
||||
}
|
||||
|
||||
mount = httpp_get_query_param(client->parser, "mount");
|
||||
COMMAND_OPTIONAL(client, "mount", mount);
|
||||
|
||||
/* Find mountpoint source */
|
||||
if(mount != NULL) {
|
||||
@ -543,6 +543,7 @@ void admin_handle_request(client_t *client, const char *uri)
|
||||
|
||||
switch (client->parser->req_type) {
|
||||
case httpp_req_get:
|
||||
case httpp_req_post:
|
||||
handler->function(client, source, format);
|
||||
break;
|
||||
case httpp_req_options:
|
||||
|
@ -49,11 +49,13 @@
|
||||
#define CONFIG_DEFAULT_CLIENT_LIMIT 256
|
||||
#define CONFIG_DEFAULT_SOURCE_LIMIT 16
|
||||
#define CONFIG_DEFAULT_QUEUE_SIZE_LIMIT (500*1024)
|
||||
#define CONFIG_DEFAULT_BODY_SIZE_LIMIT (4*1024)
|
||||
#define CONFIG_DEFAULT_BURST_SIZE (64*1024)
|
||||
#define CONFIG_DEFAULT_THREADPOOL_SIZE 4
|
||||
#define CONFIG_DEFAULT_CLIENT_TIMEOUT 30
|
||||
#define CONFIG_DEFAULT_HEADER_TIMEOUT 15
|
||||
#define CONFIG_DEFAULT_SOURCE_TIMEOUT 10
|
||||
#define CONFIG_DEFAULT_BODY_TIMEOUT (10 + CONFIG_DEFAULT_HEADER_TIMEOUT)
|
||||
#define CONFIG_DEFAULT_MASTER_USERNAME "relay"
|
||||
#define CONFIG_DEFAULT_SHOUTCAST_MOUNT "/stream"
|
||||
#define CONFIG_DEFAULT_SHOUTCAST_USER "source"
|
||||
@ -801,12 +803,16 @@ static void _set_defaults(ice_config_t *configuration)
|
||||
->source_limit = CONFIG_DEFAULT_SOURCE_LIMIT;
|
||||
configuration
|
||||
->queue_size_limit = CONFIG_DEFAULT_QUEUE_SIZE_LIMIT;
|
||||
configuration
|
||||
->body_size_limit = CONFIG_DEFAULT_BODY_SIZE_LIMIT;
|
||||
configuration
|
||||
->client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT;
|
||||
configuration
|
||||
->header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT;
|
||||
configuration
|
||||
->source_timeout = CONFIG_DEFAULT_SOURCE_TIMEOUT;
|
||||
configuration
|
||||
->source_timeout = CONFIG_DEFAULT_BODY_TIMEOUT;
|
||||
configuration
|
||||
->shoutcast_mount = (char *) xmlCharStrdup(CONFIG_DEFAULT_SHOUTCAST_MOUNT);
|
||||
configuration
|
||||
@ -1123,6 +1129,8 @@ static void _parse_limits(xmlDocPtr doc,
|
||||
__read_int(doc, node, &configuration->client_limit, "<clients> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("sources")) == 0) {
|
||||
__read_int(doc, node, &configuration->source_limit, "<sources> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("bodysize")) == 0) {
|
||||
__read_int(doc, node, &configuration->body_size_limit, "<bodysize> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("queue-size")) == 0) {
|
||||
__read_unsigned_int(doc, node, &configuration->queue_size_limit, "<queue-size> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("threadpool")) == 0) {
|
||||
@ -1134,6 +1142,8 @@ static void _parse_limits(xmlDocPtr doc,
|
||||
__read_int(doc, node, &configuration->header_timeout, "<header-timeout> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("source-timeout")) == 0) {
|
||||
__read_int(doc, node, &configuration->source_timeout, "<source-timeout> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("body-timeout")) == 0) {
|
||||
__read_int(doc, node, &configuration->body_timeout, "<body-timeout> must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("burst-on-connect")) == 0) {
|
||||
ICECAST_LOG_WARN("<burst-on-connect> is deprecated, use <burst-size> instead.");
|
||||
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include "common/thread/thread.h"
|
||||
#include "common/avl/avl.h"
|
||||
#include "icecasttypes.h"
|
||||
#include "compat.h"
|
||||
|
||||
#define XMLSTR(str) ((xmlChar *)(str))
|
||||
|
||||
@ -170,11 +171,13 @@ struct ice_config_tag {
|
||||
|
||||
int client_limit;
|
||||
int source_limit;
|
||||
int body_size_limit;
|
||||
unsigned int queue_size_limit;
|
||||
unsigned int burst_size;
|
||||
int client_timeout;
|
||||
int header_timeout;
|
||||
int source_timeout;
|
||||
int body_timeout;
|
||||
int fileserve;
|
||||
int on_demand; /* global setting for all relays */
|
||||
|
||||
|
193
src/client.c
193
src/client.c
@ -32,6 +32,7 @@
|
||||
#include "refobject.h"
|
||||
#include "cfgfile.h"
|
||||
#include "connection.h"
|
||||
#include "tls.h"
|
||||
#include "refbuf.h"
|
||||
#include "format.h"
|
||||
#include "stats.h"
|
||||
@ -86,6 +87,8 @@ int client_create(client_t **c_ptr, connection_t *con, http_parser_t *parser)
|
||||
client->con = con;
|
||||
client->parser = parser;
|
||||
client->protocol = ICECAST_PROTOCOL_HTTP;
|
||||
client->request_body_length = 0;
|
||||
client->request_body_read = 0;
|
||||
client->admin_command = ADMIN_COMMAND_ERROR;
|
||||
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
|
||||
client->refbuf->len = 0; /* force reader code to ignore buffer contents */
|
||||
@ -121,6 +124,16 @@ static inline void client_reuseconnection(client_t *client) {
|
||||
client->con->send = NULL;
|
||||
}
|
||||
|
||||
if (client->con->readbufferlen) {
|
||||
/* Aend... moorre paaiin.
|
||||
* stealing putback buffer.
|
||||
*/
|
||||
con->readbuffer = client->con->readbuffer;
|
||||
con->readbufferlen = client->con->readbufferlen;
|
||||
client->con->readbuffer = NULL;
|
||||
client->con->readbufferlen = 0;
|
||||
}
|
||||
|
||||
client->reuse = ICECAST_REUSE_CLOSE;
|
||||
|
||||
client_destroy(client);
|
||||
@ -137,8 +150,11 @@ void client_destroy(client_t *client)
|
||||
return;
|
||||
|
||||
if (client->reuse != ICECAST_REUSE_CLOSE) {
|
||||
client_reuseconnection(client);
|
||||
return;
|
||||
/* only reuse the client if we reached the body's EOF. */
|
||||
if (client_body_eof(client) == 1) {
|
||||
client_reuseconnection(client);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* release the buffer now, as the buffer could be on the source queue
|
||||
@ -435,3 +451,176 @@ void client_set_queue(client_t *client, refbuf_t *refbuf)
|
||||
if (to_release)
|
||||
refbuf_release(to_release);
|
||||
}
|
||||
|
||||
ssize_t client_body_read(client_t *client, void *buf, size_t len)
|
||||
{
|
||||
ssize_t ret;
|
||||
|
||||
ICECAST_LOG_DEBUG("Reading from body (client=%p)", client);
|
||||
|
||||
if (client->request_body_length != -1) {
|
||||
size_t left = (size_t)client->request_body_length - client->request_body_read;
|
||||
if (len > left) {
|
||||
ICECAST_LOG_DEBUG("Limiting read request to left over body size: left %zu byte, requested %zu byte", left, len);
|
||||
len = left;
|
||||
}
|
||||
}
|
||||
|
||||
ret = client_read_bytes(client, buf, len);
|
||||
|
||||
if (ret > 0) {
|
||||
client->request_body_read += ret;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* we might un-static this if needed at some time in distant future. -- ph3-der-loewe, 2018-04-17 */
|
||||
static int client_eof(client_t *client)
|
||||
{
|
||||
if (!client)
|
||||
return -1;
|
||||
|
||||
if (!client->con)
|
||||
return 0;
|
||||
|
||||
if (client->con->tls && tls_got_shutdown(client->con->tls) > 1)
|
||||
client->con->error = 1;
|
||||
|
||||
if (client->con->error)
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int client_body_eof(client_t *client)
|
||||
{
|
||||
int ret = -1;
|
||||
|
||||
if (!client)
|
||||
return -1;
|
||||
|
||||
if (client->request_body_length != -1 && client->request_body_read == (size_t)client->request_body_length) {
|
||||
ICECAST_LOG_DEBUG("Reached given body length (client=%p)", client);
|
||||
ret = 1;
|
||||
} else if (client->encoding) {
|
||||
ICECAST_LOG_DEBUG("Looking for body EOF with encoding (client=%p)", client);
|
||||
ret = httpp_encoding_eof(client->encoding, (int(*)(void*))client_eof, client);
|
||||
} else {
|
||||
ICECAST_LOG_DEBUG("Looking for body EOF without encoding (client=%p)", client);
|
||||
ret = client_eof(client);
|
||||
}
|
||||
|
||||
ICECAST_LOG_DEBUG("... result is: %i (client=%p)", ret, client);
|
||||
return ret;
|
||||
}
|
||||
|
||||
client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len)
|
||||
{
|
||||
if (!client || !buf || !len)
|
||||
return CLIENT_SLURP_ERROR;
|
||||
|
||||
if (client->request_body_length != -1) {
|
||||
/* non-streaming mode */
|
||||
size_t left = (size_t)client->request_body_length - client->request_body_read;
|
||||
|
||||
if (!left)
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
|
||||
if (*len < (size_t)client->request_body_length)
|
||||
return CLIENT_SLURP_BUFFER_TO_SMALL;
|
||||
|
||||
if (left > 2048)
|
||||
left = 2048;
|
||||
|
||||
client_body_read(client, buf + client->request_body_read, left);
|
||||
|
||||
if ((size_t)client->request_body_length == client->request_body_read) {
|
||||
*len = client->request_body_read;
|
||||
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
} else {
|
||||
return CLIENT_SLURP_NEEDS_MORE_DATA;
|
||||
}
|
||||
} else {
|
||||
/* streaming mode */
|
||||
size_t left = *len - client->request_body_read;
|
||||
int ret;
|
||||
|
||||
if (left) {
|
||||
if (left > 2048)
|
||||
left = 2048;
|
||||
|
||||
client_body_read(client, buf + client->request_body_read, left);
|
||||
}
|
||||
|
||||
ret = client_body_eof(client);
|
||||
switch (ret) {
|
||||
case 0:
|
||||
if (*len == client->request_body_read) {
|
||||
return CLIENT_SLURP_BUFFER_TO_SMALL;
|
||||
}
|
||||
return CLIENT_SLURP_NEEDS_MORE_DATA;
|
||||
break;
|
||||
case 1:
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
break;
|
||||
default:
|
||||
return CLIENT_SLURP_ERROR;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client_slurp_result_t client_body_skip(client_t *client)
|
||||
{
|
||||
char buf[2048];
|
||||
int ret;
|
||||
|
||||
ICECAST_LOG_DEBUG("Slurping client %p");
|
||||
|
||||
if (!client) {
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... failed");
|
||||
return CLIENT_SLURP_ERROR;
|
||||
}
|
||||
|
||||
if (client->request_body_length != -1) {
|
||||
size_t left = (size_t)client->request_body_length - client->request_body_read;
|
||||
|
||||
if (!left) {
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... was a success");
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
}
|
||||
|
||||
if (left > sizeof(buf))
|
||||
left = sizeof(buf);
|
||||
|
||||
client_body_read(client, buf, left);
|
||||
|
||||
if ((size_t)client->request_body_length == client->request_body_read) {
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... was a success");
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
} else {
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... needs more data");
|
||||
return CLIENT_SLURP_NEEDS_MORE_DATA;
|
||||
}
|
||||
} else {
|
||||
client_body_read(client, buf, sizeof(buf));
|
||||
}
|
||||
|
||||
ret = client_body_eof(client);
|
||||
switch (ret) {
|
||||
case 0:
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... needs more data");
|
||||
return CLIENT_SLURP_NEEDS_MORE_DATA;
|
||||
break;
|
||||
case 1:
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... was a success");
|
||||
return CLIENT_SLURP_SUCCESS;
|
||||
break;
|
||||
default:
|
||||
ICECAST_LOG_DEBUG("Slurping client %p ... failed");
|
||||
return CLIENT_SLURP_ERROR;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
19
src/client.h
19
src/client.h
@ -43,6 +43,13 @@ typedef enum _reuse_tag {
|
||||
ICECAST_REUSE_UPGRADETLS
|
||||
} reuse_t;
|
||||
|
||||
typedef enum {
|
||||
CLIENT_SLURP_ERROR,
|
||||
CLIENT_SLURP_NEEDS_MORE_DATA,
|
||||
CLIENT_SLURP_BUFFER_TO_SMALL,
|
||||
CLIENT_SLURP_SUCCESS
|
||||
} client_slurp_result_t;
|
||||
|
||||
struct _client_tag {
|
||||
/* mode of operation for this client */
|
||||
operation_mode mode;
|
||||
@ -62,6 +69,14 @@ struct _client_tag {
|
||||
/* protocol client uses */
|
||||
protocol_t protocol;
|
||||
|
||||
/* http request body length
|
||||
* -1 for streaming (e.g. chunked), 0 for no body, >0 for NNN bytes
|
||||
*/
|
||||
ssize_t request_body_length;
|
||||
|
||||
/* http request body length read so far */
|
||||
size_t request_body_read;
|
||||
|
||||
/* http response code for this client */
|
||||
int respcode;
|
||||
|
||||
@ -122,5 +137,9 @@ admin_format_t client_get_admin_format_by_content_negotiation(client_t *client);
|
||||
int client_send_bytes (client_t *client, const void *buf, unsigned len);
|
||||
int client_read_bytes (client_t *client, void *buf, unsigned len);
|
||||
void client_set_queue (client_t *client, refbuf_t *refbuf);
|
||||
ssize_t client_body_read(client_t *client, void *buf, size_t len);
|
||||
int client_body_eof(client_t *client);
|
||||
client_slurp_result_t client_body_slurp(client_t *client, void *buf, size_t *len);
|
||||
client_slurp_result_t client_body_skip(client_t *client);
|
||||
|
||||
#endif /* __CLIENT_H__ */
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit fca416b126cb842034ac3468362c044895975b5a
|
||||
Subproject commit 9bfb3a34fc41cc8e0075328d7d6527bd84eb40ba
|
308
src/connection.c
308
src/connection.c
@ -79,23 +79,21 @@
|
||||
typedef struct client_queue_tag {
|
||||
client_t *client;
|
||||
int offset;
|
||||
int stream_offset;
|
||||
int shoutcast;
|
||||
char *shoutcast_mount;
|
||||
char *bodybuffer;
|
||||
size_t bodybufferlen;
|
||||
int tried_body;
|
||||
struct client_queue_tag *next;
|
||||
} client_queue_t;
|
||||
|
||||
typedef struct _thread_queue_tag {
|
||||
thread_type *thread_id;
|
||||
struct _thread_queue_tag *next;
|
||||
} thread_queue_t;
|
||||
|
||||
static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail
|
||||
static volatile unsigned long _current_id = 0;
|
||||
static int _initialized = 0;
|
||||
|
||||
static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
|
||||
static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
|
||||
static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue;
|
||||
static int tls_ok;
|
||||
static tls_ctx_t *tls_ctx;
|
||||
|
||||
@ -120,6 +118,8 @@ void connection_initialize(void)
|
||||
_req_queue_tail = &_req_queue;
|
||||
_con_queue = NULL;
|
||||
_con_queue_tail = &_con_queue;
|
||||
_body_queue = NULL;
|
||||
_body_queue_tail = &_body_queue;
|
||||
|
||||
_initialized = 1;
|
||||
}
|
||||
@ -271,6 +271,12 @@ void connection_uses_tls(connection_t *con)
|
||||
if (con->tls)
|
||||
return;
|
||||
|
||||
if (con->readbufferlen) {
|
||||
ICECAST_LOG_ERROR("Connection is now using TLS but has data put back. BAD. Discarding putback data.");
|
||||
free(con->readbuffer);
|
||||
con->readbufferlen = 0;
|
||||
}
|
||||
|
||||
con->tlsmode = ICECAST_TLSMODE_RFC2818;
|
||||
con->read = connection_read_tls;
|
||||
con->send = connection_send_tls;
|
||||
@ -282,7 +288,71 @@ void connection_uses_tls(connection_t *con)
|
||||
|
||||
ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len)
|
||||
{
|
||||
return con->read(con, buf, len);
|
||||
ssize_t done = 0;
|
||||
ssize_t ret;
|
||||
|
||||
if (con->readbufferlen) {
|
||||
ICECAST_LOG_DEBUG("On connection %p we read from putback buffer, filled with %zu bytes, requested are %zu bytes", con, con->readbufferlen, len);
|
||||
if (len >= con->readbufferlen) {
|
||||
memcpy(buf, con->readbuffer, con->readbufferlen);
|
||||
free(con->readbuffer);
|
||||
ICECAST_LOG_DEBUG("New fill in buffer=<empty>");
|
||||
if (len == con->readbufferlen) {
|
||||
con->readbufferlen = 0;
|
||||
return len;
|
||||
} else {
|
||||
len -= con->readbufferlen;
|
||||
buf += con->readbufferlen;
|
||||
done = con->readbufferlen;
|
||||
con->readbufferlen = 0;
|
||||
}
|
||||
} else {
|
||||
memcpy(buf, con->readbuffer, len);
|
||||
memmove(con->readbuffer, con->readbuffer+len, con->readbufferlen-len);
|
||||
con->readbufferlen -= len;
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
ret = con->read(con, buf, len);
|
||||
|
||||
if (ret < 0) {
|
||||
if (done == 0) {
|
||||
return ret;
|
||||
} else {
|
||||
return done;
|
||||
}
|
||||
}
|
||||
|
||||
return done + ret;
|
||||
}
|
||||
|
||||
int connection_read_put_back(connection_t *con, const void *buf, size_t len)
|
||||
{
|
||||
void *n;
|
||||
|
||||
if (con->readbufferlen) {
|
||||
n = realloc(con->readbuffer, con->readbufferlen + len);
|
||||
if (!n)
|
||||
return -1;
|
||||
|
||||
memcpy(n + con->readbufferlen, buf, len);
|
||||
con->readbuffer = n;
|
||||
con->readbufferlen += len;
|
||||
|
||||
ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len);
|
||||
return 0;
|
||||
} else {
|
||||
n = malloc(len);
|
||||
if (!n)
|
||||
return -1;
|
||||
|
||||
memcpy(n, buf, len);
|
||||
con->readbuffer = n;
|
||||
con->readbufferlen = len;
|
||||
ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static sock_t wait_for_serversock(int timeout)
|
||||
@ -466,6 +536,7 @@ static void process_request_queue (void)
|
||||
}
|
||||
|
||||
if (len > 0) {
|
||||
ssize_t stream_offset = -1;
|
||||
int pass_it = 1;
|
||||
char *ptr;
|
||||
|
||||
@ -487,23 +558,27 @@ static void process_request_queue (void)
|
||||
* http style headers, we don't want to lose those */
|
||||
ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n");
|
||||
if (ptr) {
|
||||
node->stream_offset = (ptr+6) - client->refbuf->data;
|
||||
stream_offset = (ptr+6) - client->refbuf->data;
|
||||
break;
|
||||
}
|
||||
ptr = strstr(client->refbuf->data, "\r\n\r\n");
|
||||
if (ptr) {
|
||||
node->stream_offset = (ptr+4) - client->refbuf->data;
|
||||
stream_offset = (ptr+4) - client->refbuf->data;
|
||||
break;
|
||||
}
|
||||
ptr = strstr(client->refbuf->data, "\n\n");
|
||||
if (ptr) {
|
||||
node->stream_offset = (ptr+2) - client->refbuf->data;
|
||||
stream_offset = (ptr+2) - client->refbuf->data;
|
||||
break;
|
||||
}
|
||||
pass_it = 0;
|
||||
} while (0);
|
||||
|
||||
if (pass_it) {
|
||||
if (stream_offset != -1) {
|
||||
connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset);
|
||||
node->offset = stream_offset;
|
||||
}
|
||||
if ((client_queue_t **)_req_queue_tail == &(node->next))
|
||||
_req_queue_tail = (volatile client_queue_t **)node_ref;
|
||||
*node_ref = node->next;
|
||||
@ -526,6 +601,96 @@ static void process_request_queue (void)
|
||||
_handle_connection();
|
||||
}
|
||||
|
||||
/* add client to body queue.
|
||||
*/
|
||||
static void _add_body_client(client_queue_t *node)
|
||||
{
|
||||
ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client);
|
||||
|
||||
thread_spin_lock(&_connection_lock);
|
||||
*_body_queue_tail = node;
|
||||
_body_queue_tail = (volatile client_queue_t **) &node->next;
|
||||
thread_spin_unlock(&_connection_lock);
|
||||
}
|
||||
|
||||
static client_slurp_result_t process_request_body_queue_one(client_queue_t *node, time_t timeout, size_t body_size_limit)
|
||||
{
|
||||
client_t *client = node->client;
|
||||
client_slurp_result_t res;
|
||||
|
||||
if (client->parser->req_type == httpp_req_post) {
|
||||
if (node->bodybuffer == NULL && client->request_body_read == 0) {
|
||||
if (client->request_body_length < 0) {
|
||||
node->bodybufferlen = body_size_limit;
|
||||
node->bodybuffer = malloc(node->bodybufferlen);
|
||||
} else if (client->request_body_length <= (ssize_t)body_size_limit) {
|
||||
node->bodybufferlen = client->request_body_length;
|
||||
node->bodybuffer = malloc(node->bodybufferlen);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (node->bodybuffer) {
|
||||
res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen));
|
||||
if (res == CLIENT_SLURP_SUCCESS) {
|
||||
httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen);
|
||||
free(node->bodybuffer);
|
||||
node->bodybuffer = NULL;
|
||||
}
|
||||
} else {
|
||||
res = client_body_skip(client);
|
||||
}
|
||||
|
||||
if (res != CLIENT_SLURP_SUCCESS) {
|
||||
if (client->con->con_time <= timeout || client->request_body_read >= body_size_limit) {
|
||||
return CLIENT_SLURP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/* This queue reads data from the body of clients. */
|
||||
static void process_request_body_queue (void)
|
||||
{
|
||||
client_queue_t **node_ref = (client_queue_t **)&_body_queue;
|
||||
ice_config_t *config;
|
||||
time_t timeout;
|
||||
size_t body_size_limit;
|
||||
|
||||
ICECAST_LOG_DEBUG("Processing body queue.");
|
||||
|
||||
ICECAST_LOG_DEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail);
|
||||
|
||||
config = config_get_config();
|
||||
timeout = time(NULL) - config->body_timeout;
|
||||
body_size_limit = config->body_size_limit;
|
||||
config_release_config();
|
||||
|
||||
while (*node_ref) {
|
||||
client_queue_t *node = *node_ref;
|
||||
client_t *client = node->client;
|
||||
client_slurp_result_t res;
|
||||
|
||||
node->tried_body = 1;
|
||||
|
||||
ICECAST_LOG_DEBUG("Got client %p in body queue.", client);
|
||||
|
||||
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
||||
|
||||
if (res != CLIENT_SLURP_NEEDS_MORE_DATA) {
|
||||
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
|
||||
|
||||
if ((client_queue_t **)_body_queue_tail == &(node->next))
|
||||
_body_queue_tail = (volatile client_queue_t **)node_ref;
|
||||
*node_ref = node->next;
|
||||
node->next = NULL;
|
||||
_add_connection(node);
|
||||
continue;
|
||||
}
|
||||
node_ref = &node->next;
|
||||
}
|
||||
}
|
||||
|
||||
/* add node to the queue of requests. This is where the clients are when
|
||||
* initial http details are read.
|
||||
@ -621,6 +786,7 @@ void connection_accept_loop(void)
|
||||
duration = 300; /* use longer timeouts when nothing waiting */
|
||||
}
|
||||
process_request_queue();
|
||||
process_request_body_queue();
|
||||
}
|
||||
|
||||
/* Give all the other threads notification to shut down */
|
||||
@ -776,8 +942,7 @@ static inline void source_startup(client_t *client, const char *uri)
|
||||
ret = util_http_build_header(ok->data, PER_CLIENT_REFBUF_SIZE, 0, 0, status_to_send, NULL, NULL, NULL, NULL, NULL, client);
|
||||
snprintf(ok->data + ret, PER_CLIENT_REFBUF_SIZE - ret, "Content-Length: 0\r\n\r\n");
|
||||
ok->len = strlen(ok->data);
|
||||
/* we may have unprocessed data read in, so don't overwrite it */
|
||||
ok->associated = client->refbuf;
|
||||
refbuf_release(client->refbuf);
|
||||
client->refbuf = ok;
|
||||
fserve_add_client_callback(client, source_client_callback, source);
|
||||
}
|
||||
@ -1042,14 +1207,7 @@ static void _handle_shoutcast_compatible(client_queue_t *node)
|
||||
parser = httpp_create_parser();
|
||||
httpp_initialize(parser, NULL);
|
||||
if (httpp_parse(parser, http_compliant, strlen(http_compliant))) {
|
||||
/* we may have more than just headers, so prepare for it */
|
||||
if (node->stream_offset == node->offset) {
|
||||
client->refbuf->len = 0;
|
||||
} else {
|
||||
char *ptr = client->refbuf->data;
|
||||
client->refbuf->len = node->offset - node->stream_offset;
|
||||
memmove(ptr, ptr + node->stream_offset, client->refbuf->len);
|
||||
}
|
||||
client->refbuf->len = 0;
|
||||
client->parser = parser;
|
||||
client->protocol = ICECAST_PROTOCOL_SHOUTCAST;
|
||||
node->shoutcast = 0;
|
||||
@ -1236,6 +1394,7 @@ static void _handle_authed_client(client_t *client, void *uri, auth_result resul
|
||||
_handle_stats_request(client, uri);
|
||||
break;
|
||||
case httpp_req_get:
|
||||
case httpp_req_post:
|
||||
case httpp_req_options:
|
||||
_handle_get_request(client, uri);
|
||||
break;
|
||||
@ -1289,7 +1448,7 @@ static void _handle_authentication_mount_generic(client_t *client, void *uri, mo
|
||||
if (!mountproxy) {
|
||||
int command_type = admin_get_command_type(client->admin_command);
|
||||
if (command_type == ADMINTYPE_MOUNT || command_type == ADMINTYPE_HYBRID) {
|
||||
const char *mount = httpp_get_query_param(client->parser, "mount");
|
||||
const char *mount = httpp_get_param(client->parser, "mount");
|
||||
if (mount)
|
||||
mountproxy = __find_non_admin_mount(config, mount, type);
|
||||
}
|
||||
@ -1365,6 +1524,75 @@ static void __prepare_shoutcast_admin_cgi_request(client_t *client)
|
||||
global_unlock();
|
||||
}
|
||||
|
||||
static void _update_client_request_body_length(client_t *client)
|
||||
{
|
||||
const char *header;
|
||||
long long unsigned int scannumber;
|
||||
int have = 0;
|
||||
|
||||
if (!have) {
|
||||
if (client->parser->req_type == httpp_req_source) {
|
||||
client->request_body_length = -1; /* streaming */
|
||||
have = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!have) {
|
||||
header = httpp_getvar(client->parser, "transfer-encoding");
|
||||
if (header) {
|
||||
if (strcasecmp(header, "identity") != 0) {
|
||||
client->request_body_length = -1; /* streaming */
|
||||
have = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!have) {
|
||||
header = httpp_getvar(client->parser, "content-length");
|
||||
if (header) {
|
||||
if (sscanf(header, "%llu", &scannumber) == 1) {
|
||||
client->request_body_length = scannumber;
|
||||
have = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!have) {
|
||||
if (client->parser->req_type == httpp_req_put) {
|
||||
/* As we don't know yet, we asume this PUT is in streaming mode */
|
||||
client->request_body_length = -1; /* streaming */
|
||||
have = 1;
|
||||
}
|
||||
}
|
||||
|
||||
ICECAST_LOG_DEBUG("Client %p has request_body_length=%zi", client, client->request_body_length);
|
||||
}
|
||||
|
||||
/* Check if we need body of client */
|
||||
static int _need_body(client_queue_t *node)
|
||||
{
|
||||
client_t *client = node->client;
|
||||
|
||||
if (node->tried_body)
|
||||
return 0;
|
||||
|
||||
if (client->parser->req_type == httpp_req_source) {
|
||||
/* SOURCE connection. */
|
||||
return 0;
|
||||
} else if (client->parser->req_type == httpp_req_put) {
|
||||
/* PUT connection.
|
||||
* TODO: We may need body for /admin/ but we do not know if it's an admin request yet.
|
||||
*/
|
||||
return 0;
|
||||
} else if (client->request_body_length != -1 && (size_t)client->request_body_length != client->request_body_read) {
|
||||
return 1;
|
||||
} else if (client->request_body_length == -1 && client_body_eof(client) == 0) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Connection thread. Here we take clients off the connection queue and check
|
||||
* the contents provided. We set up the parser then hand off to the specific
|
||||
* request handler.
|
||||
@ -1401,13 +1629,32 @@ static void _handle_connection(void)
|
||||
char *uri;
|
||||
const char *upgrade, *connection;
|
||||
|
||||
/* we may have more than just headers, so prepare for it */
|
||||
if (node->stream_offset == node->offset) {
|
||||
client->refbuf->len = 0;
|
||||
} else {
|
||||
char *ptr = client->refbuf->data;
|
||||
client->refbuf->len = node->offset - node->stream_offset;
|
||||
memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
|
||||
client->refbuf->len = 0;
|
||||
|
||||
/* early check if we need more data */
|
||||
_update_client_request_body_length(client);
|
||||
if (_need_body(node)) {
|
||||
/* Just calling _add_body_client() would do the job.
|
||||
* However, if the client only has a small body this might work without moving it between queues.
|
||||
* -> much faster.
|
||||
*/
|
||||
client_slurp_result_t res;
|
||||
ice_config_t *config;
|
||||
time_t timeout;
|
||||
size_t body_size_limit;
|
||||
|
||||
config = config_get_config();
|
||||
timeout = time(NULL) - config->body_timeout;
|
||||
body_size_limit = config->body_size_limit;
|
||||
config_release_config();
|
||||
|
||||
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
||||
if (res != CLIENT_SLURP_SUCCESS) {
|
||||
_add_body_client(node);
|
||||
continue;
|
||||
} else {
|
||||
ICECAST_LOG_DEBUG("Success on fast lane");
|
||||
}
|
||||
}
|
||||
|
||||
rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
|
||||
@ -1416,6 +1663,7 @@ static void _handle_connection(void)
|
||||
if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
|
||||
httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);
|
||||
|
||||
free (node->bodybuffer);
|
||||
free (node->shoutcast_mount);
|
||||
free (node);
|
||||
|
||||
@ -1453,7 +1701,7 @@ static void _handle_connection(void)
|
||||
continue;
|
||||
}
|
||||
|
||||
client->mode = config_str_to_omode(httpp_get_query_param(client->parser, "omode"));
|
||||
client->mode = config_str_to_omode(httpp_get_param(client->parser, "omode"));
|
||||
|
||||
if (_handle_resources(client, &uri) != 0) {
|
||||
client_destroy (client);
|
||||
@ -1579,5 +1827,7 @@ void connection_close(connection_t *con)
|
||||
sock_close(con->sock);
|
||||
if (con->ip)
|
||||
free(con->ip);
|
||||
if (con->readbufferlen)
|
||||
free(con->readbuffer);
|
||||
free(con);
|
||||
}
|
||||
|
@ -40,6 +40,9 @@ struct connection_tag {
|
||||
int (*send)(connection_t *handle, const void *buf, size_t len);
|
||||
int (*read)(connection_t *handle, void *buf, size_t len);
|
||||
|
||||
void *readbuffer;
|
||||
size_t readbufferlen;
|
||||
|
||||
char *ip;
|
||||
};
|
||||
|
||||
@ -55,6 +58,7 @@ void connection_queue(connection_t *con);
|
||||
void connection_uses_tls(connection_t *con);
|
||||
|
||||
ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len);
|
||||
int connection_read_put_back(connection_t *con, const void *buf, size_t len);
|
||||
|
||||
extern rwlock_t _source_shutdown_rwlock;
|
||||
|
||||
|
@ -317,7 +317,7 @@ static refbuf_t *ebml_get_buffer(source_t *source)
|
||||
} else if(read_bytes == 0) {
|
||||
/* Feed more bytes into the parser */
|
||||
write_buffer = ebml_get_write_buffer(ebml_source_state->ebml, &write_bytes);
|
||||
read_bytes = client_read_bytes (source->client, write_buffer, write_bytes);
|
||||
read_bytes = client_body_read(source->client, write_buffer, write_bytes);
|
||||
if (read_bytes <= 0) {
|
||||
ebml_wrote (ebml_source_state->ebml, 0);
|
||||
return NULL;
|
||||
|
@ -465,7 +465,7 @@ static void format_mp3_free_plugin(format_plugin_t *self)
|
||||
*/
|
||||
static int complete_read(source_t *source)
|
||||
{
|
||||
int bytes;
|
||||
ssize_t bytes;
|
||||
format_plugin_t *format = source->format;
|
||||
mp3_state *source_mp3 = format->_state;
|
||||
char *buf;
|
||||
@ -480,10 +480,11 @@ static int complete_read(source_t *source)
|
||||
}
|
||||
buf = source_mp3->read_data->data + source_mp3->read_count;
|
||||
|
||||
bytes = client_read_bytes (source->client, buf, REFBUF_SIZE-source_mp3->read_count);
|
||||
bytes = client_body_read(source->client, buf, REFBUF_SIZE-source_mp3->read_count);
|
||||
if (bytes < 0)
|
||||
{
|
||||
if (source->client->con->error)
|
||||
/* Why do we do this here (not source.c)? -- ph3-der-loewe, 2018-04-17 */
|
||||
if (client_body_eof(source->client))
|
||||
{
|
||||
refbuf_release (source_mp3->read_data);
|
||||
source_mp3->read_data = NULL;
|
||||
|
@ -402,7 +402,7 @@ static refbuf_t *ogg_get_buffer(source_t *source)
|
||||
ogg_state_t *ogg_info = source->format->_state;
|
||||
format_plugin_t *format = source->format;
|
||||
char *data = NULL;
|
||||
int bytes = 0;
|
||||
ssize_t bytes = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
@ -449,7 +449,7 @@ static refbuf_t *ogg_get_buffer(source_t *source)
|
||||
/* we need more data to continue getting pages */
|
||||
data = ogg_sync_buffer (&ogg_info->oy, 4096);
|
||||
|
||||
bytes = client_read_bytes (source->client, data, 4096);
|
||||
bytes = client_body_read(source->client, data, 4096);
|
||||
if (bytes <= 0)
|
||||
{
|
||||
ogg_sync_wrote (&ogg_info->oy, 0);
|
||||
|
12
src/source.c
12
src/source.c
@ -516,10 +516,7 @@ static refbuf_t *get_next_buffer (source_t *source)
|
||||
}
|
||||
source->last_read = current;
|
||||
refbuf = source->format->get_buffer (source);
|
||||
if (source->client->con->tls && tls_got_shutdown(source->client->con->tls) > 1)
|
||||
source->client->con->error = 1;
|
||||
if (source->client->con && source->client->con->error)
|
||||
{
|
||||
if (client_body_eof(source->client)) {
|
||||
ICECAST_LOG_INFO("End of Stream %s", source->mount);
|
||||
source->running = 0;
|
||||
continue;
|
||||
@ -1321,7 +1318,6 @@ void source_client_callback (client_t *client, void *arg)
|
||||
{
|
||||
const char *agent;
|
||||
source_t *source = arg;
|
||||
refbuf_t *old_data = client->refbuf;
|
||||
|
||||
if (client->con->error)
|
||||
{
|
||||
@ -1332,9 +1328,9 @@ void source_client_callback (client_t *client, void *arg)
|
||||
source_free_source (source);
|
||||
return;
|
||||
}
|
||||
client->refbuf = old_data->associated;
|
||||
old_data->associated = NULL;
|
||||
refbuf_release (old_data);
|
||||
|
||||
client->refbuf->len = 0;
|
||||
|
||||
stats_event (source->mount, "source_ip", source->client->con->ip);
|
||||
agent = httpp_getvar (source->client->parser, "user-agent");
|
||||
if (agent)
|
||||
|
@ -1020,7 +1020,7 @@ void stats_transform_xslt(client_t *client, const char *uri)
|
||||
{
|
||||
xmlDocPtr doc;
|
||||
char *xslpath = util_get_path_from_normalised_uri(uri);
|
||||
const char *mount = httpp_get_query_param(client->parser, "mount");
|
||||
const char *mount = httpp_get_param(client->parser, "mount");
|
||||
|
||||
doc = stats_get_xml(0, mount, client->mode);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user