mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2025-02-02 15:07:36 -05:00
Update: Check if clients are ready before interacting with them
This commit is contained in:
parent
2e0bb32535
commit
4cc76466ac
239
src/connection.c
239
src/connection.c
@ -90,6 +90,7 @@ typedef struct client_queue_tag {
|
|||||||
char *bodybuffer;
|
char *bodybuffer;
|
||||||
size_t bodybufferlen;
|
size_t bodybufferlen;
|
||||||
int tried_body;
|
int tried_body;
|
||||||
|
bool ready;
|
||||||
struct client_queue_tag *next;
|
struct client_queue_tag *next;
|
||||||
} client_queue_entry_t;
|
} client_queue_entry_t;
|
||||||
|
|
||||||
@ -97,6 +98,10 @@ typedef struct {
|
|||||||
client_queue_entry_t *head;
|
client_queue_entry_t *head;
|
||||||
client_queue_entry_t **tail;
|
client_queue_entry_t **tail;
|
||||||
mutex_t mutex;
|
mutex_t mutex;
|
||||||
|
#ifdef HAVE_POLL
|
||||||
|
struct pollfd *pollfds;
|
||||||
|
size_t pollfds_len;
|
||||||
|
#endif
|
||||||
} client_queue_t;
|
} client_queue_t;
|
||||||
|
|
||||||
static spin_t _connection_lock; // protects _current_id
|
static spin_t _connection_lock; // protects _current_id
|
||||||
@ -128,6 +133,9 @@ static void client_queue_init(client_queue_t *queue)
|
|||||||
static void client_queue_destroy(client_queue_t *queue)
|
static void client_queue_destroy(client_queue_t *queue)
|
||||||
{
|
{
|
||||||
thread_mutex_destroy(&(queue->mutex));
|
thread_mutex_destroy(&(queue->mutex));
|
||||||
|
#ifdef HAVE_POLL
|
||||||
|
free(queue->pollfds);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry)
|
static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry)
|
||||||
@ -160,6 +168,104 @@ static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_q
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool client_queue_check_ready(client_queue_t *queue, int timeout)
|
||||||
|
{
|
||||||
|
if (!queue->head)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
#ifdef HAVE_POLL
|
||||||
|
if (true) {
|
||||||
|
size_t count = 0;
|
||||||
|
size_t i;
|
||||||
|
client_queue_entry_t *cur;
|
||||||
|
|
||||||
|
thread_mutex_lock(&(queue->mutex));
|
||||||
|
for (cur = queue->head; cur; cur = cur->next) {
|
||||||
|
count++;
|
||||||
|
cur->ready = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queue->pollfds_len < count) {
|
||||||
|
free(queue->pollfds);
|
||||||
|
queue->pollfds = calloc(count, sizeof(*queue->pollfds));
|
||||||
|
if (queue->pollfds) {
|
||||||
|
queue->pollfds_len = count;
|
||||||
|
} else {
|
||||||
|
ICECAST_LOG_ERROR("Allocation of queue->pollfds failed. BAD.");
|
||||||
|
queue->pollfds_len = 0;
|
||||||
|
thread_mutex_unlock(&(queue->mutex));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memset(queue->pollfds, 0, sizeof(*queue->pollfds)*count);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (cur = queue->head, i = 0; cur && i < count; cur = cur->next, i++) {
|
||||||
|
queue->pollfds[i].fd = cur->client->con->sock;
|
||||||
|
queue->pollfds[i].events = POLLIN;
|
||||||
|
}
|
||||||
|
thread_mutex_unlock(&(queue->mutex));
|
||||||
|
|
||||||
|
if (poll(queue->pollfds, count, timeout) < 1)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
thread_mutex_lock(&(queue->mutex));
|
||||||
|
for (cur = queue->head; cur; cur = cur->next) {
|
||||||
|
for (i = 0; i < count; i++) {
|
||||||
|
if (queue->pollfds[i].fd == cur->client->con->sock) {
|
||||||
|
if (queue->pollfds[i].revents) {
|
||||||
|
cur->ready = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
thread_mutex_unlock(&(queue->mutex));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, client_queue_entry_t *stop)
|
||||||
|
{
|
||||||
|
#ifdef HAVE_POLL
|
||||||
|
client_queue_entry_t *cur;
|
||||||
|
client_queue_entry_t *last = NULL;
|
||||||
|
|
||||||
|
if (!queue->head)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
thread_mutex_lock(&(queue->mutex));
|
||||||
|
for (cur = queue->head; cur && cur != stop; cur = cur->next) {
|
||||||
|
if (cur->ready) {
|
||||||
|
// use this one.
|
||||||
|
if (last == NULL) {
|
||||||
|
/* we are the head */
|
||||||
|
queue->head = cur->next;
|
||||||
|
if (!queue->head) {
|
||||||
|
queue->tail = &(queue->head);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
last->next = cur->next;
|
||||||
|
if (queue->tail == &(cur->next)) {
|
||||||
|
queue->tail = &(last->next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cur->next = NULL;
|
||||||
|
thread_mutex_unlock(&(queue->mutex));
|
||||||
|
return cur;
|
||||||
|
}
|
||||||
|
last = cur;
|
||||||
|
}
|
||||||
|
thread_mutex_unlock(&(queue->mutex));
|
||||||
|
return NULL;
|
||||||
|
#else
|
||||||
|
/* just return any */
|
||||||
|
return client_queue_shift(queue, stop);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static bool client_queue_empty(client_queue_t *queue)
|
static bool client_queue_empty(client_queue_t *queue)
|
||||||
{
|
{
|
||||||
/* No need to lock here as this is a point-in-time thing anyway */
|
/* No need to lock here as this is a point-in-time thing anyway */
|
||||||
@ -547,7 +653,9 @@ static void process_request_queue (void)
|
|||||||
timeout = config->header_timeout;
|
timeout = config->header_timeout;
|
||||||
config_release_config();
|
config_release_config();
|
||||||
|
|
||||||
while ((node = client_queue_shift(&_request_queue, stop))) {
|
client_queue_check_ready(&_request_queue, 100);
|
||||||
|
|
||||||
|
while ((node = client_queue_shift_ready(&_request_queue, stop))) {
|
||||||
if (!process_request_queue_one(node, timeout)) {
|
if (!process_request_queue_one(node, timeout)) {
|
||||||
client_queue_add(&_request_queue, node);
|
client_queue_add(&_request_queue, node);
|
||||||
if (!stop)
|
if (!stop)
|
||||||
@ -941,75 +1049,70 @@ static void _handle_connection(void)
|
|||||||
const char *rawuri;
|
const char *rawuri;
|
||||||
client_queue_entry_t *node;
|
client_queue_entry_t *node;
|
||||||
|
|
||||||
while (1) {
|
while ((node = client_queue_shift(&_connection_queue, NULL))) {
|
||||||
node = client_queue_shift(&_connection_queue, NULL);
|
client_t *client = node->client;
|
||||||
if (node) {
|
int already_parsed = 0;
|
||||||
client_t *client = node->client;
|
|
||||||
int already_parsed = 0;
|
|
||||||
|
|
||||||
/* Check for special shoutcast compatability processing */
|
/* Check for special shoutcast compatability processing */
|
||||||
if (node->shoutcast) {
|
if (node->shoutcast) {
|
||||||
_handle_shoutcast_compatible (node);
|
_handle_shoutcast_compatible (node);
|
||||||
if (node->shoutcast)
|
if (node->shoutcast)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* process normal HTTP headers */
|
/* process normal HTTP headers */
|
||||||
if (client->parser) {
|
if (client->parser) {
|
||||||
already_parsed = 1;
|
already_parsed = 1;
|
||||||
parser = client->parser;
|
parser = client->parser;
|
||||||
} else {
|
} else {
|
||||||
parser = httpp_create_parser();
|
parser = httpp_create_parser();
|
||||||
httpp_initialize(parser, NULL);
|
httpp_initialize(parser, NULL);
|
||||||
client->parser = parser;
|
client->parser = parser;
|
||||||
}
|
}
|
||||||
if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) {
|
if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) {
|
||||||
client->refbuf->len = 0;
|
client->refbuf->len = 0;
|
||||||
|
|
||||||
/* early check if we need more data */
|
/* early check if we need more data */
|
||||||
client_complete(client);
|
client_complete(client);
|
||||||
if (_need_body(node)) {
|
if (_need_body(node)) {
|
||||||
/* Just calling client_queue_add(&_body_queue, node) would do the job.
|
/* Just calling client_queue_add(&_body_queue, node) would do the job.
|
||||||
* However, if the client only has a small body this might work without moving it between queues.
|
* However, if the client only has a small body this might work without moving it between queues.
|
||||||
* -> much faster.
|
* -> much faster.
|
||||||
*/
|
*/
|
||||||
client_slurp_result_t res;
|
client_slurp_result_t res;
|
||||||
ice_config_t *config;
|
ice_config_t *config;
|
||||||
time_t timeout;
|
time_t timeout;
|
||||||
size_t body_size_limit;
|
size_t body_size_limit;
|
||||||
|
|
||||||
config = config_get_config();
|
config = config_get_config();
|
||||||
timeout = time(NULL) - config->body_timeout;
|
timeout = time(NULL) - config->body_timeout;
|
||||||
body_size_limit = config->body_size_limit;
|
body_size_limit = config->body_size_limit;
|
||||||
config_release_config();
|
config_release_config();
|
||||||
|
|
||||||
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
||||||
if (res != CLIENT_SLURP_SUCCESS) {
|
if (res != CLIENT_SLURP_SUCCESS) {
|
||||||
ICECAST_LOG_DEBUG("Putting client %p in body queue.", client);
|
ICECAST_LOG_DEBUG("Putting client %p in body queue.", client);
|
||||||
client_queue_add(&_body_queue, node);
|
client_queue_add(&_body_queue, node);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ICECAST_LOG_DEBUG("Success on fast lane");
|
ICECAST_LOG_DEBUG("Success on fast lane");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
|
rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
|
||||||
|
|
||||||
/* assign a port-based shoutcast mountpoint if required */
|
/* assign a port-based shoutcast mountpoint if required */
|
||||||
if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
|
if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
|
||||||
httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);
|
httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);
|
||||||
|
|
||||||
free_client_node(node);
|
free_client_node(node);
|
||||||
|
|
||||||
connection_handle_client(client);
|
connection_handle_client(client);
|
||||||
} else {
|
} else {
|
||||||
free (node);
|
free_client_node(node);
|
||||||
ICECAST_LOG_ERROR("HTTP request parsing failed");
|
ICECAST_LOG_ERROR("HTTP request parsing failed");
|
||||||
client_destroy (client);
|
client_destroy (client);
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user