From 843aff3266ae7f82d90d258cba781bb689ae03d0 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 12:46:28 +0000 Subject: [PATCH 01/14] Cleanup: Removed parts of connection.[ch] into new connection_handle.[ch] --- src/Makefile.am | 2 + src/connection.c | 707 +------------------------------------ src/connection_handle.c | 747 ++++++++++++++++++++++++++++++++++++++++ src/connection_handle.h | 16 + 4 files changed, 767 insertions(+), 705 deletions(-) create mode 100644 src/connection_handle.c create mode 100644 src/connection_handle.h diff --git a/src/Makefile.am b/src/Makefile.am index e58b7016..9b161ff6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -13,6 +13,7 @@ noinst_HEADERS = \ logging.h \ sighandler.h \ connection.h \ + connection_handle.h \ global.h \ util.h \ errors.h \ @@ -68,6 +69,7 @@ icecast_SOURCES = \ logging.c \ sighandler.c \ connection.c \ + connection_handle.c \ global.c \ util.c \ errors.c \ diff --git a/src/connection.c b/src/connection.c index 3cb42b26..fc66e6d1 100644 --- a/src/connection.c +++ b/src/connection.c @@ -41,6 +41,7 @@ #include "compat.h" #include "connection.h" +#include "connection_handle.h" #include "cfgfile.h" #include "global.h" #include "util.h" @@ -107,7 +108,6 @@ static matchfile_t *banned_ip, *allowed_ip; rwlock_t _source_shutdown_rwlock; -static int _update_admin_command(client_t *client); static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); @@ -815,295 +815,6 @@ int connection_complete_source(source_t *source, int response) return -1; } -static inline void source_startup(client_t *client) -{ - source_t *source; - source = source_reserve(client->uri); - - if (source) { - source->client = client; - source->parser = client->parser; - source->con = client->con; - if (connection_complete_source(source, 1) < 0) { - source_clear_source(source); - source_free_source(source); - return; - } - client->respcode = 200; - if (client->protocol == ICECAST_PROTOCOL_SHOUTCAST) { - client->respcode = 200; - /* send this non-blocking but if there is only a partial write - * then leave to header timeout */ - client_send_bytes(client, "OK2\r\nicy-caps:11\r\n\r\n", 20); /* TODO: Replace Magic Number! */ - source->shoutcast_compat = 1; - source_client_callback(client, source); - } else { - refbuf_t *ok = refbuf_new(PER_CLIENT_REFBUF_SIZE); - const char *expectcontinue; - const char *transfer_encoding; - int status_to_send = 0; - ssize_t ret; - - transfer_encoding = httpp_getvar(source->parser, "transfer-encoding"); - if (transfer_encoding && strcasecmp(transfer_encoding, HTTPP_ENCODING_IDENTITY) != 0) { - client->encoding = httpp_encoding_new(transfer_encoding); - if (!client->encoding) { - client_send_error_by_id(client, ICECAST_ERROR_CON_UNIMPLEMENTED); - return; - } - } - - if (source->parser && source->parser->req_type == httpp_req_source) { - status_to_send = 200; - } else { - /* For PUT support we check for 100-continue and send back a 100 to stay in spec */ - expectcontinue = httpp_getvar (source->parser, "expect"); - - if (expectcontinue != NULL) { -#ifdef HAVE_STRCASESTR - if (strcasestr (expectcontinue, "100-continue") != NULL) -#else - ICECAST_LOG_WARN("OS doesn't support case insensitive substring checks..."); - if (strstr (expectcontinue, "100-continue") != NULL) -#endif - { - status_to_send = 100; - } - } - } - - client->respcode = 200; - if (status_to_send) { - ret = util_http_build_header(ok->data, PER_CLIENT_REFBUF_SIZE, 0, 0, status_to_send, NULL, NULL, NULL, NULL, NULL, client); - snprintf(ok->data + ret, PER_CLIENT_REFBUF_SIZE - ret, "Content-Length: 0\r\n\r\n"); - ok->len = strlen(ok->data); - } else { - ok->len = 0; - } - refbuf_release(client->refbuf); - client->refbuf = ok; - fserve_add_client_callback(client, source_client_callback, source); - } - } else { - client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_IN_USE); - ICECAST_LOG_WARN("Mountpoint %s in use", client->uri); - } -} - -/* only called for native icecast source clients */ -static void _handle_source_request(client_t *client) -{ - const char *method = httpp_getvar(client->parser, HTTPP_VAR_REQ_TYPE); - - ICECAST_LOG_INFO("Source logging in at mountpoint \"%s\" using %s%H%s from %s as role %s with acl %s", - client->uri, - ((method) ? "\"" : "<"), ((method) ? method : "unknown"), ((method) ? "\"" : ">"), - client->con->ip, client->role, acl_get_name(client->acl)); - - if (client->parser && client->parser->req_type == httpp_req_source) { - ICECAST_LOG_DEBUG("Source at mountpoint \"%s\" connected using deprecated SOURCE method.", client->uri); - } - - if (client->uri[0] != '/') { - ICECAST_LOG_WARN("source mountpoint not starting with /"); - client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNTPOINT_NOT_STARTING_WITH_SLASH); - return; - } - - source_startup(client); -} - - -static void _handle_stats_request(client_t *client) -{ - stats_event_inc(NULL, "stats_connections"); - - client->respcode = 200; - snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE, - "HTTP/1.0 200 OK\r\n\r\n"); - client->refbuf->len = strlen(client->refbuf->data); - fserve_add_client_callback(client, stats_callback, NULL); -} - -/* if 0 is returned then the client should not be touched, however if -1 - * is returned then the caller is responsible for handling the client - */ -static void __add_listener_to_source(source_t *source, client_t *client) -{ - size_t loop = 10; - - do { - ICECAST_LOG_DEBUG("max on %s is %ld (cur %lu)", source->mount, - source->max_listeners, source->listeners); - if (source->max_listeners == -1) - break; - if (source->listeners < (unsigned long)source->max_listeners) - break; - - if (loop && source->fallback_when_full && source->fallback_mount) { - source_t *next = source_find_mount (source->fallback_mount); - if (!next) { - ICECAST_LOG_ERROR("Fallback '%s' for full source '%s' not found", - source->mount, source->fallback_mount); - client_send_error_by_id(client, ICECAST_ERROR_SOURCE_MAX_LISTENERS); - return; - } - ICECAST_LOG_INFO("stream full, trying %s", next->mount); - source = next; - navigation_history_navigate_to(&(client->history), source->identifier, NAVIGATION_DIRECTION_DOWN); - loop--; - continue; - } - /* now we fail the client */ - client_send_error_by_id(client, ICECAST_ERROR_SOURCE_MAX_LISTENERS); - return; - } while (1); - - client->write_to_client = format_generic_write_to_client; - client->check_buffer = format_check_http_buffer; - client->refbuf->len = PER_CLIENT_REFBUF_SIZE; - memset(client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE); - - /* lets add the client to the active list */ - avl_tree_wlock(source->pending_tree); - avl_insert(source->pending_tree, client); - avl_tree_unlock(source->pending_tree); - - if (source->running == 0 && source->on_demand) { - /* enable on-demand relay to start, wake up the slave thread */ - ICECAST_LOG_DEBUG("kicking off on-demand relay"); - source->on_demand_req = 1; - } - ICECAST_LOG_DEBUG("Added client to %s", source->mount); -} - -/* count the number of clients on a mount with same username and same role as the given one */ -static inline ssize_t __count_user_role_on_mount (source_t *source, client_t *client) { - ssize_t ret = 0; - avl_node *node; - - avl_tree_rlock(source->client_tree); - node = avl_get_first(source->client_tree); - while (node) { - client_t *existing_client = (client_t *)node->key; - if (existing_client->username && client->username && - strcmp(existing_client->username, client->username) == 0 && - existing_client->role && client->role && - strcmp(existing_client->role, client->role) == 0) { - ret++; - } - node = avl_get_next(node); - } - avl_tree_unlock(source->client_tree); - - avl_tree_rlock(source->pending_tree); - node = avl_get_first(source->pending_tree); - while (node) { - client_t *existing_client = (client_t *)node->key; - if (existing_client->username && client->username && - strcmp(existing_client->username, client->username) == 0 && - existing_client->role && client->role && - strcmp(existing_client->role, client->role) == 0){ - ret++; - } - node = avl_get_next(node); - } - avl_tree_unlock(source->pending_tree); - return ret; -} - -static void _handle_get_request(client_t *client) { - source_t *source = NULL; - - ICECAST_LOG_DEBUG("Got client %p with URI %H", client, client->uri); - - /* there are several types of HTTP GET clients - * media clients, which are looking for a source (eg, URI = /stream.ogg), - * stats clients, which are looking for /admin/stats.xml and - * fserve clients, which are looking for static files. - */ - - stats_event_inc(NULL, "client_connections"); - - /* this is a web/ request. let's check if we are allowed to do that. */ - if (acl_test_web(client->acl) != ACL_POLICY_ALLOW) { - /* doesn't seem so, sad client :( */ - auth_reject_client_on_deny(client); - return; - } - - if (client->parser->req_type == httpp_req_options) { - client_send_204(client); - return; - } - - if (util_check_valid_extension(client->uri) == XSLT_CONTENT) { - /* If the file exists, then transform it, otherwise, write a 404 */ - ICECAST_LOG_DEBUG("Stats request, sending XSL transformed stats"); - stats_transform_xslt(client); - return; - } - - avl_tree_rlock(global.source_tree); - /* let's see if this is a source or just a random fserve file */ - source = source_find_mount_with_history(client->uri, &(client->history)); - if (source) { - /* true mount */ - do { - ssize_t max_connections_per_user = acl_get_max_connections_per_user(client->acl); - /* check for duplicate_logins */ - if (max_connections_per_user > 0) { /* -1 = not set (-> default=unlimited), 0 = unlimited */ - if (max_connections_per_user <= __count_user_role_on_mount(source, client)) { - client_send_error_by_id(client, ICECAST_ERROR_CON_PER_CRED_CLIENT_LIMIT); - break; - } - } - - if (!source->allow_direct_access) { - client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_NO_FOR_DIRECT_ACCESS); - break; - } - - /* Set max listening duration in case not already set. */ - if (client->con->discon_time == 0) { - time_t connection_duration = acl_get_max_connection_duration(client->acl); - if (connection_duration == -1) { - ice_config_t *config = config_get_config(); - mount_proxy *mount = config_find_mount(config, source->mount, MOUNT_TYPE_NORMAL); - if (mount && mount->max_listener_duration) - connection_duration = mount->max_listener_duration; - config_release_config(); - } - - if (connection_duration > 0) /* -1 = not set (-> default=unlimited), 0 = unlimited */ - client->con->discon_time = connection_duration + time(NULL); - } - - __add_listener_to_source(source, client); - } while (0); - avl_tree_unlock(global.source_tree); - } else { - /* file */ - avl_tree_unlock(global.source_tree); - fserve_client_create(client); - } -} - -static void _handle_delete_request(client_t *client) { - source_t *source; - - avl_tree_wlock(global.source_tree); - source = source_find_mount_raw(client->uri); - if (source) { - source->running = 0; - avl_tree_unlock(global.source_tree); - client_send_204(client); - } else { - avl_tree_unlock(global.source_tree); - client_send_error_by_id(client, ICECAST_ERROR_CON_UNKNOWN_REQUEST); - } -} - static void _handle_shoutcast_compatible(client_queue_t *node) { char *http_compliant; @@ -1190,353 +901,6 @@ static void _handle_shoutcast_compatible(client_queue_t *node) return; } -/* Handle lookups here. - */ - -static int _handle_resources(client_t *client, char **uri) -{ - const char *http_host = httpp_getvar(client->parser, "host"); - char *serverhost = NULL; - int serverport = 0; - char *vhost = NULL; - char *vhost_colon; - char *new_uri = NULL; - ice_config_t *config; - const listener_t *listen_sock; - resource_t *resource; - - if (http_host) { - vhost = strdup(http_host); - if (vhost) { - vhost_colon = strstr(vhost, ":"); - if (vhost_colon) - *vhost_colon = 0; - } - } - - config = config_get_config(); - listen_sock = listensocket_get_listener(client->con->listensocket_effective); - if (listen_sock) { - serverhost = listen_sock->bind_address; - serverport = listen_sock->port; - } - - resource = config->resources; - - /* We now go thru all resources and see if any matches. */ - for (; resource; resource = resource->next) { - /* We check for several aspects, if they DO NOT match, we continue with our search. */ - - /* Check for the URI to match. */ - if (resource->flags & ALIAS_FLAG_PREFIXMATCH) { - size_t len = strlen(resource->source); - if (strncmp(*uri, resource->source, len) != 0) - continue; - ICECAST_LOG_DEBUG("Match: *uri='%s', resource->source='%s', len=%zu", *uri, resource->source, len); - } else { - if (strcmp(*uri, resource->source) != 0) - continue; - } - - /* Check for the server's port to match. */ - if (resource->port != -1 && resource->port != serverport) - continue; - - /* Check for the server's bind address to match. */ - if (resource->bind_address != NULL && serverhost != NULL && strcmp(resource->bind_address, serverhost) != 0) - continue; - - if (resource->listen_socket != NULL && (listen_sock->id == NULL || strcmp(resource->listen_socket, listen_sock->id) != 0)) - continue; - - /* Check for the vhost to match. */ - if (resource->vhost != NULL && vhost != NULL && strcmp(resource->vhost, vhost) != 0) - continue; - - /* Ok, we found a matching entry. */ - - if (resource->destination) { - if (resource->flags & ALIAS_FLAG_PREFIXMATCH) { - size_t len = strlen(resource->source); - asprintf(&new_uri, "%s%s", resource->destination, (*uri) + len); - } else { - new_uri = strdup(resource->destination); - } - } - if (resource->omode != OMODE_DEFAULT) - client->mode = resource->omode; - - if (resource->module) { - module_t *module = module_container_get_module(global.modulecontainer, resource->module); - - if (module != NULL) { - refobject_unref(client->handler_module); - client->handler_module = module; - } else { - ICECAST_LOG_ERROR("Module used in alias not found: %s", resource->module); - } - } - - if (resource->handler) { - char *func = strdup(resource->handler); - if (func) { - free(client->handler_function); - client->handler_function = func; - } else { - ICECAST_LOG_ERROR("Can not allocate memory."); - } - } - - ICECAST_LOG_DEBUG("resource has made %s into %s", *uri, new_uri); - break; - } - - listensocket_release_listener(client->con->listensocket_effective); - config_release_config(); - - if (new_uri) { - free(*uri); - *uri = new_uri; - } - - if (vhost) - free(vhost); - - return 0; -} - -static void _handle_admin_request(client_t *client, char *adminuri) -{ - ICECAST_LOG_DEBUG("Client %p requesting admin interface.", client); - - stats_event_inc(NULL, "client_connections"); - - admin_handle_request(client, adminuri); -} - -/* Handle any client that passed the authing process. - */ -static void _handle_authed_client(client_t *client, void *userdata, auth_result result) -{ - auth_stack_release(client->authstack); - client->authstack = NULL; - - /* Update admin parameters just in case auth changed our URI */ - if (_update_admin_command(client) == -1) - return; - - fastevent_emit(FASTEVENT_TYPE_CLIENT_AUTHED, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CLIENT, client); - - if (result != AUTH_OK) { - auth_reject_client_on_fail(client); - return; - } - - if (acl_test_method(client->acl, client->parser->req_type) != ACL_POLICY_ALLOW) { - ICECAST_LOG_ERROR("Client (role=%s, acl=%s, username=%s) not allowed to use this request method on %H", client->role, acl_get_name(client->acl), client->username, client->uri); - auth_reject_client_on_deny(client); - return; - } - - /* Dispatch legacy admin.cgi requests */ - if (strcmp(client->uri, "/admin.cgi") == 0) { - _handle_admin_request(client, client->uri + 1); - return; - } /* Dispatch all admin requests */ - else if (strncmp(client->uri, "/admin/", 7) == 0) { - _handle_admin_request(client, client->uri + 7); - return; - } - - if (client->handler_module && client->handler_function) { - const module_client_handler_t *handler = module_get_client_handler(client->handler_module, client->handler_function); - if (handler) { - handler->cb(client->handler_module, client); - return; - } else { - ICECAST_LOG_ERROR("No such handler function in module: %s", client->handler_function); - } - } - - switch (client->parser->req_type) { - case httpp_req_source: - case httpp_req_put: - _handle_source_request(client); - break; - case httpp_req_stats: - _handle_stats_request(client); - break; - case httpp_req_get: - case httpp_req_post: - case httpp_req_options: - _handle_get_request(client); - break; - case httpp_req_delete: - _handle_delete_request(client); - break; - default: - ICECAST_LOG_ERROR("Wrong request type from client"); - client_send_error_by_id(client, ICECAST_ERROR_CON_UNKNOWN_REQUEST); - break; - } -} - -/* Handle clients that still need to authenticate. - */ - -static void _handle_authentication_global(client_t *client, void *userdata, auth_result result) -{ - ice_config_t *config; - auth_stack_t *authstack; - - auth_stack_release(client->authstack); - client->authstack = NULL; - - if (result != AUTH_NOMATCH && - /* Allow global admins access to all mount points */ - !(result == AUTH_OK && client->admin_command != ADMIN_COMMAND_ERROR && acl_test_admin(client->acl, client->admin_command) == ACL_POLICY_DENY)) { - _handle_authed_client(client, userdata, result); - return; - } - - ICECAST_LOG_DEBUG("Trying global authenticators for client %p.", client); - config = config_get_config(); - authstack = config->authstack; - auth_stack_addref(authstack); - config_release_config(); - auth_stack_add_client(authstack, client, _handle_authed_client, userdata); - auth_stack_release(authstack); -} - -static inline mount_proxy * __find_non_admin_mount(ice_config_t *config, const char *name, mount_type type) -{ - if (strcmp(name, "/admin.cgi") == 0 || strncmp(name, "/admin/", 7) == 0) - return NULL; - - return config_find_mount(config, name, type); -} - -static void _handle_authentication_mount_generic(client_t *client, void *userdata, mount_type type, void (*callback)(client_t*, void*, auth_result)) -{ - ice_config_t *config; - mount_proxy *mountproxy; - auth_stack_t *stack = NULL; - - config = config_get_config(); - mountproxy = __find_non_admin_mount(config, client->uri, type); - if (!mountproxy) { - int command_type = admin_get_command_type(client->admin_command); - if (command_type == ADMINTYPE_MOUNT || command_type == ADMINTYPE_HYBRID) { - const char *mount = httpp_get_param(client->parser, "mount"); - if (mount) - mountproxy = __find_non_admin_mount(config, mount, type); - } - } - if (mountproxy && mountproxy->mounttype == type) - stack = mountproxy->authstack; - auth_stack_addref(stack); - config_release_config(); - - if (stack) { - auth_stack_add_client(stack, client, callback, userdata); - auth_stack_release(stack); - } else { - callback(client, userdata, AUTH_NOMATCH); - } -} - -static void _handle_authentication_mount_default(client_t *client, void *userdata, auth_result result) -{ - auth_stack_release(client->authstack); - client->authstack = NULL; - - if (result != AUTH_NOMATCH && - /* Allow global admins access to all mount points */ - !(result == AUTH_OK && client->admin_command != ADMIN_COMMAND_ERROR && acl_test_admin(client->acl, client->admin_command) == ACL_POLICY_DENY)) { - _handle_authed_client(client, userdata, result); - return; - } - - ICECAST_LOG_DEBUG("Trying specific authenticators for client %p.", client); - _handle_authentication_mount_generic(client, userdata, MOUNT_TYPE_DEFAULT, _handle_authentication_global); -} - -static void _handle_authentication_mount_normal(client_t *client, void *userdata, auth_result result) -{ - auth_stack_release(client->authstack); - client->authstack = NULL; - - if (result != AUTH_NOMATCH) { - _handle_authed_client(client, userdata, result); - return; - } - - ICECAST_LOG_DEBUG("Trying specific authenticators for client %p.", client); - _handle_authentication_mount_generic(client, userdata, MOUNT_TYPE_NORMAL, _handle_authentication_mount_default); -} - -static void _handle_authentication_listen_socket(client_t *client) -{ - auth_stack_t *stack = NULL; - const listener_t *listener; - - listener = listensocket_get_listener(client->con->listensocket_effective); - if (listener) { - if (listener->authstack) { - auth_stack_addref(stack = listener->authstack); - } - listensocket_release_listener(client->con->listensocket_effective); - } - - if (stack) { - auth_stack_add_client(stack, client, _handle_authentication_mount_normal, NULL); - auth_stack_release(stack); - } else { - _handle_authentication_mount_normal(client, NULL, AUTH_NOMATCH); - } -} - -static void _handle_authentication(client_t *client) -{ - fastevent_emit(FASTEVENT_TYPE_CLIENT_READY_FOR_AUTH, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CLIENT, client); - _handle_authentication_listen_socket(client); -} - -static void __prepare_shoutcast_admin_cgi_request(client_t *client) -{ - ice_config_t *config; - const char *sc_mount; - const char *pass = httpp_get_query_param(client->parser, "pass"); - const listener_t *listener; - - if (pass == NULL) { - ICECAST_LOG_ERROR("missing pass parameter"); - return; - } - - if (client->password) { - ICECAST_LOG_INFO("Client already has password set"); - return; - } - - /* Why do we acquire a global lock here? -- ph3-der-loewe, 2018-05-11 */ - global_lock(); - config = config_get_config(); - sc_mount = config->shoutcast_mount; - - listener = listensocket_get_listener(client->con->listensocket_effective); - if (listener && listener->shoutcast_mount) - sc_mount = listener->shoutcast_mount; - - httpp_set_query_param(client->parser, "mount", sc_mount); - listensocket_release_listener(client->con->listensocket_effective); - - httpp_setvar(client->parser, HTTPP_VAR_PROTOCOL, "ICY"); - client->password = strdup(pass); - config_release_config(); - global_unlock(); -} - /* Check if we need body of client */ static int _need_body(client_queue_t *node) { @@ -1562,23 +926,6 @@ static int _need_body(client_queue_t *node) return 0; } -/* Updates client's admin_command */ -static int _update_admin_command(client_t *client) -{ - if (strcmp(client->uri, "/admin.cgi") == 0) { - client->admin_command = admin_get_command(client->uri + 1); - __prepare_shoutcast_admin_cgi_request(client); - if (!client->password) { - client_send_error_by_id(client, ICECAST_ERROR_CON_MISSING_PASS_PARAMETER); - return -1; - } - } else if (strncmp(client->uri, "/admin/", 7) == 0) { - client->admin_command = admin_get_command(client->uri + 7); - } - - return 0; -} - /* Connection thread. Here we take clients off the connection queue and check * the contents provided. We set up the parser then hand off to the specific * request handler. @@ -1612,9 +959,6 @@ static void _handle_connection(void) client->parser = parser; } if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { - char *uri; - const char *upgrade, *connection; - client->refbuf->len = 0; /* early check if we need more data */ @@ -1653,54 +997,7 @@ static void _handle_connection(void) free (node->shoutcast_mount); free (node); - if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) && - strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) { - ICECAST_LOG_ERROR("Bad HTTP protocol detected"); - client_destroy (client); - continue; - } - - upgrade = httpp_getvar(parser, "upgrade"); - connection = httpp_getvar(parser, "connection"); - if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) { - if (client->con->tlsmode == ICECAST_TLSMODE_DISABLED || client->con->tls || strstr(upgrade, "TLS/1.0") == NULL) { - client_send_error_by_id(client, ICECAST_ERROR_CON_UPGRADE_ERROR); - continue; - } else { - client_send_101(client, ICECAST_REUSE_UPGRADETLS); - continue; - } - } else if (client->con->tlsmode != ICECAST_TLSMODE_DISABLED && client->con->tlsmode != ICECAST_TLSMODE_AUTO && !client->con->tls) { - client_send_426(client, ICECAST_REUSE_UPGRADETLS); - continue; - } - - if (parser->req_type == httpp_req_options && strcmp(rawuri, "*") == 0) { - client->uri = strdup("*"); - client_send_204(client); - continue; - } - - uri = util_normalise_uri(rawuri); - - if (!uri) { - client_destroy (client); - continue; - } - - client->mode = config_str_to_omode(NULL, NULL, httpp_get_param(client->parser, "omode")); - - if (_handle_resources(client, &uri) != 0) { - client_destroy (client); - continue; - } - - client->uri = uri; - - if (_update_admin_command(client) == -1) - continue; - - _handle_authentication(client); + connection_handle_client(client); } else { free (node); ICECAST_LOG_ERROR("HTTP request parsing failed"); diff --git a/src/connection_handle.c b/src/connection_handle.c new file mode 100644 index 00000000..ce5e1be2 --- /dev/null +++ b/src/connection_handle.c @@ -0,0 +1,747 @@ +/* Icecast + * + * This program is distributed under the GNU General Public License, version 2. + * A copy of this license is included with this source. + * + * Copyright 2000-2004, Jack Moffitt , + * oddsock , + * Karl Heyes + * and others (see AUTHORS for details). + * Copyright 2011, Dave 'justdave' Miller , + * Copyright 2011-2022, Philipp "ph3-der-loewe" Schafft , + */ + +/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */ +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#include "connection_handle.h" +#include "auth.h" +#include "acl.h" +#include "admin.h" +#include "global.h" +#include "fastevent.h" +#include "listensocket.h" +#include "source.h" +#include "errors.h" +#include "stats.h" +#include "fserve.h" + +#include "logging.h" +#define CATMODULE "connection-handle" + +/* Handle lookups here. + */ + +static bool _handle_resources(client_t *client, char **uri) +{ + const char *http_host = httpp_getvar(client->parser, "host"); + char *serverhost = NULL; + int serverport = 0; + char *vhost = NULL; + char *vhost_colon; + char *new_uri = NULL; + ice_config_t *config; + const listener_t *listen_sock; + resource_t *resource; + + if (http_host) { + vhost = strdup(http_host); + if (vhost) { + vhost_colon = strstr(vhost, ":"); + if (vhost_colon) + *vhost_colon = 0; + } + } + + config = config_get_config(); + listen_sock = listensocket_get_listener(client->con->listensocket_effective); + if (listen_sock) { + serverhost = listen_sock->bind_address; + serverport = listen_sock->port; + } + + resource = config->resources; + + /* We now go thru all resources and see if any matches. */ + for (; resource; resource = resource->next) { + /* We check for several aspects, if they DO NOT match, we continue with our search. */ + + /* Check for the URI to match. */ + if (resource->flags & ALIAS_FLAG_PREFIXMATCH) { + size_t len = strlen(resource->source); + if (strncmp(*uri, resource->source, len) != 0) + continue; + ICECAST_LOG_DEBUG("Match: *uri='%s', resource->source='%s', len=%zu", *uri, resource->source, len); + } else { + if (strcmp(*uri, resource->source) != 0) + continue; + } + + /* Check for the server's port to match. */ + if (resource->port != -1 && resource->port != serverport) + continue; + + /* Check for the server's bind address to match. */ + if (resource->bind_address != NULL && serverhost != NULL && strcmp(resource->bind_address, serverhost) != 0) + continue; + + if (resource->listen_socket != NULL && (listen_sock->id == NULL || strcmp(resource->listen_socket, listen_sock->id) != 0)) + continue; + + /* Check for the vhost to match. */ + if (resource->vhost != NULL && vhost != NULL && strcmp(resource->vhost, vhost) != 0) + continue; + + /* Ok, we found a matching entry. */ + + if (resource->destination) { + if (resource->flags & ALIAS_FLAG_PREFIXMATCH) { + size_t len = strlen(resource->source); + asprintf(&new_uri, "%s%s", resource->destination, (*uri) + len); + } else { + new_uri = strdup(resource->destination); + } + } + if (resource->omode != OMODE_DEFAULT) + client->mode = resource->omode; + + if (resource->module) { + module_t *module = module_container_get_module(global.modulecontainer, resource->module); + + if (module != NULL) { + refobject_unref(client->handler_module); + client->handler_module = module; + } else { + ICECAST_LOG_ERROR("Module used in alias not found: %s", resource->module); + } + } + + if (resource->handler) { + char *func = strdup(resource->handler); + if (func) { + free(client->handler_function); + client->handler_function = func; + } else { + ICECAST_LOG_ERROR("Can not allocate memory."); + } + } + + ICECAST_LOG_DEBUG("resource has made %s into %s", *uri, new_uri); + break; + } + + listensocket_release_listener(client->con->listensocket_effective); + config_release_config(); + + if (new_uri) { + free(*uri); + *uri = new_uri; + } + + if (vhost) + free(vhost); + + return true; +} + + +static void __prepare_shoutcast_admin_cgi_request(client_t *client) +{ + ice_config_t *config; + const char *sc_mount; + const char *pass = httpp_get_query_param(client->parser, "pass"); + const listener_t *listener; + + if (pass == NULL) { + ICECAST_LOG_ERROR("missing pass parameter"); + return; + } + + if (client->password) { + ICECAST_LOG_INFO("Client already has password set"); + return; + } + + /* Why do we acquire a global lock here? -- ph3-der-loewe, 2018-05-11 */ + global_lock(); + config = config_get_config(); + sc_mount = config->shoutcast_mount; + + listener = listensocket_get_listener(client->con->listensocket_effective); + if (listener && listener->shoutcast_mount) + sc_mount = listener->shoutcast_mount; + + httpp_set_query_param(client->parser, "mount", sc_mount); + listensocket_release_listener(client->con->listensocket_effective); + + httpp_setvar(client->parser, HTTPP_VAR_PROTOCOL, "ICY"); + client->password = strdup(pass); + config_release_config(); + global_unlock(); +} + +/* Updates client's admin_command */ +static bool _update_admin_command(client_t *client) +{ + if (strcmp(client->uri, "/admin.cgi") == 0) { + client->admin_command = admin_get_command(client->uri + 1); + __prepare_shoutcast_admin_cgi_request(client); + if (!client->password) { + client_send_error_by_id(client, ICECAST_ERROR_CON_MISSING_PASS_PARAMETER); + return false; + } + } else if (strncmp(client->uri, "/admin/", 7) == 0) { + client->admin_command = admin_get_command(client->uri + 7); + } + + return true; +} + +static inline void source_startup(client_t *client) +{ + source_t *source; + source = source_reserve(client->uri); + + if (source) { + source->client = client; + source->parser = client->parser; + source->con = client->con; + if (connection_complete_source(source, 1) < 0) { + source_clear_source(source); + source_free_source(source); + return; + } + client->respcode = 200; + if (client->protocol == ICECAST_PROTOCOL_SHOUTCAST) { + client->respcode = 200; + /* send this non-blocking but if there is only a partial write + * then leave to header timeout */ + client_send_bytes(client, "OK2\r\nicy-caps:11\r\n\r\n", 20); /* TODO: Replace Magic Number! */ + source->shoutcast_compat = 1; + source_client_callback(client, source); + } else { + refbuf_t *ok = refbuf_new(PER_CLIENT_REFBUF_SIZE); + const char *expectcontinue; + const char *transfer_encoding; + int status_to_send = 0; + ssize_t ret; + + transfer_encoding = httpp_getvar(source->parser, "transfer-encoding"); + if (transfer_encoding && strcasecmp(transfer_encoding, HTTPP_ENCODING_IDENTITY) != 0) { + client->encoding = httpp_encoding_new(transfer_encoding); + if (!client->encoding) { + client_send_error_by_id(client, ICECAST_ERROR_CON_UNIMPLEMENTED); + return; + } + } + + if (source->parser && source->parser->req_type == httpp_req_source) { + status_to_send = 200; + } else { + /* For PUT support we check for 100-continue and send back a 100 to stay in spec */ + expectcontinue = httpp_getvar (source->parser, "expect"); + + if (expectcontinue != NULL) { +#ifdef HAVE_STRCASESTR + if (strcasestr (expectcontinue, "100-continue") != NULL) +#else + ICECAST_LOG_WARN("OS doesn't support case insensitive substring checks..."); + if (strstr (expectcontinue, "100-continue") != NULL) +#endif + { + status_to_send = 100; + } + } + } + + client->respcode = 200; + if (status_to_send) { + ret = util_http_build_header(ok->data, PER_CLIENT_REFBUF_SIZE, 0, 0, status_to_send, NULL, NULL, NULL, NULL, NULL, client); + snprintf(ok->data + ret, PER_CLIENT_REFBUF_SIZE - ret, "Content-Length: 0\r\n\r\n"); + ok->len = strlen(ok->data); + } else { + ok->len = 0; + } + refbuf_release(client->refbuf); + client->refbuf = ok; + fserve_add_client_callback(client, source_client_callback, source); + } + } else { + client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_IN_USE); + ICECAST_LOG_WARN("Mountpoint %s in use", client->uri); + } +} + +/* only called for native icecast source clients */ +static void _handle_source_request(client_t *client) +{ + const char *method = httpp_getvar(client->parser, HTTPP_VAR_REQ_TYPE); + + ICECAST_LOG_INFO("Source logging in at mountpoint \"%s\" using %s%H%s from %s as role %s with acl %s", + client->uri, + ((method) ? "\"" : "<"), ((method) ? method : "unknown"), ((method) ? "\"" : ">"), + client->con->ip, client->role, acl_get_name(client->acl)); + + if (client->parser && client->parser->req_type == httpp_req_source) { + ICECAST_LOG_DEBUG("Source at mountpoint \"%s\" connected using deprecated SOURCE method.", client->uri); + } + + if (client->uri[0] != '/') { + ICECAST_LOG_WARN("source mountpoint not starting with /"); + client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNTPOINT_NOT_STARTING_WITH_SLASH); + return; + } + + source_startup(client); +} + + +static void _handle_stats_request(client_t *client) +{ + stats_event_inc(NULL, "stats_connections"); + + client->respcode = 200; + snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE, + "HTTP/1.0 200 OK\r\n\r\n"); + client->refbuf->len = strlen(client->refbuf->data); + fserve_add_client_callback(client, stats_callback, NULL); +} + +/* if 0 is returned then the client should not be touched, however if -1 + * is returned then the caller is responsible for handling the client + */ +static void __add_listener_to_source(source_t *source, client_t *client) +{ + size_t loop = 10; + + do { + ICECAST_LOG_DEBUG("max on %s is %ld (cur %lu)", source->mount, + source->max_listeners, source->listeners); + if (source->max_listeners == -1) + break; + if (source->listeners < (unsigned long)source->max_listeners) + break; + + if (loop && source->fallback_when_full && source->fallback_mount) { + source_t *next = source_find_mount (source->fallback_mount); + if (!next) { + ICECAST_LOG_ERROR("Fallback '%s' for full source '%s' not found", + source->mount, source->fallback_mount); + client_send_error_by_id(client, ICECAST_ERROR_SOURCE_MAX_LISTENERS); + return; + } + ICECAST_LOG_INFO("stream full, trying %s", next->mount); + source = next; + navigation_history_navigate_to(&(client->history), source->identifier, NAVIGATION_DIRECTION_DOWN); + loop--; + continue; + } + /* now we fail the client */ + client_send_error_by_id(client, ICECAST_ERROR_SOURCE_MAX_LISTENERS); + return; + } while (1); + + client->write_to_client = format_generic_write_to_client; + client->check_buffer = format_check_http_buffer; + client->refbuf->len = PER_CLIENT_REFBUF_SIZE; + memset(client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE); + + /* lets add the client to the active list */ + avl_tree_wlock(source->pending_tree); + avl_insert(source->pending_tree, client); + avl_tree_unlock(source->pending_tree); + + if (source->running == 0 && source->on_demand) { + /* enable on-demand relay to start, wake up the slave thread */ + ICECAST_LOG_DEBUG("kicking off on-demand relay"); + source->on_demand_req = 1; + } + ICECAST_LOG_DEBUG("Added client to %s", source->mount); +} + +/* count the number of clients on a mount with same username and same role as the given one */ +static inline ssize_t __count_user_role_on_mount (source_t *source, client_t *client) { + ssize_t ret = 0; + avl_node *node; + + avl_tree_rlock(source->client_tree); + node = avl_get_first(source->client_tree); + while (node) { + client_t *existing_client = (client_t *)node->key; + if (existing_client->username && client->username && + strcmp(existing_client->username, client->username) == 0 && + existing_client->role && client->role && + strcmp(existing_client->role, client->role) == 0) { + ret++; + } + node = avl_get_next(node); + } + avl_tree_unlock(source->client_tree); + + avl_tree_rlock(source->pending_tree); + node = avl_get_first(source->pending_tree); + while (node) { + client_t *existing_client = (client_t *)node->key; + if (existing_client->username && client->username && + strcmp(existing_client->username, client->username) == 0 && + existing_client->role && client->role && + strcmp(existing_client->role, client->role) == 0){ + ret++; + } + node = avl_get_next(node); + } + avl_tree_unlock(source->pending_tree); + return ret; +} + +static void _handle_get_request(client_t *client) { + source_t *source = NULL; + + ICECAST_LOG_DEBUG("Got client %p with URI %H", client, client->uri); + + /* there are several types of HTTP GET clients + * media clients, which are looking for a source (eg, URI = /stream.ogg), + * stats clients, which are looking for /admin/stats.xml and + * fserve clients, which are looking for static files. + */ + + stats_event_inc(NULL, "client_connections"); + + /* this is a web/ request. let's check if we are allowed to do that. */ + if (acl_test_web(client->acl) != ACL_POLICY_ALLOW) { + /* doesn't seem so, sad client :( */ + auth_reject_client_on_deny(client); + return; + } + + if (client->parser->req_type == httpp_req_options) { + client_send_204(client); + return; + } + + if (util_check_valid_extension(client->uri) == XSLT_CONTENT) { + /* If the file exists, then transform it, otherwise, write a 404 */ + ICECAST_LOG_DEBUG("Stats request, sending XSL transformed stats"); + stats_transform_xslt(client); + return; + } + + avl_tree_rlock(global.source_tree); + /* let's see if this is a source or just a random fserve file */ + source = source_find_mount_with_history(client->uri, &(client->history)); + if (source) { + /* true mount */ + do { + ssize_t max_connections_per_user = acl_get_max_connections_per_user(client->acl); + /* check for duplicate_logins */ + if (max_connections_per_user > 0) { /* -1 = not set (-> default=unlimited), 0 = unlimited */ + if (max_connections_per_user <= __count_user_role_on_mount(source, client)) { + client_send_error_by_id(client, ICECAST_ERROR_CON_PER_CRED_CLIENT_LIMIT); + break; + } + } + + if (!source->allow_direct_access) { + client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_NO_FOR_DIRECT_ACCESS); + break; + } + + /* Set max listening duration in case not already set. */ + if (client->con->discon_time == 0) { + time_t connection_duration = acl_get_max_connection_duration(client->acl); + if (connection_duration == -1) { + ice_config_t *config = config_get_config(); + mount_proxy *mount = config_find_mount(config, source->mount, MOUNT_TYPE_NORMAL); + if (mount && mount->max_listener_duration) + connection_duration = mount->max_listener_duration; + config_release_config(); + } + + if (connection_duration > 0) /* -1 = not set (-> default=unlimited), 0 = unlimited */ + client->con->discon_time = connection_duration + time(NULL); + } + + __add_listener_to_source(source, client); + } while (0); + avl_tree_unlock(global.source_tree); + } else { + /* file */ + avl_tree_unlock(global.source_tree); + fserve_client_create(client); + } +} + +static void _handle_delete_request(client_t *client) { + source_t *source; + + avl_tree_wlock(global.source_tree); + source = source_find_mount_raw(client->uri); + if (source) { + source->running = 0; + avl_tree_unlock(global.source_tree); + client_send_204(client); + } else { + avl_tree_unlock(global.source_tree); + client_send_error_by_id(client, ICECAST_ERROR_CON_UNKNOWN_REQUEST); + } +} + +static void _handle_admin_request(client_t *client, char *adminuri) +{ + ICECAST_LOG_DEBUG("Client %p requesting admin interface.", client); + + stats_event_inc(NULL, "client_connections"); + + admin_handle_request(client, adminuri); +} + +/* Handle any client that passed the authing process. + */ +static void _handle_authed_client(client_t *client, void *userdata, auth_result result) +{ + auth_stack_release(client->authstack); + client->authstack = NULL; + + /* Update admin parameters just in case auth changed our URI */ + if (!_update_admin_command(client)) + return; + + fastevent_emit(FASTEVENT_TYPE_CLIENT_AUTHED, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CLIENT, client); + + if (result != AUTH_OK) { + auth_reject_client_on_fail(client); + return; + } + + if (acl_test_method(client->acl, client->parser->req_type) != ACL_POLICY_ALLOW) { + ICECAST_LOG_ERROR("Client (role=%s, acl=%s, username=%s) not allowed to use this request method on %H", client->role, acl_get_name(client->acl), client->username, client->uri); + auth_reject_client_on_deny(client); + return; + } + + /* Dispatch legacy admin.cgi requests */ + if (strcmp(client->uri, "/admin.cgi") == 0) { + _handle_admin_request(client, client->uri + 1); + return; + } /* Dispatch all admin requests */ + else if (strncmp(client->uri, "/admin/", 7) == 0) { + _handle_admin_request(client, client->uri + 7); + return; + } + + if (client->handler_module && client->handler_function) { + const module_client_handler_t *handler = module_get_client_handler(client->handler_module, client->handler_function); + if (handler) { + handler->cb(client->handler_module, client); + return; + } else { + ICECAST_LOG_ERROR("No such handler function in module: %s", client->handler_function); + } + } + + switch (client->parser->req_type) { + case httpp_req_source: + case httpp_req_put: + _handle_source_request(client); + break; + case httpp_req_stats: + _handle_stats_request(client); + break; + case httpp_req_get: + case httpp_req_post: + case httpp_req_options: + _handle_get_request(client); + break; + case httpp_req_delete: + _handle_delete_request(client); + break; + default: + ICECAST_LOG_ERROR("Wrong request type from client"); + client_send_error_by_id(client, ICECAST_ERROR_CON_UNKNOWN_REQUEST); + break; + } +} + +/* Handle clients that still need to authenticate. + */ + +static void _handle_authentication_global(client_t *client, void *userdata, auth_result result) +{ + ice_config_t *config; + auth_stack_t *authstack; + + auth_stack_release(client->authstack); + client->authstack = NULL; + + if (result != AUTH_NOMATCH && + /* Allow global admins access to all mount points */ + !(result == AUTH_OK && client->admin_command != ADMIN_COMMAND_ERROR && acl_test_admin(client->acl, client->admin_command) == ACL_POLICY_DENY)) { + _handle_authed_client(client, userdata, result); + return; + } + + ICECAST_LOG_DEBUG("Trying global authenticators for client %p.", client); + config = config_get_config(); + authstack = config->authstack; + auth_stack_addref(authstack); + config_release_config(); + auth_stack_add_client(authstack, client, _handle_authed_client, userdata); + auth_stack_release(authstack); +} + +static inline mount_proxy * __find_non_admin_mount(ice_config_t *config, const char *name, mount_type type) +{ + if (strcmp(name, "/admin.cgi") == 0 || strncmp(name, "/admin/", 7) == 0) + return NULL; + + return config_find_mount(config, name, type); +} + +static void _handle_authentication_mount_generic(client_t *client, void *userdata, mount_type type, void (*callback)(client_t*, void*, auth_result)) +{ + ice_config_t *config; + mount_proxy *mountproxy; + auth_stack_t *stack = NULL; + + config = config_get_config(); + mountproxy = __find_non_admin_mount(config, client->uri, type); + if (!mountproxy) { + int command_type = admin_get_command_type(client->admin_command); + if (command_type == ADMINTYPE_MOUNT || command_type == ADMINTYPE_HYBRID) { + const char *mount = httpp_get_param(client->parser, "mount"); + if (mount) + mountproxy = __find_non_admin_mount(config, mount, type); + } + } + if (mountproxy && mountproxy->mounttype == type) + stack = mountproxy->authstack; + auth_stack_addref(stack); + config_release_config(); + + if (stack) { + auth_stack_add_client(stack, client, callback, userdata); + auth_stack_release(stack); + } else { + callback(client, userdata, AUTH_NOMATCH); + } +} + +static void _handle_authentication_mount_default(client_t *client, void *userdata, auth_result result) +{ + auth_stack_release(client->authstack); + client->authstack = NULL; + + if (result != AUTH_NOMATCH && + /* Allow global admins access to all mount points */ + !(result == AUTH_OK && client->admin_command != ADMIN_COMMAND_ERROR && acl_test_admin(client->acl, client->admin_command) == ACL_POLICY_DENY)) { + _handle_authed_client(client, userdata, result); + return; + } + + ICECAST_LOG_DEBUG("Trying specific authenticators for client %p.", client); + _handle_authentication_mount_generic(client, userdata, MOUNT_TYPE_DEFAULT, _handle_authentication_global); +} + +static void _handle_authentication_mount_normal(client_t *client, void *userdata, auth_result result) +{ + auth_stack_release(client->authstack); + client->authstack = NULL; + + if (result != AUTH_NOMATCH) { + _handle_authed_client(client, userdata, result); + return; + } + + ICECAST_LOG_DEBUG("Trying specific authenticators for client %p.", client); + _handle_authentication_mount_generic(client, userdata, MOUNT_TYPE_NORMAL, _handle_authentication_mount_default); +} + +static void _handle_authentication_listen_socket(client_t *client) +{ + auth_stack_t *stack = NULL; + const listener_t *listener; + + listener = listensocket_get_listener(client->con->listensocket_effective); + if (listener) { + if (listener->authstack) { + auth_stack_addref(stack = listener->authstack); + } + listensocket_release_listener(client->con->listensocket_effective); + } + + if (stack) { + auth_stack_add_client(stack, client, _handle_authentication_mount_normal, NULL); + auth_stack_release(stack); + } else { + _handle_authentication_mount_normal(client, NULL, AUTH_NOMATCH); + } +} + +static void _handle_authentication(client_t *client) +{ + fastevent_emit(FASTEVENT_TYPE_CLIENT_READY_FOR_AUTH, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CLIENT, client); + _handle_authentication_listen_socket(client); +} + +void connection_handle_client(client_t *client) +{ + http_parser_t *parser = client->parser; + const char *rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + const char *upgrade, *connection; + char *uri; + + if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) && + strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) { + ICECAST_LOG_ERROR("Bad HTTP protocol detected"); + client_destroy(client); + return; + } + + upgrade = httpp_getvar(parser, "upgrade"); + connection = httpp_getvar(parser, "connection"); + if (upgrade && connection && strcasecmp(connection, "upgrade") == 0) { + if (client->con->tlsmode == ICECAST_TLSMODE_DISABLED || client->con->tls || strstr(upgrade, "TLS/1.0") == NULL) { + client_send_error_by_id(client, ICECAST_ERROR_CON_UPGRADE_ERROR); + return; + } else { + client_send_101(client, ICECAST_REUSE_UPGRADETLS); + return; + } + } else if (client->con->tlsmode != ICECAST_TLSMODE_DISABLED && client->con->tlsmode != ICECAST_TLSMODE_AUTO && !client->con->tls) { + client_send_426(client, ICECAST_REUSE_UPGRADETLS); + return; + } + + if (parser->req_type == httpp_req_options && strcmp(rawuri, "*") == 0) { + client->uri = strdup("*"); + client_send_204(client); + return; + } + + uri = util_normalise_uri(rawuri); + + if (!uri) { + client_destroy(client); + return; + } + + client->mode = config_str_to_omode(NULL, NULL, httpp_get_param(client->parser, "omode")); + + if (!_handle_resources(client, &uri)) { + client_destroy(client); + return; + } + + client->uri = uri; + + if (!_update_admin_command(client)) + return; + + _handle_authentication(client); +} diff --git a/src/connection_handle.h b/src/connection_handle.h new file mode 100644 index 00000000..16110d4f --- /dev/null +++ b/src/connection_handle.h @@ -0,0 +1,16 @@ +/* Icecast + * + * This program is distributed under the GNU General Public License, version 2. + * A copy of this license is included with this source. + * + * Copyright 2022-2022, Philipp "ph3-der-loewe" Schafft , + */ + +#ifndef __CONNECTION_HANDLE_H__ +#define __CONNECTION_HANDLE_H__ + +#include "icecasttypes.h" + +void connection_handle_client(client_t *client); + +#endif /* __CONNECTION_HANDLE_H__ */ From e428016746c21466a4a691d520c127a41d192ea0 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 13:05:06 +0000 Subject: [PATCH 02/14] Fix: Changed %s -> %#H for client->uri --- src/connection_handle.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connection_handle.c b/src/connection_handle.c index ce5e1be2..3f87ee3e 100644 --- a/src/connection_handle.c +++ b/src/connection_handle.c @@ -274,7 +274,7 @@ static inline void source_startup(client_t *client) } } else { client_send_error_by_id(client, ICECAST_ERROR_CON_MOUNT_IN_USE); - ICECAST_LOG_WARN("Mountpoint %s in use", client->uri); + ICECAST_LOG_WARN("Mountpoint %#H in use", client->uri); } } From b10e302c2a4dfa201ff10348106e7fe2baf678a4 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 17:38:12 +0000 Subject: [PATCH 03/14] Update: Renamed client_queue_t -> client_queue_entry_t --- src/connection.c | 62 ++++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/connection.c b/src/connection.c index fc66e6d1..d5641c0c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -91,15 +91,15 @@ typedef struct client_queue_tag { size_t bodybufferlen; int tried_body; struct client_queue_tag *next; -} client_queue_t; +} client_queue_entry_t; static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail static volatile connection_id_t _current_id = 0; static int _initialized = 0; -static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; -static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; -static volatile client_queue_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; +static volatile client_queue_entry_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; +static volatile client_queue_entry_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; +static volatile client_queue_entry_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; static bool tls_ok = false; static tls_ctx_t *tls_ctx; @@ -395,11 +395,11 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len) * has been collected, so we now pass it onto the connection thread for * further processing */ -static void _add_connection(client_queue_t *node) +static void _add_connection(client_queue_entry_t *node) { thread_spin_lock(&_connection_lock); *_con_queue_tail = node; - _con_queue_tail = (volatile client_queue_t **) &node->next; + _con_queue_tail = (volatile client_queue_entry_t **) &node->next; thread_spin_unlock(&_connection_lock); } @@ -407,14 +407,14 @@ static void _add_connection(client_queue_t *node) /* this returns queued clients for the connection thread. headers are * already provided, but need to be parsed. */ -static client_queue_t *_get_connection(void) +static client_queue_entry_t *_get_connection(void) { - client_queue_t *node = NULL; + client_queue_entry_t *node = NULL; thread_spin_lock(&_connection_lock); if (_con_queue){ - node = (client_queue_t *)_con_queue; + node = (client_queue_entry_t *)_con_queue; _con_queue = node->next; if (_con_queue == NULL) _con_queue_tail = &_con_queue; @@ -429,7 +429,7 @@ static client_queue_t *_get_connection(void) /* run along queue checking for any data that has come in or a timeout */ static void process_request_queue (void) { - client_queue_t **node_ref = (client_queue_t **)&_req_queue; + client_queue_entry_t **node_ref = (client_queue_entry_t **)&_req_queue; ice_config_t *config; int timeout; char peak; @@ -439,7 +439,7 @@ static void process_request_queue (void) config_release_config(); while (*node_ref) { - client_queue_t *node = *node_ref; + client_queue_entry_t *node = *node_ref; client_t *client = node->client; int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; char *buf = client->refbuf->data + node->offset; @@ -512,8 +512,8 @@ static void process_request_queue (void) connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); node->offset = stream_offset; } - if ((client_queue_t **)_req_queue_tail == &(node->next)) - _req_queue_tail = (volatile client_queue_t **)node_ref; + if ((client_queue_entry_t **)_req_queue_tail == &(node->next)) + _req_queue_tail = (volatile client_queue_entry_t **)node_ref; *node_ref = node->next; node->next = NULL; _add_connection(node); @@ -521,8 +521,8 @@ static void process_request_queue (void) } } else { if (len == 0 || client->con->error) { - if ((client_queue_t **)_req_queue_tail == &node->next) - _req_queue_tail = (volatile client_queue_t **)node_ref; + if ((client_queue_entry_t **)_req_queue_tail == &node->next) + _req_queue_tail = (volatile client_queue_entry_t **)node_ref; *node_ref = node->next; client_destroy(client); free(node); @@ -536,17 +536,17 @@ static void process_request_queue (void) /* add client to body queue. */ -static void _add_body_client(client_queue_t *node) +static void _add_body_client(client_queue_entry_t *node) { ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client); thread_spin_lock(&_connection_lock); *_body_queue_tail = node; - _body_queue_tail = (volatile client_queue_t **) &node->next; + _body_queue_tail = (volatile client_queue_entry_t **) &node->next; thread_spin_unlock(&_connection_lock); } -static client_slurp_result_t process_request_body_queue_one(client_queue_t *node, time_t timeout, size_t body_size_limit) +static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit) { client_t *client = node->client; client_slurp_result_t res; @@ -586,7 +586,7 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_t *node /* This queue reads data from the body of clients. */ static void process_request_body_queue (void) { - client_queue_t **node_ref = (client_queue_t **)&_body_queue; + client_queue_entry_t **node_ref = (client_queue_entry_t **)&_body_queue; ice_config_t *config; time_t timeout; size_t body_size_limit; @@ -601,7 +601,7 @@ static void process_request_body_queue (void) config_release_config(); while (*node_ref) { - client_queue_t *node = *node_ref; + client_queue_entry_t *node = *node_ref; client_t *client = node->client; client_slurp_result_t res; @@ -614,8 +614,8 @@ static void process_request_body_queue (void) if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); - if ((client_queue_t **)_body_queue_tail == &(node->next)) - _body_queue_tail = (volatile client_queue_t **)node_ref; + if ((client_queue_entry_t **)_body_queue_tail == &(node->next)) + _body_queue_tail = (volatile client_queue_entry_t **)node_ref; *node_ref = node->next; node->next = NULL; _add_connection(node); @@ -628,15 +628,15 @@ static void process_request_body_queue (void) /* add node to the queue of requests. This is where the clients are when * initial http details are read. */ -static void _add_request_queue(client_queue_t *node) +static void _add_request_queue(client_queue_entry_t *node) { *_req_queue_tail = node; - _req_queue_tail = (volatile client_queue_t **)&node->next; + _req_queue_tail = (volatile client_queue_entry_t **)&node->next; } -static client_queue_t *create_client_node(client_t *client) +static client_queue_entry_t *create_client_node(client_t *client) { - client_queue_t *node = calloc (1, sizeof (client_queue_t)); + client_queue_entry_t *node = calloc (1, sizeof (client_queue_entry_t)); const listener_t *listener; if (!node) @@ -663,7 +663,7 @@ static client_queue_t *create_client_node(client_t *client) void connection_queue(connection_t *con) { - client_queue_t *node; + client_queue_entry_t *node; client_t *client = NULL; global_lock(); @@ -815,7 +815,7 @@ int connection_complete_source(source_t *source, int response) return -1; } -static void _handle_shoutcast_compatible(client_queue_t *node) +static void _handle_shoutcast_compatible(client_queue_entry_t *node) { char *http_compliant; int http_compliant_len = 0; @@ -902,7 +902,7 @@ static void _handle_shoutcast_compatible(client_queue_t *node) } /* Check if we need body of client */ -static int _need_body(client_queue_t *node) +static int _need_body(client_queue_entry_t *node) { client_t *client = node->client; @@ -934,7 +934,7 @@ static void _handle_connection(void) { http_parser_t *parser; const char *rawuri; - client_queue_t *node; + client_queue_entry_t *node; while (1) { node = _get_connection(); @@ -1079,6 +1079,6 @@ void connection_close(connection_t *con) void connection_queue_client(client_t *client) { - client_queue_t *node = create_client_node(client); + client_queue_entry_t *node = create_client_node(client); _add_connection(node); } From c34f715f0d5cd5b6a5b4eb37ffaefcd556fe408a Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 19:05:53 +0000 Subject: [PATCH 04/14] Update: Reworked connection queues --- src/connection.c | 344 +++++++++++++++++++++++------------------------ 1 file changed, 172 insertions(+), 172 deletions(-) diff --git a/src/connection.c b/src/connection.c index d5641c0c..27c2021c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -93,13 +93,19 @@ typedef struct client_queue_tag { struct client_queue_tag *next; } client_queue_entry_t; -static spin_t _connection_lock; // protects _current_id, _con_queue, _con_queue_tail +typedef struct { + client_queue_entry_t *head; + client_queue_entry_t **tail; + mutex_t mutex; +} client_queue_t; + +static spin_t _connection_lock; // protects _current_id static volatile connection_id_t _current_id = 0; static int _initialized = 0; -static volatile client_queue_entry_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; -static volatile client_queue_entry_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; -static volatile client_queue_entry_t *_body_queue = NULL, **_body_queue_tail = &_body_queue; +static client_queue_t _request_queue; +static client_queue_t _connection_queue; +static client_queue_t _body_queue; static bool tls_ok = false; static tls_ctx_t *tls_ctx; @@ -111,6 +117,54 @@ rwlock_t _source_shutdown_rwlock; static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); +static void client_queue_init(client_queue_t *queue) +{ + memset(queue, 0, sizeof(*queue)); + queue->tail = &(queue->head); + thread_mutex_create(&(queue->mutex)); +} + +static void client_queue_destroy(client_queue_t *queue) +{ + thread_mutex_destroy(&(queue->mutex)); +} + +static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry) +{ + thread_mutex_lock(&(queue->mutex)); + *(queue->tail) = entry; + queue->tail = &(entry->next); + thread_mutex_unlock(&(queue->mutex)); +} + +static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop) +{ + client_queue_entry_t *ret; + + thread_mutex_lock(&(queue->mutex)); + ret = queue->head; + if (ret) { + if (ret == stop) { + ret = NULL; + } else { + queue->head = ret->next; + if (!queue->head) { + queue->tail = &(queue->head); + } + ret->next = NULL; + } + } + thread_mutex_unlock(&(queue->mutex)); + + return ret; +} + +static bool client_queue_empty(client_queue_t *queue) +{ + /* No need to lock here as this is a point-in-time thing anyway */ + return queue->head == NULL ? true : false; +} + void connection_initialize(void) { if (_initialized) @@ -119,12 +173,9 @@ void connection_initialize(void) thread_spin_create (&_connection_lock); thread_mutex_create(&move_clients_mutex); thread_rwlock_create(&_source_shutdown_rwlock); - _req_queue = NULL; - _req_queue_tail = &_req_queue; - _con_queue = NULL; - _con_queue_tail = &_con_queue; - _body_queue = NULL; - _body_queue_tail = &_body_queue; + client_queue_init(&_request_queue); + client_queue_init(&_connection_queue); + client_queue_init(&_body_queue); _initialized = 1; } @@ -141,6 +192,9 @@ void connection_shutdown(void) thread_rwlock_destroy(&_source_shutdown_rwlock); thread_spin_destroy (&_connection_lock); thread_mutex_destroy(&move_clients_mutex); + client_queue_destroy(&_request_queue); + client_queue_destroy(&_connection_queue); + client_queue_destroy(&_body_queue); _initialized = 0; } @@ -391,161 +445,118 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len) } } -/* add client to connection queue. At this point some header information - * has been collected, so we now pass it onto the connection thread for - * further processing - */ -static void _add_connection(client_queue_entry_t *node) +/* run along queue checking for any data that has come in or a timeout */ +static bool process_request_queue_one (client_queue_entry_t *node, int timeout) { - thread_spin_lock(&_connection_lock); - *_con_queue_tail = node; - _con_queue_tail = (volatile client_queue_entry_t **) &node->next; - thread_spin_unlock(&_connection_lock); -} + client_t *client = node->client; + int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; + char *buf = client->refbuf->data + node->offset; + ICECAST_LOG_DDEBUG("Checking on client %p", client); -/* this returns queued clients for the connection thread. headers are - * already provided, but need to be parsed. - */ -static client_queue_entry_t *_get_connection(void) -{ - client_queue_entry_t *node = NULL; - - thread_spin_lock(&_connection_lock); - - if (_con_queue){ - node = (client_queue_entry_t *)_con_queue; - _con_queue = node->next; - if (_con_queue == NULL) - _con_queue_tail = &_con_queue; - node->next = NULL; + if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) { + char peak; + if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) { + if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */ + connection_uses_tls(client->con); + } + } } - thread_spin_unlock(&_connection_lock); - return node; + if (len > 0) { + if (client->con->con_time + timeout <= time(NULL)) { + len = 0; + } else { + len = client_read_bytes(client, buf, len); + } + } + + if (len > 0 || node->shoutcast > 1) { + ssize_t stream_offset = -1; + int pass_it = 1; + char *ptr; + + if (len < 0 && node->shoutcast > 1) + len = 0; + + /* handle \n, \r\n and nsvcap which for some strange reason has + * EOL as \r\r\n */ + node->offset += len; + client->refbuf->data[node->offset] = '\000'; + do { + if (node->shoutcast == 1) { + /* password line */ + if (strstr (client->refbuf->data, "\r\r\n") != NULL) + break; + if (strstr (client->refbuf->data, "\r\n") != NULL) + break; + if (strstr (client->refbuf->data, "\n") != NULL) + break; + } + /* stream_offset refers to the start of any data sent after the + * http style headers, we don't want to lose those */ + ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n"); + if (ptr) { + stream_offset = (ptr+6) - client->refbuf->data; + break; + } + ptr = strstr(client->refbuf->data, "\r\n\r\n"); + if (ptr) { + stream_offset = (ptr+4) - client->refbuf->data; + break; + } + ptr = strstr(client->refbuf->data, "\n\n"); + if (ptr) { + stream_offset = (ptr+2) - client->refbuf->data; + break; + } + pass_it = 0; + } while (0); + + ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len); + ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data); + + if (pass_it) { + if (stream_offset != -1) { + connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); + node->offset = stream_offset; + } + client_queue_add(&_connection_queue, node); + return true; + } + } else { + if (len == 0 || client->con->error) { + client_destroy(client); + free(node); + return true; + } + } + + return false; } - -/* run along queue checking for any data that has come in or a timeout */ static void process_request_queue (void) { - client_queue_entry_t **node_ref = (client_queue_entry_t **)&_req_queue; + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; ice_config_t *config; int timeout; - char peak; config = config_get_config(); timeout = config->header_timeout; config_release_config(); - while (*node_ref) { - client_queue_entry_t *node = *node_ref; - client_t *client = node->client; - int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; - char *buf = client->refbuf->data + node->offset; - - ICECAST_LOG_DDEBUG("Checking on client %p", client); - - if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) { - if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) { - if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */ - connection_uses_tls(client->con); - } - } + while ((node = client_queue_shift(&_request_queue, stop))) { + if (!process_request_queue_one(node, timeout)) { + client_queue_add(&_request_queue, node); + if (!stop) + stop = node; } - - if (len > 0) { - if (client->con->con_time + timeout <= time(NULL)) { - len = 0; - } else { - len = client_read_bytes(client, buf, len); - } - } - - if (len > 0 || node->shoutcast > 1) { - ssize_t stream_offset = -1; - int pass_it = 1; - char *ptr; - - if (len < 0 && node->shoutcast > 1) - len = 0; - - /* handle \n, \r\n and nsvcap which for some strange reason has - * EOL as \r\r\n */ - node->offset += len; - client->refbuf->data[node->offset] = '\000'; - do { - if (node->shoutcast == 1) { - /* password line */ - if (strstr (client->refbuf->data, "\r\r\n") != NULL) - break; - if (strstr (client->refbuf->data, "\r\n") != NULL) - break; - if (strstr (client->refbuf->data, "\n") != NULL) - break; - } - /* stream_offset refers to the start of any data sent after the - * http style headers, we don't want to lose those */ - ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n"); - if (ptr) { - stream_offset = (ptr+6) - client->refbuf->data; - break; - } - ptr = strstr(client->refbuf->data, "\r\n\r\n"); - if (ptr) { - stream_offset = (ptr+4) - client->refbuf->data; - break; - } - ptr = strstr(client->refbuf->data, "\n\n"); - if (ptr) { - stream_offset = (ptr+2) - client->refbuf->data; - break; - } - pass_it = 0; - } while (0); - - ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len); - ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data); - - if (pass_it) { - if (stream_offset != -1) { - connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset); - node->offset = stream_offset; - } - if ((client_queue_entry_t **)_req_queue_tail == &(node->next)) - _req_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - node->next = NULL; - _add_connection(node); - continue; - } - } else { - if (len == 0 || client->con->error) { - if ((client_queue_entry_t **)_req_queue_tail == &node->next) - _req_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - client_destroy(client); - free(node); - continue; - } - } - node_ref = &node->next; } + _handle_connection(); } -/* add client to body queue. - */ -static void _add_body_client(client_queue_entry_t *node) -{ - ICECAST_LOG_DEBUG("Putting client %p in body queue.", node->client); - - thread_spin_lock(&_connection_lock); - *_body_queue_tail = node; - _body_queue_tail = (volatile client_queue_entry_t **) &node->next; - thread_spin_unlock(&_connection_lock); -} - static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit) { client_t *client = node->client; @@ -586,22 +597,20 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t /* This queue reads data from the body of clients. */ static void process_request_body_queue (void) { - client_queue_entry_t **node_ref = (client_queue_entry_t **)&_body_queue; + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; ice_config_t *config; time_t timeout; size_t body_size_limit; ICECAST_LOG_DDEBUG("Processing body queue."); - ICECAST_LOG_DDEBUG("_body_queue=%p, &_body_queue=%p, _body_queue_tail=%p", _body_queue, &_body_queue, _body_queue_tail); - config = config_get_config(); timeout = time(NULL) - config->body_timeout; body_size_limit = config->body_size_limit; config_release_config(); - while (*node_ref) { - client_queue_entry_t *node = *node_ref; + while ((node = client_queue_shift(&_body_queue, stop))) { client_t *client = node->client; client_slurp_result_t res; @@ -611,29 +620,19 @@ static void process_request_body_queue (void) res = process_request_body_queue_one(node, timeout, body_size_limit); - if (res != CLIENT_SLURP_NEEDS_MORE_DATA) { + if (res == CLIENT_SLURP_NEEDS_MORE_DATA) { + client_queue_add(&_body_queue, node); + if (!stop) + stop = node; + } else { ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); - if ((client_queue_entry_t **)_body_queue_tail == &(node->next)) - _body_queue_tail = (volatile client_queue_entry_t **)node_ref; - *node_ref = node->next; - node->next = NULL; - _add_connection(node); + client_queue_add(&_connection_queue, node); continue; } - node_ref = &node->next; } } -/* add node to the queue of requests. This is where the clients are when - * initial http details are read. - */ -static void _add_request_queue(client_queue_entry_t *node) -{ - *_req_queue_tail = node; - _req_queue_tail = (volatile client_queue_entry_t **)&node->next; -} - static client_queue_entry_t *create_client_node(client_t *client) { client_queue_entry_t *node = calloc (1, sizeof (client_queue_entry_t)); @@ -692,7 +691,7 @@ void connection_queue(connection_t *con) return; } - _add_request_queue(node); + client_queue_add(&_request_queue, node); stats_event_inc(NULL, "connections"); } @@ -713,7 +712,7 @@ void connection_accept_loop(void) connection_queue(con); duration = 5; } else { - if (_req_queue == NULL) + if (client_queue_empty(&_request_queue) && client_queue_empty(&_body_queue)) duration = 300; /* use longer timeouts when nothing waiting */ } process_request_queue(); @@ -863,7 +862,7 @@ static void _handle_shoutcast_compatible(client_queue_entry_t *node) memmove(client->refbuf->data, headers, node->offset+1); node->shoutcast = 2; /* we've checked the password, now send it back for reading headers */ - _add_request_queue(node); + client_queue_add(&_request_queue, node); ICECAST_LOG_DDEBUG("Client %p re-added to request queue", client); return; } @@ -937,7 +936,7 @@ static void _handle_connection(void) client_queue_entry_t *node; while (1) { - node = _get_connection(); + node = client_queue_shift(&_connection_queue, NULL); if (node) { client_t *client = node->client; int already_parsed = 0; @@ -964,7 +963,7 @@ static void _handle_connection(void) /* early check if we need more data */ client_complete(client); if (_need_body(node)) { - /* Just calling _add_body_client() would do the job. + /* Just calling client_queue_add(&_body_queue, node) would do the job. * However, if the client only has a small body this might work without moving it between queues. * -> much faster. */ @@ -980,7 +979,8 @@ static void _handle_connection(void) res = process_request_body_queue_one(node, timeout, body_size_limit); if (res != CLIENT_SLURP_SUCCESS) { - _add_body_client(node); + ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); + client_queue_add(&_body_queue, node); continue; } else { ICECAST_LOG_DEBUG("Success on fast lane"); @@ -1080,5 +1080,5 @@ void connection_close(connection_t *con) void connection_queue_client(client_t *client) { client_queue_entry_t *node = create_client_node(client); - _add_connection(node); + client_queue_add(&_connection_queue, node); } From 2e0bb32535fa771a16642b3f30a639237567a0e1 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 19:15:34 +0000 Subject: [PATCH 05/14] Update: Unified freeing of client_queue_entry_t nodes --- src/connection.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/connection.c b/src/connection.c index 27c2021c..30409fec 100644 --- a/src/connection.c +++ b/src/connection.c @@ -116,6 +116,7 @@ rwlock_t _source_shutdown_rwlock; static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); +static void free_client_node(client_queue_entry_t *node); static void client_queue_init(client_queue_t *queue) { @@ -527,7 +528,7 @@ static bool process_request_queue_one (client_queue_entry_t *node, int timeout) } else { if (len == 0 || client->con->error) { client_destroy(client); - free(node); + free_client_node(node); return true; } } @@ -660,6 +661,13 @@ static client_queue_entry_t *create_client_node(client_t *client) return node; } +static void free_client_node(client_queue_entry_t *node) +{ + free(node->shoutcast_mount); + free(node->bodybuffer); + free(node); +} + void connection_queue(connection_t *con) { client_queue_entry_t *node; @@ -848,8 +856,7 @@ static void _handle_shoutcast_compatible(client_queue_entry_t *node) if (ptr == NULL){ client_destroy(client); - free(node->shoutcast_mount); - free(node); + free_client_node(node); return; } *ptr = '\0'; @@ -895,8 +902,7 @@ static void _handle_shoutcast_compatible(client_queue_entry_t *node) client_destroy(client); } free(http_compliant); - free(node->shoutcast_mount); - free(node); + free_client_node(node); return; } @@ -993,9 +999,7 @@ static void _handle_connection(void) if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); - free (node->bodybuffer); - free (node->shoutcast_mount); - free (node); + free_client_node(node); connection_handle_client(client); } else { From 4cc76466ac8688960fd4d65f32ff3f56fbd06e5f Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 19 Mar 2022 21:50:42 +0000 Subject: [PATCH 06/14] Update: Check if clients are ready before interacting with them --- src/connection.c | 239 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 171 insertions(+), 68 deletions(-) diff --git a/src/connection.c b/src/connection.c index 30409fec..9f70dadb 100644 --- a/src/connection.c +++ b/src/connection.c @@ -90,6 +90,7 @@ typedef struct client_queue_tag { char *bodybuffer; size_t bodybufferlen; int tried_body; + bool ready; struct client_queue_tag *next; } client_queue_entry_t; @@ -97,6 +98,10 @@ typedef struct { client_queue_entry_t *head; client_queue_entry_t **tail; mutex_t mutex; +#ifdef HAVE_POLL + struct pollfd *pollfds; + size_t pollfds_len; +#endif } client_queue_t; static spin_t _connection_lock; // protects _current_id @@ -128,6 +133,9 @@ static void client_queue_init(client_queue_t *queue) static void client_queue_destroy(client_queue_t *queue) { thread_mutex_destroy(&(queue->mutex)); +#ifdef HAVE_POLL + free(queue->pollfds); +#endif } static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry) @@ -160,6 +168,104 @@ static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_q return ret; } +static bool client_queue_check_ready(client_queue_t *queue, int timeout) +{ + if (!queue->head) + return false; + +#ifdef HAVE_POLL + if (true) { + size_t count = 0; + size_t i; + client_queue_entry_t *cur; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur; cur = cur->next) { + count++; + cur->ready = false; + } + + if (queue->pollfds_len < count) { + free(queue->pollfds); + queue->pollfds = calloc(count, sizeof(*queue->pollfds)); + if (queue->pollfds) { + queue->pollfds_len = count; + } else { + ICECAST_LOG_ERROR("Allocation of queue->pollfds failed. BAD."); + queue->pollfds_len = 0; + thread_mutex_unlock(&(queue->mutex)); + return false; + } + } else { + memset(queue->pollfds, 0, sizeof(*queue->pollfds)*count); + } + + for (cur = queue->head, i = 0; cur && i < count; cur = cur->next, i++) { + queue->pollfds[i].fd = cur->client->con->sock; + queue->pollfds[i].events = POLLIN; + } + thread_mutex_unlock(&(queue->mutex)); + + if (poll(queue->pollfds, count, timeout) < 1) + return false; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur; cur = cur->next) { + for (i = 0; i < count; i++) { + if (queue->pollfds[i].fd == cur->client->con->sock) { + if (queue->pollfds[i].revents) { + cur->ready = true; + } + } + } + } + thread_mutex_unlock(&(queue->mutex)); + } +#endif + + return true; +} + +static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, client_queue_entry_t *stop) +{ +#ifdef HAVE_POLL + client_queue_entry_t *cur; + client_queue_entry_t *last = NULL; + + if (!queue->head) + return NULL; + + thread_mutex_lock(&(queue->mutex)); + for (cur = queue->head; cur && cur != stop; cur = cur->next) { + if (cur->ready) { + // use this one. + if (last == NULL) { + /* we are the head */ + queue->head = cur->next; + if (!queue->head) { + queue->tail = &(queue->head); + } + } else { + last->next = cur->next; + if (queue->tail == &(cur->next)) { + queue->tail = &(last->next); + } + } + + cur->next = NULL; + thread_mutex_unlock(&(queue->mutex)); + return cur; + } + last = cur; + } + thread_mutex_unlock(&(queue->mutex)); + return NULL; +#else + /* just return any */ + return client_queue_shift(queue, stop); +#endif +} + static bool client_queue_empty(client_queue_t *queue) { /* No need to lock here as this is a point-in-time thing anyway */ @@ -547,7 +653,9 @@ static void process_request_queue (void) timeout = config->header_timeout; config_release_config(); - while ((node = client_queue_shift(&_request_queue, stop))) { + client_queue_check_ready(&_request_queue, 100); + + while ((node = client_queue_shift_ready(&_request_queue, stop))) { if (!process_request_queue_one(node, timeout)) { client_queue_add(&_request_queue, node); if (!stop) @@ -941,75 +1049,70 @@ static void _handle_connection(void) const char *rawuri; client_queue_entry_t *node; - while (1) { - node = client_queue_shift(&_connection_queue, NULL); - if (node) { - client_t *client = node->client; - int already_parsed = 0; + while ((node = client_queue_shift(&_connection_queue, NULL))) { + client_t *client = node->client; + int already_parsed = 0; - /* Check for special shoutcast compatability processing */ - if (node->shoutcast) { - _handle_shoutcast_compatible (node); - if (node->shoutcast) - continue; - } - - /* process normal HTTP headers */ - if (client->parser) { - already_parsed = 1; - parser = client->parser; - } else { - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - client->parser = parser; - } - if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { - client->refbuf->len = 0; - - /* early check if we need more data */ - client_complete(client); - if (_need_body(node)) { - /* Just calling client_queue_add(&_body_queue, node) would do the job. - * However, if the client only has a small body this might work without moving it between queues. - * -> much faster. - */ - client_slurp_result_t res; - ice_config_t *config; - time_t timeout; - size_t body_size_limit; - - config = config_get_config(); - timeout = time(NULL) - config->body_timeout; - body_size_limit = config->body_size_limit; - config_release_config(); - - res = process_request_body_queue_one(node, timeout, body_size_limit); - if (res != CLIENT_SLURP_SUCCESS) { - ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); - client_queue_add(&_body_queue, node); - continue; - } else { - ICECAST_LOG_DEBUG("Success on fast lane"); - } - } - - rawuri = httpp_getvar(parser, HTTPP_VAR_URI); - - /* assign a port-based shoutcast mountpoint if required */ - if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) - httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); - - free_client_node(node); - - connection_handle_client(client); - } else { - free (node); - ICECAST_LOG_ERROR("HTTP request parsing failed"); - client_destroy (client); - } - continue; + /* Check for special shoutcast compatability processing */ + if (node->shoutcast) { + _handle_shoutcast_compatible (node); + if (node->shoutcast) + continue; + } + + /* process normal HTTP headers */ + if (client->parser) { + already_parsed = 1; + parser = client->parser; + } else { + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + client->parser = parser; + } + if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { + client->refbuf->len = 0; + + /* early check if we need more data */ + client_complete(client); + if (_need_body(node)) { + /* Just calling client_queue_add(&_body_queue, node) would do the job. + * However, if the client only has a small body this might work without moving it between queues. + * -> much faster. + */ + client_slurp_result_t res; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; + + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); + + res = process_request_body_queue_one(node, timeout, body_size_limit); + if (res != CLIENT_SLURP_SUCCESS) { + ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); + client_queue_add(&_body_queue, node); + continue; + } else { + ICECAST_LOG_DEBUG("Success on fast lane"); + } + } + + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + + /* assign a port-based shoutcast mountpoint if required */ + if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) + httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); + + free_client_node(node); + + connection_handle_client(client); + } else { + free_client_node(node); + ICECAST_LOG_ERROR("HTTP request parsing failed"); + client_destroy (client); } - break; } } From 0645d82bd1c0353207c962de424eaeb6da932f8a Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 11:10:17 +0000 Subject: [PATCH 07/14] Update: Use own thread for request and body queues --- src/connection.c | 161 +++++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 60 deletions(-) diff --git a/src/connection.c b/src/connection.c index 9f70dadb..69b3a576 100644 --- a/src/connection.c +++ b/src/connection.c @@ -98,12 +98,17 @@ typedef struct { client_queue_entry_t *head; client_queue_entry_t **tail; mutex_t mutex; + cond_t cond; + thread_type *thread; + bool running; #ifdef HAVE_POLL struct pollfd *pollfds; size_t pollfds_len; #endif } client_queue_t; +#define QUEUE_READY_TIMEOUT 800 + static spin_t _connection_lock; // protects _current_id static volatile connection_id_t _current_id = 0; static int _initialized = 0; @@ -122,28 +127,56 @@ rwlock_t _source_shutdown_rwlock; static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); static void free_client_node(client_queue_entry_t *node); +static void * process_request_queue (client_queue_t *queue); +static void * process_request_body_queue (client_queue_t *queue); static void client_queue_init(client_queue_t *queue) { memset(queue, 0, sizeof(*queue)); queue->tail = &(queue->head); thread_mutex_create(&(queue->mutex)); + thread_cond_create(&(queue->cond)); } static void client_queue_destroy(client_queue_t *queue) { + if (queue->thread) { + queue->running = false; + thread_cond_broadcast(&(queue->cond)); + thread_join(queue->thread); + } + thread_cond_destroy(&(queue->cond)); thread_mutex_destroy(&(queue->mutex)); #ifdef HAVE_POLL free(queue->pollfds); #endif } +static void client_queue_start_thread(client_queue_t *queue, void *(*func)(client_queue_t *)) +{ + if (queue->thread) + return; + queue->running = true; + queue->thread = thread_create("queue thread", (void*(*)(void*))func, queue, THREAD_ATTACHED); +} + +static inline bool client_queue_running(client_queue_t *queue) +{ + return queue->running; +} + static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry) { thread_mutex_lock(&(queue->mutex)); *(queue->tail) = entry; queue->tail = &(entry->next); thread_mutex_unlock(&(queue->mutex)); + thread_cond_broadcast(&(queue->cond)); +} + +static void client_queue_wait(client_queue_t *queue) +{ + thread_cond_wait(&(queue->cond)); } static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop) @@ -168,13 +201,14 @@ static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_q return ret; } -static bool client_queue_check_ready(client_queue_t *queue, int timeout) +static bool client_queue_check_ready(client_queue_t *queue, int timeout, time_t connection_timeout) { if (!queue->head) return false; #ifdef HAVE_POLL if (true) { + bool had_timeout = false; size_t count = 0; size_t i; client_queue_entry_t *cur; @@ -182,7 +216,12 @@ static bool client_queue_check_ready(client_queue_t *queue, int timeout) thread_mutex_lock(&(queue->mutex)); for (cur = queue->head; cur; cur = cur->next) { count++; - cur->ready = false; + if (cur->client->con->con_time <= connection_timeout) { + cur->ready = true; + had_timeout = true; + } else { + cur->ready = false; + } } if (queue->pollfds_len < count) { @@ -206,6 +245,9 @@ static bool client_queue_check_ready(client_queue_t *queue, int timeout) } thread_mutex_unlock(&(queue->mutex)); + if (had_timeout) + return true; + if (poll(queue->pollfds, count, timeout) < 1) return false; @@ -266,12 +308,6 @@ static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, cl #endif } -static bool client_queue_empty(client_queue_t *queue) -{ - /* No need to lock here as this is a point-in-time thing anyway */ - return queue->head == NULL ? true : false; -} - void connection_initialize(void) { if (_initialized) @@ -284,6 +320,9 @@ void connection_initialize(void) client_queue_init(&_connection_queue); client_queue_init(&_body_queue); + client_queue_start_thread(&_request_queue, process_request_queue); + client_queue_start_thread(&_body_queue, process_request_body_queue); + _initialized = 1; } @@ -295,7 +334,7 @@ void connection_shutdown(void) tls_ctx_unref(tls_ctx); matchfile_release(banned_ip); matchfile_release(allowed_ip); - + thread_rwlock_destroy(&_source_shutdown_rwlock); thread_spin_destroy (&_connection_lock); thread_mutex_destroy(&move_clients_mutex); @@ -553,7 +592,7 @@ int connection_read_put_back(connection_t *con, const void *buf, size_t len) } /* run along queue checking for any data that has come in or a timeout */ -static bool process_request_queue_one (client_queue_entry_t *node, int timeout) +static bool process_request_queue_one (client_queue_entry_t *node, time_t timeout) { client_t *client = node->client; int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; @@ -571,7 +610,7 @@ static bool process_request_queue_one (client_queue_entry_t *node, int timeout) } if (len > 0) { - if (client->con->con_time + timeout <= time(NULL)) { + if (client->con->con_time <= timeout) { len = 0; } else { len = client_read_bytes(client, buf, len); @@ -642,28 +681,32 @@ static bool process_request_queue_one (client_queue_entry_t *node, int timeout) return false; } -static void process_request_queue (void) +static void * process_request_queue (client_queue_t *queue) { - client_queue_entry_t *stop = NULL; - client_queue_entry_t *node; - ice_config_t *config; - int timeout; + while (client_queue_running(queue)) { + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; + ice_config_t *config; + time_t timeout; - config = config_get_config(); - timeout = config->header_timeout; - config_release_config(); + config = config_get_config(); + timeout = time(NULL) - config->header_timeout; + config_release_config(); - client_queue_check_ready(&_request_queue, 100); + client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout); - while ((node = client_queue_shift_ready(&_request_queue, stop))) { - if (!process_request_queue_one(node, timeout)) { - client_queue_add(&_request_queue, node); - if (!stop) - stop = node; + while ((node = client_queue_shift_ready(queue, stop))) { + if (!process_request_queue_one(node, timeout)) { + client_queue_add(queue, node); + if (!stop) + stop = node; + } } + + _handle_connection(); } - _handle_connection(); + return NULL; } static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit) @@ -704,42 +747,48 @@ static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t } /* This queue reads data from the body of clients. */ -static void process_request_body_queue (void) +static void * process_request_body_queue (client_queue_t *queue) { - client_queue_entry_t *stop = NULL; - client_queue_entry_t *node; - ice_config_t *config; - time_t timeout; - size_t body_size_limit; + while (client_queue_running(queue)) { + client_queue_entry_t *stop = NULL; + client_queue_entry_t *node; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; - ICECAST_LOG_DDEBUG("Processing body queue."); + ICECAST_LOG_DDEBUG("Processing body queue."); - config = config_get_config(); - timeout = time(NULL) - config->body_timeout; - body_size_limit = config->body_size_limit; - config_release_config(); + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); - while ((node = client_queue_shift(&_body_queue, stop))) { - client_t *client = node->client; - client_slurp_result_t res; + client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout); - node->tried_body = 1; + while ((node = client_queue_shift(queue, stop))) { + client_t *client = node->client; + client_slurp_result_t res; - ICECAST_LOG_DEBUG("Got client %p in body queue.", client); + node->tried_body = 1; - res = process_request_body_queue_one(node, timeout, body_size_limit); + ICECAST_LOG_DEBUG("Got client %p in body queue.", client); - if (res == CLIENT_SLURP_NEEDS_MORE_DATA) { - client_queue_add(&_body_queue, node); - if (!stop) - stop = node; - } else { - ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); + res = process_request_body_queue_one(node, timeout, body_size_limit); - client_queue_add(&_connection_queue, node); - continue; + if (res == CLIENT_SLURP_NEEDS_MORE_DATA) { + client_queue_add(queue, node); + if (!stop) + stop = node; + } else { + ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client); + + client_queue_add(&_connection_queue, node); + continue; + } } } + + return NULL; } static client_queue_entry_t *create_client_node(client_t *client) @@ -813,26 +862,18 @@ void connection_queue(connection_t *con) void connection_accept_loop(void) { - connection_t *con; ice_config_t *config; - int duration = 300; config = config_get_config(); get_tls_certificate(config); config_release_config(); while (global.running == ICECAST_RUNNING) { - con = listensocket_container_accept(global.listensockets, duration); + connection_t *con = listensocket_container_accept(global.listensockets, QUEUE_READY_TIMEOUT); if (con) { connection_queue(con); - duration = 5; - } else { - if (client_queue_empty(&_request_queue) && client_queue_empty(&_body_queue)) - duration = 300; /* use longer timeouts when nothing waiting */ } - process_request_queue(); - process_request_body_queue(); } /* Give all the other threads notification to shut down */ From 8a7513a420148bb9afdd8aec620f27d1bec4cd56 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 11:15:09 +0000 Subject: [PATCH 08/14] Update: Do not let queue threads spin freely --- src/connection.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index 69b3a576..6fdb09b6 100644 --- a/src/connection.c +++ b/src/connection.c @@ -268,6 +268,19 @@ static bool client_queue_check_ready(client_queue_t *queue, int timeout, time_t return true; } +static bool client_queue_check_ready_wait(client_queue_t *queue, int timeout, time_t connection_timeout) +{ + while (queue->running) { + if (client_queue_check_ready(queue, timeout, connection_timeout)) + return true; + + if (!queue->head) + thread_cond_wait(&(queue->cond)); + } + + return false; +} + static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, client_queue_entry_t *stop) { #ifdef HAVE_POLL @@ -693,7 +706,7 @@ static void * process_request_queue (client_queue_t *queue) timeout = time(NULL) - config->header_timeout; config_release_config(); - client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout); + client_queue_check_ready_wait(queue, QUEUE_READY_TIMEOUT, timeout); while ((node = client_queue_shift_ready(queue, stop))) { if (!process_request_queue_one(node, timeout)) { @@ -763,7 +776,7 @@ static void * process_request_body_queue (client_queue_t *queue) body_size_limit = config->body_size_limit; config_release_config(); - client_queue_check_ready(queue, QUEUE_READY_TIMEOUT, timeout); + client_queue_check_ready_wait(queue, QUEUE_READY_TIMEOUT, timeout); while ((node = client_queue_shift(queue, stop))) { client_t *client = node->client; From e0f5c94de16a3c4d4a88036438da939bc11bf126 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 13:15:31 +0000 Subject: [PATCH 09/14] Update: Avoid new client waiting on old ones --- src/connection.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index 6fdb09b6..c6e61303 100644 --- a/src/connection.c +++ b/src/connection.c @@ -107,7 +107,7 @@ typedef struct { #endif } client_queue_t; -#define QUEUE_READY_TIMEOUT 800 +#define QUEUE_READY_TIMEOUT 50 static spin_t _connection_lock; // protects _current_id static volatile connection_id_t _current_id = 0; @@ -882,7 +882,7 @@ void connection_accept_loop(void) config_release_config(); while (global.running == ICECAST_RUNNING) { - connection_t *con = listensocket_container_accept(global.listensockets, QUEUE_READY_TIMEOUT); + connection_t *con = listensocket_container_accept(global.listensockets, 800); if (con) { connection_queue(con); From 196223cb2fb2224baadc17fab698c2e95d126b20 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 13:17:16 +0000 Subject: [PATCH 10/14] Update: Set queue thread name --- src/connection.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connection.c b/src/connection.c index c6e61303..845c5703 100644 --- a/src/connection.c +++ b/src/connection.c @@ -152,12 +152,12 @@ static void client_queue_destroy(client_queue_t *queue) #endif } -static void client_queue_start_thread(client_queue_t *queue, void *(*func)(client_queue_t *)) +static void client_queue_start_thread(client_queue_t *queue, const char *name, void *(*func)(client_queue_t *)) { if (queue->thread) return; queue->running = true; - queue->thread = thread_create("queue thread", (void*(*)(void*))func, queue, THREAD_ATTACHED); + queue->thread = thread_create(name, (void*(*)(void*))func, queue, THREAD_ATTACHED); } static inline bool client_queue_running(client_queue_t *queue) @@ -333,8 +333,8 @@ void connection_initialize(void) client_queue_init(&_connection_queue); client_queue_init(&_body_queue); - client_queue_start_thread(&_request_queue, process_request_queue); - client_queue_start_thread(&_body_queue, process_request_body_queue); + client_queue_start_thread(&_request_queue, "request queue", process_request_queue); + client_queue_start_thread(&_body_queue, "body queue", process_request_body_queue); _initialized = 1; } From d5ca0002841ebf75a312941e712c61fc53874fa3 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 16:03:46 +0000 Subject: [PATCH 11/14] Update: Added thread for handling new clients --- src/connection.c | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index 845c5703..9893d82e 100644 --- a/src/connection.c +++ b/src/connection.c @@ -116,6 +116,7 @@ static int _initialized = 0; static client_queue_t _request_queue; static client_queue_t _connection_queue; static client_queue_t _body_queue; +static client_queue_t _handle_queue; static bool tls_ok = false; static tls_ctx_t *tls_ctx; @@ -129,6 +130,7 @@ static void get_tls_certificate(ice_config_t *config); static void free_client_node(client_queue_entry_t *node); static void * process_request_queue (client_queue_t *queue); static void * process_request_body_queue (client_queue_t *queue); +static void * handle_client_worker(client_queue_t *queue); static void client_queue_init(client_queue_t *queue) { @@ -332,9 +334,11 @@ void connection_initialize(void) client_queue_init(&_request_queue); client_queue_init(&_connection_queue); client_queue_init(&_body_queue); + client_queue_init(&_handle_queue); client_queue_start_thread(&_request_queue, "request queue", process_request_queue); client_queue_start_thread(&_body_queue, "body queue", process_request_body_queue); + client_queue_start_thread(&_handle_queue, "Client Handler", handle_client_worker); _initialized = 1; } @@ -354,6 +358,7 @@ void connection_shutdown(void) client_queue_destroy(&_request_queue); client_queue_destroy(&_connection_queue); client_queue_destroy(&_body_queue); + client_queue_destroy(&_handle_queue); _initialized = 0; } @@ -1159,9 +1164,8 @@ static void _handle_connection(void) if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); - free_client_node(node); - connection_handle_client(client); + client_queue_add(&_handle_queue, node); } else { free_client_node(node); ICECAST_LOG_ERROR("HTTP request parsing failed"); @@ -1170,6 +1174,22 @@ static void _handle_connection(void) } } +static void * handle_client_worker(client_queue_t *queue) +{ + while (client_queue_running(queue)) { + client_queue_entry_t *node = client_queue_shift(queue, NULL); + if (node) { + client_t *client = node->client; + free_client_node(node); + + connection_handle_client(client); + } else { + client_queue_wait(queue); + } + } + + return NULL; +} static void __on_sock_count(size_t count, void *userdata) { From a167bb41d652b7670614d475c44fa2b57e602ca0 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sun, 20 Mar 2022 16:10:58 +0000 Subject: [PATCH 12/14] Update: Corrected thread names --- src/connection.c | 4 ++-- src/event.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connection.c b/src/connection.c index 9893d82e..4e615252 100644 --- a/src/connection.c +++ b/src/connection.c @@ -336,8 +336,8 @@ void connection_initialize(void) client_queue_init(&_body_queue); client_queue_init(&_handle_queue); - client_queue_start_thread(&_request_queue, "request queue", process_request_queue); - client_queue_start_thread(&_body_queue, "body queue", process_request_body_queue); + client_queue_start_thread(&_request_queue, "Request Queue", process_request_queue); + client_queue_start_thread(&_body_queue, "Body Queue", process_request_body_queue); client_queue_start_thread(&_handle_queue, "Client Handler", handle_client_worker); _initialized = 1; diff --git a/src/event.c b/src/event.c index c43b3f92..84cc2eae 100644 --- a/src/event.c +++ b/src/event.c @@ -208,7 +208,7 @@ void event_initialise(void) { thread_mutex_unlock(&event_lock); /* start thread */ - event_thread = thread_create("events thread", event_run_thread, NULL, THREAD_ATTACHED); + event_thread = thread_create("Events Thread", event_run_thread, NULL, THREAD_ATTACHED); } void event_shutdown(void) { From efbb64e618c7e6538585e458b37993c29bce7c2c Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 21 Mar 2022 08:48:23 +0000 Subject: [PATCH 13/14] Update: Replaced listensocket container's mutex with rwlock --- src/listensocket.c | 49 ++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/listensocket.c b/src/listensocket.c index fb39fb06..6265b952 100644 --- a/src/listensocket.c +++ b/src/listensocket.c @@ -36,7 +36,7 @@ struct listensocket_container_tag { refobject_base_t __base; - mutex_t lock; + rwlock_t rwlock; bool prefer_inet6; listensocket_t **sock; int *sockref; @@ -146,10 +146,10 @@ static void __listensocket_container_clear_sockets(listensocket_container_t *sel static void __listensocket_container_free(refobject_t self, void **userdata) { listensocket_container_t *container = REFOBJECT_TO_TYPE(self, listensocket_container_t *); - thread_mutex_lock(&container->lock); + thread_rwlock_wlock(&container->rwlock); __listensocket_container_clear_sockets(container); - thread_mutex_unlock(&container->lock); - thread_mutex_destroy(&container->lock); + thread_rwlock_unlock(&container->rwlock); + thread_rwlock_destroy(&container->rwlock); } int __listensocket_container_new(refobject_t self, const refobject_type_t *type, va_list ap) @@ -161,7 +161,7 @@ int __listensocket_container_new(refobject_t self, const refobject_type_t *type, ret->sockcount_cb = NULL; ret->sockcount_userdata = NULL; - thread_mutex_create(&ret->lock); + thread_rwlock_create(&ret->rwlock); return 0; } @@ -203,9 +203,9 @@ int listensocket_container_configure(listensocket_contai if (!self) return -1; - thread_mutex_lock(&self->lock); + thread_rwlock_wlock(&self->rwlock); ret = listensocket_container_configure__unlocked(self, config); - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return ret; } @@ -281,7 +281,7 @@ int listensocket_container_configure_and_setup(listensoc prefer_inet6 = sock_is_ipv4_mapped_supported(); /* test before we enter lock to minimise locked time */ - thread_mutex_lock(&self->lock); + thread_rwlock_wlock(&self->rwlock); self->prefer_inet6 = prefer_inet6; cb = self->sockcount_cb; self->sockcount_cb = NULL; @@ -294,7 +294,7 @@ int listensocket_container_configure_and_setup(listensoc self->sockcount_cb = cb; __call_sockcount_cb(self); - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return ret; } @@ -309,10 +309,10 @@ int listensocket_container_setup(listensocket_container_ prefer_inet6 = sock_is_ipv4_mapped_supported(); /* test before we enter lock to minimise locked time */ - thread_mutex_lock(&self->lock); + thread_rwlock_wlock(&self->rwlock); self->prefer_inet6 = prefer_inet6; ret = listensocket_container_setup__unlocked(self); - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return ret; } @@ -442,10 +442,10 @@ connection_t * listensocket_container_accept(listensocket_container if (!self) return NULL; - thread_mutex_lock(&self->lock); + thread_rwlock_rlock(&self->rwlock); ls = listensocket_container_accept__inner(self, timeout); refobject_ref(ls); - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); ret = listensocket_accept(ls, self); refobject_unref(ls); @@ -458,10 +458,10 @@ int listensocket_container_set_sockcount_cb(listensocket if (!self) return -1; - thread_mutex_lock(&self->lock); + thread_rwlock_wlock(&self->rwlock); self->sockcount_cb = cb; self->sockcount_userdata = userdata; - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return 0; } @@ -473,9 +473,9 @@ ssize_t listensocket_container_sockcount(listensocket_contai if (!self) return -1; - thread_mutex_lock(&self->lock); + thread_rwlock_rlock(&self->rwlock); ret = listensocket_container_sockcount__unlocked(self); - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return ret; } @@ -499,6 +499,7 @@ listensocket_t * listensocket_container_get_by_id(listensocket_container_t *self size_t i; const listener_t *listener; + thread_rwlock_rlock(&self->rwlock); for (i = 0; i < self->sock_len; i++) { if (self->sock[i] != NULL) { listener = listensocket_get_listener(self->sock[i]); @@ -506,6 +507,7 @@ listensocket_t * listensocket_container_get_by_id(listensocket_container_t *self if (listener->id != NULL && strcmp(listener->id, id) == 0) { if (refobject_ref(self->sock[i]) == 0) { listensocket_release_listener(self->sock[i]); + thread_rwlock_unlock(&self->rwlock); return self->sock[i]; } } @@ -513,6 +515,7 @@ listensocket_t * listensocket_container_get_by_id(listensocket_container_t *self } } } + thread_rwlock_unlock(&self->rwlock); return NULL; } @@ -523,10 +526,10 @@ listensocket_t ** listensocket_container_list_sockets(listensocket_con size_t idx = 0; size_t i; - thread_mutex_lock(&self->lock); + thread_rwlock_rlock(&self->rwlock); res = calloc(self->sock_len + 1, sizeof(*res)); if (!res) { - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return NULL; } @@ -536,7 +539,7 @@ listensocket_t ** listensocket_container_list_sockets(listensocket_con } } - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return res; } @@ -545,16 +548,16 @@ bool listensocket_container_is_family_included(listensock { size_t i; - thread_mutex_lock(&self->lock); + thread_rwlock_rlock(&self->rwlock); for (i = 0; i < self->sock_len; i++) { if (self->sock[i] != NULL) { if (listensocket_get_family(self->sock[i]) == family) { - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return true; } } } - thread_mutex_unlock(&self->lock); + thread_rwlock_unlock(&self->rwlock); return false; } From 4384287195b171c7cf3a73b78e4de630e34a06ed Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Mon, 21 Mar 2022 08:49:12 +0000 Subject: [PATCH 14/14] Update: Run _handle_connection() as own thread --- src/connection.c | 128 +++++++++++++++++++++++++---------------------- 1 file changed, 67 insertions(+), 61 deletions(-) diff --git a/src/connection.c b/src/connection.c index 4e615252..5fcb89cf 100644 --- a/src/connection.c +++ b/src/connection.c @@ -125,9 +125,9 @@ static matchfile_t *banned_ip, *allowed_ip; rwlock_t _source_shutdown_rwlock; -static void _handle_connection(void); static void get_tls_certificate(ice_config_t *config); static void free_client_node(client_queue_entry_t *node); +static void * _handle_connection(client_queue_t *queue); static void * process_request_queue (client_queue_t *queue); static void * process_request_body_queue (client_queue_t *queue); static void * handle_client_worker(client_queue_t *queue); @@ -337,6 +337,7 @@ void connection_initialize(void) client_queue_init(&_handle_queue); client_queue_start_thread(&_request_queue, "Request Queue", process_request_queue); + client_queue_start_thread(&_connection_queue, "Con Queue", _handle_connection); client_queue_start_thread(&_body_queue, "Body Queue", process_request_body_queue); client_queue_start_thread(&_handle_queue, "Client Handler", handle_client_worker); @@ -720,8 +721,6 @@ static void * process_request_queue (client_queue_t *queue) stop = node; } } - - _handle_connection(); } return NULL; @@ -1102,76 +1101,83 @@ static int _need_body(client_queue_entry_t *node) * the contents provided. We set up the parser then hand off to the specific * request handler. */ -static void _handle_connection(void) +static void * _handle_connection(client_queue_t *queue) { - http_parser_t *parser; - const char *rawuri; - client_queue_entry_t *node; + while (client_queue_running(queue)) { + client_queue_entry_t *node; - while ((node = client_queue_shift(&_connection_queue, NULL))) { - client_t *client = node->client; - int already_parsed = 0; + node = client_queue_shift(&_connection_queue, NULL); + if (node) { + client_t *client = node->client; + http_parser_t *parser; + const char *rawuri; + int already_parsed = 0; - /* Check for special shoutcast compatability processing */ - if (node->shoutcast) { - _handle_shoutcast_compatible (node); - if (node->shoutcast) - continue; - } - - /* process normal HTTP headers */ - if (client->parser) { - already_parsed = 1; - parser = client->parser; - } else { - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - client->parser = parser; - } - if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { - client->refbuf->len = 0; - - /* early check if we need more data */ - client_complete(client); - if (_need_body(node)) { - /* Just calling client_queue_add(&_body_queue, node) would do the job. - * However, if the client only has a small body this might work without moving it between queues. - * -> much faster. - */ - client_slurp_result_t res; - ice_config_t *config; - time_t timeout; - size_t body_size_limit; - - config = config_get_config(); - timeout = time(NULL) - config->body_timeout; - body_size_limit = config->body_size_limit; - config_release_config(); - - res = process_request_body_queue_one(node, timeout, body_size_limit); - if (res != CLIENT_SLURP_SUCCESS) { - ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); - client_queue_add(&_body_queue, node); + /* Check for special shoutcast compatability processing */ + if (node->shoutcast) { + _handle_shoutcast_compatible (node); + if (node->shoutcast) continue; - } else { - ICECAST_LOG_DEBUG("Success on fast lane"); - } } - rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + /* process normal HTTP headers */ + if (client->parser) { + already_parsed = 1; + parser = client->parser; + } else { + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + client->parser = parser; + } + if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) { + client->refbuf->len = 0; - /* assign a port-based shoutcast mountpoint if required */ - if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) - httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); + /* early check if we need more data */ + client_complete(client); + if (_need_body(node)) { + /* Just calling client_queue_add(&_body_queue, node) would do the job. + * However, if the client only has a small body this might work without moving it between queues. + * -> much faster. + */ + client_slurp_result_t res; + ice_config_t *config; + time_t timeout; + size_t body_size_limit; + + config = config_get_config(); + timeout = time(NULL) - config->body_timeout; + body_size_limit = config->body_size_limit; + config_release_config(); + + res = process_request_body_queue_one(node, timeout, body_size_limit); + if (res != CLIENT_SLURP_SUCCESS) { + ICECAST_LOG_DEBUG("Putting client %p in body queue.", client); + client_queue_add(&_body_queue, node); + continue; + } else { + ICECAST_LOG_DEBUG("Success on fast lane"); + } + } + + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + + /* assign a port-based shoutcast mountpoint if required */ + if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) + httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); - client_queue_add(&_handle_queue, node); + client_queue_add(&_handle_queue, node); + } else { + free_client_node(node); + ICECAST_LOG_ERROR("HTTP request parsing failed"); + client_destroy (client); + } } else { - free_client_node(node); - ICECAST_LOG_ERROR("HTTP request parsing failed"); - client_destroy (client); + client_queue_wait(queue); } } + + return NULL; } static void * handle_client_worker(client_queue_t *queue)