1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2025-01-03 14:56:34 -05:00

Update: Use own thread for request and body queues

This commit is contained in:
Philipp Schafft 2022-03-20 11:10:17 +00:00
parent 4cc76466ac
commit 0645d82bd1

View File

@ -98,12 +98,17 @@ typedef struct {
client_queue_entry_t *head;
client_queue_entry_t **tail;
mutex_t mutex;
cond_t cond;
thread_type *thread;
bool running;
#ifdef HAVE_POLL
struct pollfd *pollfds;
size_t pollfds_len;
#endif
} client_queue_t;
#define QUEUE_READY_TIMEOUT 800
static spin_t _connection_lock; // protects _current_id
static volatile connection_id_t _current_id = 0;
static int _initialized = 0;
@ -122,28 +127,56 @@ rwlock_t _source_shutdown_rwlock;
static void _handle_connection(void);
static void get_tls_certificate(ice_config_t *config);
static void free_client_node(client_queue_entry_t *node);
static void * process_request_queue (client_queue_t *queue);
static void * process_request_body_queue (client_queue_t *queue);
static void client_queue_init(client_queue_t *queue)
{
memset(queue, 0, sizeof(*queue));
queue->tail = &(queue->head);
thread_mutex_create(&(queue->mutex));
thread_cond_create(&(queue->cond));
}
static void client_queue_destroy(client_queue_t *queue)
{
if (queue->thread) {
queue->running = false;
thread_cond_broadcast(&(queue->cond));
thread_join(queue->thread);
}
thread_cond_destroy(&(queue->cond));
thread_mutex_destroy(&(queue->mutex));
#ifdef HAVE_POLL
free(queue->pollfds);
#endif
}
static void client_queue_start_thread(client_queue_t *queue, void *(*func)(client_queue_t *))
{
if (queue->thread)
return;
queue->running = true;
queue->thread = thread_create("queue thread", (void*(*)(void*))func, queue, THREAD_ATTACHED);
}
static inline bool client_queue_running(client_queue_t *queue)
{
return queue->running;
}
static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry)
{
thread_mutex_lock(&(queue->mutex));
*(queue->tail) = entry;
queue->tail = &(entry->next);
thread_mutex_unlock(&(queue->mutex));
thread_cond_broadcast(&(queue->cond));
}
static void client_queue_wait(client_queue_t *queue)
{
thread_cond_wait(&(queue->cond));
}
static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop)
@ -168,13 +201,14 @@ static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_q
return ret;
}
static bool client_queue_check_ready(client_queue_t *queue, int timeout)
static bool client_queue_check_ready(client_queue_t *queue, int timeout, time_t connection_timeout)
{
if (!queue->head)
return false;
#ifdef HAVE_POLL
if (true) {
bool had_timeout = false;
size_t count = 0;
size_t i;
client_queue_entry_t *cur;
@ -182,7 +216,12 @@ static bool client_queue_check_ready(client_queue_t *queue, int timeout)
thread_mutex_lock(&(queue->mutex));
for (cur = queue->head; cur; cur = cur->next) {
count++;
cur->ready = false;
if (cur->client->con->con_time <= connection_timeout) {
cur->ready = true;
had_timeout = true;
} else {
cur->ready = false;
}
}
if (queue->pollfds_len < count) {
@ -206,6 +245,9 @@ static bool client_queue_check_ready(client_queue_t *queue, int timeout)
}
thread_mutex_unlock(&(queue->mutex));
if (had_timeout)
return true;
if (poll(queue->pollfds, count, timeout) < 1)
return false;
@ -266,12 +308,6 @@ static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, cl
#endif
}
static bool client_queue_empty(client_queue_t *queue)
{
/* No need to lock here as this is a point-in-time thing anyway */
return queue->head == NULL ? true : false;
}
void connection_initialize(void)
{
if (_initialized)
@ -284,6 +320,9 @@ void connection_initialize(void)
client_queue_init(&_connection_queue);
client_queue_init(&_body_queue);
client_queue_start_thread(&_request_queue, process_request_queue);
client_queue_start_thread(&_body_queue, process_request_body_queue);
_initialized = 1;
}
@ -295,7 +334,7 @@ void connection_shutdown(void)
tls_ctx_unref(tls_ctx);
matchfile_release(banned_ip);
matchfile_release(allowed_ip);
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_spin_destroy (&_connection_lock);
thread_mutex_destroy(&move_clients_mutex);
@ -553,7 +592,7 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len)
}
/* run along queue checking for any data that has come in or a timeout */
static bool process_request_queue_one (client_queue_entry_t *node, int timeout)
static bool process_request_queue_one (client_queue_entry_t *node, time_t timeout)
{
client_t *client = node->client;
int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
@ -571,7 +610,7 @@ static bool process_request_queue_one (client_queue_entry_t *node, int timeout)
}
if (len > 0) {
if (client->con->con_time + timeout <= time(NULL)) {
if (client->con->con_time <= timeout) {
len = 0;
} else {
len = client_read_bytes(client, buf, len);
@ -642,28 +681,32 @@ static bool process_request_queue_one (client_queue_entry_t *node, int timeout)
return false;
}
static void process_request_queue (void)
static void * process_request_queue (client_queue_t *queue)
{
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
int timeout;
while (client_queue_running(queue)) {
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
time_t timeout;
config = config_get_config();
timeout = config->header_timeout;
config_release_config();
config = config_get_config();
timeout = time(NULL) - config->header_timeout;
config_release_config();
client_queue_check_ready(&_request_queue, 100);
client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout);
while ((node = client_queue_shift_ready(&_request_queue, stop))) {
if (!process_request_queue_one(node, timeout)) {
client_queue_add(&_request_queue, node);
if (!stop)
stop = node;
while ((node = client_queue_shift_ready(queue, stop))) {
if (!process_request_queue_one(node, timeout)) {
client_queue_add(queue, node);
if (!stop)
stop = node;
}
}
_handle_connection();
}
_handle_connection();
return NULL;
}
static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit)
@ -704,42 +747,48 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t
}
/* This queue reads data from the body of clients. */
static void process_request_body_queue (void)
static void * process_request_body_queue (client_queue_t *queue)
{
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
time_t timeout;
size_t body_size_limit;
while (client_queue_running(queue)) {
client_queue_entry_t *stop = NULL;
client_queue_entry_t *node;
ice_config_t *config;
time_t timeout;
size_t body_size_limit;
ICECAST_LOG_DDEBUG("Processing body queue.");
ICECAST_LOG_DDEBUG("Processing body queue.");
config = config_get_config();
timeout = time(NULL) - config->body_timeout;
body_size_limit = config->body_size_limit;
config_release_config();
config = config_get_config();
timeout = time(NULL) - config->body_timeout;
body_size_limit = config->body_size_limit;
config_release_config();
while ((node = client_queue_shift(&_body_queue, stop))) {
client_t *client = node->client;
client_slurp_result_t res;
client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout);
node->tried_body = 1;
while ((node = client_queue_shift(queue, stop))) {
client_t *client = node->client;
client_slurp_result_t res;
ICECAST_LOG_DEBUG("Got client %p in body queue.", client);
node->tried_body = 1;
res = process_request_body_queue_one(node, timeout, body_size_limit);
ICECAST_LOG_DEBUG("Got client %p in body queue.", client);
if (res == CLIENT_SLURP_NEEDS_MORE_DATA) {
client_queue_add(&_body_queue, node);
if (!stop)
stop = node;
} else {
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
res = process_request_body_queue_one(node, timeout, body_size_limit);
client_queue_add(&_connection_queue, node);
continue;
if (res == CLIENT_SLURP_NEEDS_MORE_DATA) {
client_queue_add(queue, node);
if (!stop)
stop = node;
} else {
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
client_queue_add(&_connection_queue, node);
continue;
}
}
}
return NULL;
}
static client_queue_entry_t *create_client_node(client_t *client)
@ -813,26 +862,18 @@ void connection_queue(connection_t *con)
void connection_accept_loop(void)
{
connection_t *con;
ice_config_t *config;
int duration = 300;
config = config_get_config();
get_tls_certificate(config);
config_release_config();
while (global.running == ICECAST_RUNNING) {
con = listensocket_container_accept(global.listensockets, duration);
connection_t *con = listensocket_container_accept(global.listensockets, QUEUE_READY_TIMEOUT);
if (con) {
connection_queue(con);
duration = 5;
} else {
if (client_queue_empty(&_request_queue) && client_queue_empty(&_body_queue))
duration = 300; /* use longer timeouts when nothing waiting */
}
process_request_queue();
process_request_body_queue();
}
/* Give all the other threads notification to shut down */