From ae0e8e01837e0f6687e9bc3737493f496627e330 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Wed, 24 Jan 2024 11:20:30 +0000 Subject: [PATCH 01/17] Feature: Added stubs for event_stream.[ch] --- src/Makefile.am | 2 ++ src/event_stream.c | 23 +++++++++++++++++++++++ src/event_stream.h | 14 ++++++++++++++ 3 files changed, 39 insertions(+) create mode 100644 src/event_stream.c create mode 100644 src/event_stream.h diff --git a/src/Makefile.am b/src/Makefile.am index 2ee0f0af..f05666a4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -47,6 +47,7 @@ noinst_HEADERS = \ fastevent.h \ navigation.h \ event.h \ + event_stream.h \ ping.h \ acl.h auth.h \ metadata_xiph.h \ @@ -114,6 +115,7 @@ icecast_SOURCES = \ format_skeleton.c \ format_opus.c \ event.c \ + event_stream.c \ event_log.c \ event_exec.c \ acl.c \ diff --git a/src/event_stream.c b/src/event_stream.c new file mode 100644 index 00000000..50ff4a23 --- /dev/null +++ b/src/event_stream.c @@ -0,0 +1,23 @@ +/* Icecast + * + * This program is distributed under the GNU General Public License, version 2. + * A copy of this license is included with this source. + * + * Copyright 2024 , Philipp "ph3-der-loewe" Schafft , + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#include "event_stream.h" +#include "string_renderer.h" +#include "json.h" +#include "global.h" +#include "errors.h" +#include "logging.h" +#define CATMODULE "event-stream" + diff --git a/src/event_stream.h b/src/event_stream.h new file mode 100644 index 00000000..8928dd69 --- /dev/null +++ b/src/event_stream.h @@ -0,0 +1,14 @@ +/* Icecast + * + * This program is distributed under the GNU General Public License, version 2. + * A copy of this license is included with this source. + * + * Copyright 2024 , Philipp "ph3-der-loewe" Schafft , + */ + +#ifndef __EVENT_STREAM_H__ +#define __EVENT_STREAM_H__ + +#include "icecasttypes.h" + +#endif From 11ae1e87df39ad9ce4e8c87be632ce3b8b4cce02 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Wed, 24 Jan 2024 16:06:32 +0000 Subject: [PATCH 02/17] Feature: Added most basic event stream implementation --- src/acl.c | 2 +- src/admin.c | 10 ++ src/event.c | 17 +-- src/event_stream.c | 355 ++++++++++++++++++++++++++++++++++++++++++++- src/event_stream.h | 8 + src/icecasttypes.h | 5 + src/main.c | 3 + 7 files changed, 383 insertions(+), 17 deletions(-) diff --git a/src/acl.c b/src/acl.c index 908ffe64..fc8c60dd 100644 --- a/src/acl.c +++ b/src/acl.c @@ -111,7 +111,7 @@ acl_t *acl_new(void) acl_set_method_str(ret, ACL_POLICY_ALLOW, "get,options"); acl_set_admin_str(ret, ACL_POLICY_DENY, "*"); - acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,publicstats,publicstats.json"); + acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,eventstream,publicstats,publicstats.json"); acl_set_web_policy(ret, ACL_POLICY_ALLOW); diff --git a/src/admin.c b/src/admin.c index 825d3bd5..958d55b4 100644 --- a/src/admin.c +++ b/src/admin.c @@ -53,6 +53,7 @@ #include "listensocket.h" #include "refbuf.h" #include "client.h" +#include "event_stream.h" #include "source.h" #include "global.h" #include "stats.h" @@ -149,6 +150,7 @@ #define DEFAULT_RAW_REQUEST "" #define DEFAULT_HTML_REQUEST "" #define BUILDM3U_RAW_REQUEST "buildm3u" +#define EVENTSTREAM_RAW_REQUEST "eventstream" typedef struct { size_t listeners; @@ -179,6 +181,7 @@ static void command_dumpfile_control (client_t *client, source_t *source, adm static void command_manageauth (client_t *client, source_t *source, admin_format_t response); static void command_updatemetadata (client_t *client, source_t *source, admin_format_t response); static void command_buildm3u (client_t *client, source_t *source, admin_format_t response); +static void command_eventstream (client_t *client, source_t *source, admin_format_t response); static void command_show_log (client_t *client, source_t *source, admin_format_t response); static void command_mark_log (client_t *client, source_t *source, admin_format_t response); static void command_dashboard (client_t *client, source_t *source, admin_format_t response); @@ -232,6 +235,7 @@ static const admin_command_handler_t handlers[] = { { UPDATEMETADATA_HTML_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_updatemetadata, NULL}, { UPDATEMETADATA_JSON_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_updatemetadata, NULL}, { BUILDM3U_RAW_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_buildm3u, NULL}, + { EVENTSTREAM_RAW_REQUEST, ADMINTYPE_HYBRID, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_eventstream, NULL}, { SHOWLOG_RAW_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_show_log, NULL}, { SHOWLOG_HTML_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_show_log, NULL}, { SHOWLOG_JSON_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_show_log, NULL}, @@ -1097,6 +1101,12 @@ static void command_buildm3u(client_t *client, source_t *source, admin_format_t client_send_buffer(client, 200, "audio/x-mpegurl", NULL, buffer, -1, "Content-Disposition: attachment; filename=listen.m3u\r\n"); } +static void command_eventstream (client_t *client, source_t *source, admin_format_t response) +{ + (void)source, (void)response; + event_stream_add_client(client); +} + xmlNodePtr admin_add_role_to_authentication(auth_t *auth, xmlNodePtr parent) { xmlNodePtr rolenode = xmlNewChild(parent, NULL, XMLSTR("role"), NULL); diff --git a/src/event.c b/src/event.c index daa737bc..a675f71b 100644 --- a/src/event.c +++ b/src/event.c @@ -21,6 +21,7 @@ #include #include "event.h" +#include "event_stream.h" #include "fastevent.h" #include "logging.h" #include "string_renderer.h" @@ -426,6 +427,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta /* event signaling */ void event_emit(event_t *event) { fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event); + event_stream_emit_event(event); thread_mutex_lock(&event_lock); event_push(&event_queue, event); thread_mutex_unlock(&event_lock); @@ -476,21 +478,6 @@ void event_emit_va(const char *trigger, ...) { event_push_reglist(event, mount->event); config_release_config(); - /* This isn't perfectly clean but is an important speedup: - * If first element of reglist is NULL none of the above pushed in - * some registrations. If there are no registrations we can just drop - * this event now and here. - * We do this before inserting all the data into the object to avoid - * all the strdups() and stuff in case they aren't needed. - */ -#ifndef FASTEVENT_ENABLED - if (event->reglist[0] == NULL) { - /* we have no registrations, drop this event. */ - igloo_ro_unref(&event); - return; - } -#endif - if (uri) extra_add(event, EVENT_EXTRA_KEY_URI, uri); diff --git a/src/event_stream.c b/src/event_stream.c index 50ff4a23..ed746cdc 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -10,14 +10,367 @@ #include #endif -#include +#include +#include +#include +#include "icecasttypes.h" + #include +#include +#include +#include + +#include "common/thread/thread.h" +#include "common/avl/avl.h" #include "event_stream.h" #include "string_renderer.h" #include "json.h" +#include "util.h" +#include "fserve.h" #include "global.h" +#include "event.h" +#include "client.h" +#include "connection.h" #include "errors.h" #include "logging.h" #define CATMODULE "event-stream" +struct event_stream_event_tag { + igloo_ro_full_t __parent; + + const char * uuid; + const char * rendered; + size_t rendered_length; + + event_t *event; + + event_stream_event_t *next; +}; + +typedef struct { + const char *current_buffer; + size_t todo; + event_stream_event_t *current_event; +} event_stream_clientstate_t; + +static void event_stream_event_free(igloo_ro_t self); +static void *event_stream_thread_function(void *arg); +static void event_stream_event_render(event_stream_event_t *event); + +igloo_RO_PUBLIC_TYPE(event_stream_event_t, igloo_ro_full_t, + igloo_RO_TYPEDECL_FREE(event_stream_event_free), + ); + +static mutex_t event_stream_event_mutex; // protects: event_queue, event_queue_next, event_stream_thread, alive +static event_stream_event_t *event_queue; +static event_stream_event_t **event_queue_next = &event_queue; +static thread_type *event_stream_thread; +static cond_t event_stream_cond; +static avl_tree *client_tree; +static bool alive; + +static void event_stream_clientstate_free(client_t *client) +{ + event_stream_clientstate_t *state = client->format_data; + client->format_data = NULL; + + if (!state) + return; + + igloo_ro_unref(&(state->current_event)); + + free(state); +} + +static void event_stream_event_free(igloo_ro_t self) +{ + event_stream_event_t *event = igloo_ro_to_type(self, event_stream_event_t); + igloo_sp_unref(&(event->uuid), igloo_instance); + igloo_sp_unref(&(event->rendered), igloo_instance); + igloo_ro_unref(&(event->event)); + igloo_ro_unref(&(event->next)); +} + +static event_stream_event_t * event_stream_event_new(void) +{ + event_stream_event_t *event; + + if (igloo_ro_new_raw(&event, event_stream_event_t, igloo_instance) != igloo_ERROR_NONE) + return NULL; + + if (igloo_uuid_new_random_sp(&(event->uuid), igloo_instance) != igloo_ERROR_NONE) { + igloo_ro_unref(&event); + return NULL; + } + + return event; +} + +static void event_stream_queue(event_stream_event_t *event) +{ + event_stream_event_render(event); + + thread_mutex_lock(&event_stream_event_mutex); + *event_queue_next = event; + event_queue_next = &(event->next); + thread_mutex_unlock(&event_stream_event_mutex); + + thread_cond_broadcast(&event_stream_cond); + ICECAST_LOG_INFO("event queued"); +} + +static int _free_client(void *key) +{ + client_t *client = (client_t *)key; + client_destroy(client); + return 1; +} + +void event_stream_initialise(void) +{ + thread_mutex_create(&event_stream_event_mutex); + thread_cond_create(&event_stream_cond); + client_tree = avl_tree_new(client_compare, NULL); + alive = true; +} + +void event_stream_shutdown(void) +{ + thread_mutex_lock(&event_stream_event_mutex); + alive = false; + thread_mutex_unlock(&event_stream_event_mutex); + + thread_cond_broadcast(&event_stream_cond); + + if (event_stream_thread) + thread_join(event_stream_thread); + + avl_tree_free(client_tree, _free_client); + + thread_mutex_lock(&event_stream_event_mutex); + igloo_ro_unref(&event_queue); + thread_mutex_unlock(&event_stream_event_mutex); + + thread_mutex_destroy(&event_stream_event_mutex); + thread_cond_destroy(&event_stream_cond); +} + +void event_stream_add_client_inner(client_t *client, void *ud) +{ + event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t)); + + (void)ud; + + if (!state) { + client_destroy(client); + return; + } + + thread_mutex_lock(&event_stream_event_mutex); + igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t); + thread_mutex_unlock(&event_stream_event_mutex); + + client->format_data = state; + client->free_client_data = event_stream_clientstate_free; + + avl_tree_wlock(client_tree); + avl_insert(client_tree, client); + avl_tree_unlock(client_tree); + + thread_mutex_lock(&event_stream_event_mutex); + if (!event_stream_thread) { + event_stream_thread = thread_create("Event Stream Thread", event_stream_thread_function, (void *)NULL, THREAD_ATTACHED); + } + thread_mutex_unlock(&event_stream_event_mutex); +} + +void event_stream_add_client(client_t *client) +{ + ssize_t len = util_http_build_header(client->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0, + 0, 200, NULL, + "text/event-stream", NULL, + "", NULL, client + ); + if (len < 1 || len > PER_CLIENT_REFBUF_SIZE) + return; + + client->refbuf->len = len; + + fserve_add_client_callback(client, event_stream_add_client_inner, NULL); +} + +void event_stream_emit_event(event_t *event) +{ + event_stream_event_t *el = event_stream_event_new(); + if (!el) + return; + + igloo_ro_ref_replace(event, &(el->event), event_t); + + event_stream_queue(el); +} + +static void event_stream_send_to_client(client_t *client) +{ + event_stream_clientstate_t *state = client->format_data; + bool going = true; + + ICECAST_LOG_INFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state); + + do { + if (!state->current_buffer) { + state->current_buffer = state->current_event->rendered; + state->todo = state->current_event->rendered_length; + } + + if (state->todo) { + ssize_t done = client_send_bytes(client, state->current_buffer, state->todo); + if (done < 0) { + return; + } else { + state->todo -= done; + state->current_buffer += done; + } + } + + if (!state->todo) { + if (state->current_event->next) { + igloo_ro_ref_replace(state->current_event->next, &(state->current_event), event_stream_event_t); + state->current_buffer = NULL; + } else { + going = false; + } + } + } while (going); +} + +static void *event_stream_thread_function(void *arg) +{ + bool running = true; + + ICECAST_LOG_INFO("Good morning!"); + + do { + thread_cond_wait(&event_stream_cond); + ICECAST_LOG_INFO("Tick!"); + + { + avl_tree_wlock(client_tree); + avl_node *next; + + next = avl_get_first(client_tree); + while (next) { + avl_node *node = next; + client_t *client = node->key; + + next = avl_get_next(next); + + event_stream_send_to_client(client); + if (client->con->error) { + avl_delete(client_tree, (void *) client, _free_client); + } + } + avl_tree_unlock(client_tree); + } + + thread_mutex_lock(&event_stream_event_mutex); + running = alive; + + { + event_stream_event_t *head = event_queue; + while (head) { + ICECAST_LOG_INFO("Event: % #H", head->rendered); + head = head->next; + } + } + + thread_mutex_unlock(&event_stream_event_mutex); + } while (running); + + ICECAST_LOG_INFO("Good evening!"); + return NULL; +} + +static void add_number(json_renderer_t *json, const char *key, intmax_t val) +{ + if (val < 1) + return; + + json_renderer_write_key(json, key, JSON_RENDERER_FLAGS_NONE); + json_renderer_write_uint(json, val); +} + +static void event_stream_event_render(event_stream_event_t *event) +{ + string_renderer_t * renderer; + json_renderer_t *json; + char *body; + + if (event->rendered) + return; + + if (igloo_ro_new(&renderer, string_renderer_t, igloo_instance) != igloo_ERROR_NONE) + return; + json = json_renderer_create(JSON_RENDERER_FLAGS_NONE); + if (!json) { + igloo_ro_unref(&renderer); + return; + } + + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + if (event->event) { + event_t *uevent = event->event; + + json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, "event", JSON_RENDERER_FLAGS_NONE); + + json_renderer_write_key(json, "crude", JSON_RENDERER_FLAGS_NONE); + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + { + static const event_extra_key_t key_list[] = { + EVENT_EXTRA_KEY_URI, + EVENT_EXTRA_KEY_SOURCE_MEDIA_TYPE, + EVENT_EXTRA_KEY_SOURCE_INSTANCE_UUID, + EVENT_EXTRA_KEY_CONNECTION_IP, + EVENT_EXTRA_KEY_CLIENT_ROLE, + EVENT_EXTRA_KEY_CLIENT_USERNAME, + EVENT_EXTRA_KEY_CLIENT_USERAGENT, + EVENT_EXTRA_KEY_DUMPFILE_FILENAME, + EVENT_EXTRA_LIST_END + }; + json_renderer_write_key(json, "trigger", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, uevent->trigger, JSON_RENDERER_FLAGS_NONE); + + for (size_t i = 0; key_list[i] != EVENT_EXTRA_LIST_END; i++) { + const char *value = event_extra_get(uevent, key_list[i]); + + if (!value) + continue; + + json_renderer_write_key(json, event_extra_key_name(key_list[i]), JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, value, JSON_RENDERER_FLAGS_NONE); + } + + add_number(json, "connection-id", uevent->connection_id); + add_number(json, "connection-time", uevent->connection_time); + add_number(json, "client-admin-command", uevent->client_admin_command); + } + json_renderer_end(json); + } + json_renderer_end(json); + + body = json_renderer_finish(&json); + + string_renderer_start_list(renderer, "\r\n", ": ", false, false, STRING_RENDERER_ENCODING_PLAIN); + string_renderer_add_kv(renderer, "id", event->uuid); + string_renderer_add_kv(renderer, "data", body); + string_renderer_end_list(renderer); + string_renderer_add_string(renderer, "\r\n\r\n"); + + free(body); + + igloo_sp_replace(string_renderer_to_string_zero_copy(renderer), &(event->rendered), igloo_instance); + event->rendered_length = strlen(event->rendered); + igloo_ro_unref(&renderer); +} diff --git a/src/event_stream.h b/src/event_stream.h index 8928dd69..f2ac6ffe 100644 --- a/src/event_stream.h +++ b/src/event_stream.h @@ -11,4 +11,12 @@ #include "icecasttypes.h" +igloo_RO_FORWARD_TYPE(event_stream_event_t); + +void event_stream_initialise(void); +void event_stream_shutdown(void); + +void event_stream_add_client(client_t *client); +void event_stream_emit_event(event_t *event); + #endif diff --git a/src/icecasttypes.h b/src/icecasttypes.h index dc30b8d8..6a60b28c 100644 --- a/src/icecasttypes.h +++ b/src/icecasttypes.h @@ -143,6 +143,10 @@ typedef struct geoip_db_tag geoip_db_t; typedef struct event_tag event_t; typedef struct event_registration_tag event_registration_t; +/* ---[ event_stream.[ch] ]--- */ + +typedef struct event_stream_event_tag event_stream_event_t; + /* ---[ refobject.[ch] ]--- */ typedef struct refobject_base_tag refobject_base_t; @@ -167,6 +171,7 @@ typedef void * refobject_t; igloo_RO_TYPE(geoip_db_t) \ igloo_RO_TYPE(event_t) \ igloo_RO_TYPE(event_registration_t) \ + igloo_RO_TYPE(event_stream_event_t) \ igloo_RO_TYPE(module_t) \ igloo_RO_TYPE(module_container_t) diff --git a/src/main.c b/src/main.c index e021b3fd..842cf7c2 100644 --- a/src/main.c +++ b/src/main.c @@ -88,6 +88,7 @@ #include "yp.h" #include "auth.h" #include "event.h" +#include "event_stream.h" #include "ping.h" #include "listensocket.h" #include "fastevent.h" @@ -186,6 +187,7 @@ static void initialize_subsystems(void) static void shutdown_subsystems(void) { ICECAST_LOG_DEBUG("Shuting down subsystems..."); + event_stream_shutdown(); event_shutdown(); fserve_shutdown(); refbuf_shutdown(); @@ -785,6 +787,7 @@ int main(int argc, char **argv) slave_initialize(); auth_initialise (); event_initialise(); + event_stream_initialise(); event_emit_global("icecast-start"); _server_proc(); From 1c4ce97d6ced660246b1a2c976f56fa27f3cece7 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 10:41:23 +0000 Subject: [PATCH 03/17] Update: Let the event stream run once a second even if no new event is there to process clients --- src/event_stream.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index ed746cdc..ad71d7b9 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -183,6 +183,8 @@ void event_stream_add_client_inner(client_t *client, void *ud) event_stream_thread = thread_create("Event Stream Thread", event_stream_thread_function, (void *)NULL, THREAD_ATTACHED); } thread_mutex_unlock(&event_stream_event_mutex); + + thread_cond_broadcast(&event_stream_cond); } void event_stream_add_client(client_t *client) @@ -252,8 +254,7 @@ static void *event_stream_thread_function(void *arg) ICECAST_LOG_INFO("Good morning!"); do { - thread_cond_wait(&event_stream_cond); - ICECAST_LOG_INFO("Tick!"); + thread_cond_timedwait(&event_stream_cond, 1000); { avl_tree_wlock(client_tree); From 165a56c819443152759a6b5914c8e187404d388e Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 11:03:18 +0000 Subject: [PATCH 04/17] Feature: Added support to kick client that have fallen behind --- src/event_stream.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/event_stream.c b/src/event_stream.c index ad71d7b9..e9abf23e 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -39,6 +39,8 @@ struct event_stream_event_tag { igloo_ro_full_t __parent; + bool removed; // removed from the queue, clients referencing this are fallen too far behind + const char * uuid; const char * rendered; size_t rendered_length; @@ -245,6 +247,13 @@ static void event_stream_send_to_client(client_t *client) } } } while (going); + + + if (state->current_event->removed) { + ICECAST_LOG_INFO("Client %p %lu (%s) has fallen too far behind, removing", + client, client->con->id, client->con->ip); + client->con->error = 1; + } } static void *event_stream_thread_function(void *arg) From 7f59a845a8dbff5a7ee85ab2fd55d3cc04e12d77 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 11:11:00 +0000 Subject: [PATCH 05/17] Feature: Trim the queue to 32 entries max --- src/event_stream.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/event_stream.c b/src/event_stream.c index e9abf23e..cfc78760 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -256,6 +256,32 @@ static void event_stream_send_to_client(client_t *client) } } +static void event_stream_cleanup_queue(void) +{ + thread_mutex_lock(&event_stream_event_mutex); + { + static const size_t to_keep = 32; + event_stream_event_t *cur; + size_t count = 0; + + cur = event_queue; + while (cur) { + count++; + cur = cur->next; + } + + if (count > to_keep) { + for (size_t to_remove = count - to_keep; to_remove; to_remove--) { + cur = event_queue; + event_queue = cur->next; + cur->removed = 1; + cur->next = NULL; + } + } + } + thread_mutex_unlock(&event_stream_event_mutex); +} + static void *event_stream_thread_function(void *arg) { bool running = true; @@ -265,6 +291,8 @@ static void *event_stream_thread_function(void *arg) do { thread_cond_timedwait(&event_stream_cond, 1000); + event_stream_cleanup_queue(); + { avl_tree_wlock(client_tree); avl_node *next; From a3e6c78f1194efa67f08815ffa90b9ed4a04cca9 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 11:11:56 +0000 Subject: [PATCH 06/17] Update: Run the check for removed events outside of the sending function --- src/event_stream.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index cfc78760..78b5c563 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -247,13 +247,6 @@ static void event_stream_send_to_client(client_t *client) } } } while (going); - - - if (state->current_event->removed) { - ICECAST_LOG_INFO("Client %p %lu (%s) has fallen too far behind, removing", - client, client->con->id, client->con->ip); - client->con->error = 1; - } } static void event_stream_cleanup_queue(void) @@ -305,6 +298,14 @@ static void *event_stream_thread_function(void *arg) next = avl_get_next(next); event_stream_send_to_client(client); + { + event_stream_clientstate_t *state = client->format_data; + if (state->current_event->removed) { + ICECAST_LOG_INFO("Client %p %lu (%s) has fallen too far behind, removing", + client, client->con->id, client->con->ip); + client->con->error = 1; + } + } if (client->con->error) { avl_delete(client_tree, (void *) client, _free_client); } From 4bc4992bd40bd75b4b7007393e22d2b74f5dc946 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 14:23:32 +0000 Subject: [PATCH 07/17] Feature: Filter events based on mount point and being global --- src/event_stream.c | 47 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index 78b5c563..5068d27e 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -42,6 +42,7 @@ struct event_stream_event_tag { bool removed; // removed from the queue, clients referencing this are fallen too far behind const char * uuid; + const char * mount; const char * rendered; size_t rendered_length; @@ -51,9 +52,12 @@ struct event_stream_event_tag { }; typedef struct { + const char *mount; const char *current_buffer; - size_t todo; event_stream_event_t *current_event; + size_t todo; + bool events_global; + bool events_any_mount; } event_stream_clientstate_t; static void event_stream_event_free(igloo_ro_t self); @@ -81,6 +85,7 @@ static void event_stream_clientstate_free(client_t *client) return; igloo_ro_unref(&(state->current_event)); + igloo_sp_unref(&(state->mount), igloo_instance); free(state); } @@ -158,9 +163,30 @@ void event_stream_shutdown(void) thread_cond_destroy(&event_stream_cond); } -void event_stream_add_client_inner(client_t *client, void *ud) +static bool event_stream_match_event_with_client(client_t *client, event_stream_event_t *event) +{ + event_stream_clientstate_t *state = client->format_data; + + if (event->mount) { + if (!state->events_any_mount) { + if (!state->mount) + return false; + if (strcmp(state->mount, event->mount) != 0) + return false; + } + } else { + if (!state->events_global) + return false; + } + + return true; +} + +static void event_stream_add_client_inner(client_t *client, void *ud) { event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t)); + const char *mount = httpp_get_param(client->parser, "mount"); + const char *request_global = httpp_get_param(client->parser, "request-global"); (void)ud; @@ -169,6 +195,14 @@ void event_stream_add_client_inner(client_t *client, void *ud) return; } + if (mount) + igloo_sp_replace(mount, &(state->mount), igloo_instance); + + state->events_any_mount = !mount; + + if (request_global) + igloo_cs_to_bool(request_global, &(state->events_global)); + thread_mutex_lock(&event_stream_event_mutex); igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t); thread_mutex_unlock(&event_stream_event_mutex); @@ -211,6 +245,7 @@ void event_stream_emit_event(event_t *event) return; igloo_ro_ref_replace(event, &(el->event), event_t); + igloo_sp_replace(event_extra_get(event, EVENT_EXTRA_KEY_URI), &(el->mount), igloo_instance); event_stream_queue(el); } @@ -224,8 +259,12 @@ static void event_stream_send_to_client(client_t *client) do { if (!state->current_buffer) { - state->current_buffer = state->current_event->rendered; - state->todo = state->current_event->rendered_length; + if (event_stream_match_event_with_client(client, state->current_event)) { + state->current_buffer = state->current_event->rendered; + state->todo = state->current_event->rendered_length; + } else { + state->todo = 0; + } } if (state->todo) { From d30139e71c39e9d390d2f1cebc37ca69b1c7101b Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 14:39:12 +0000 Subject: [PATCH 08/17] Feature: Find best spot to put event listener into the queue --- src/event_stream.c | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/event_stream.c b/src/event_stream.c index 5068d27e..7427c65e 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -187,6 +187,7 @@ static void event_stream_add_client_inner(client_t *client, void *ud) event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t)); const char *mount = httpp_get_param(client->parser, "mount"); const char *request_global = httpp_get_param(client->parser, "request-global"); + const char *last_event_id = httpp_getvar(client->parser, "last-event-id"); (void)ud; @@ -204,7 +205,22 @@ static void event_stream_add_client_inner(client_t *client, void *ud) igloo_cs_to_bool(request_global, &(state->events_global)); thread_mutex_lock(&event_stream_event_mutex); - igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t); + { /* find the best possible event! */ + event_stream_event_t * next = event_queue; + event_stream_event_t * event = NULL; + + while (next) { + event = next; + next = event->next; + + if (last_event_id && strcmp(event->uuid, last_event_id) == 0 && next) { + event = next; + break; + } + } + + igloo_ro_ref(event, &(state->current_event), event_stream_event_t); + } thread_mutex_unlock(&event_stream_event_mutex); client->format_data = state; From 43e28370c8122dd69ef4ca20240339b4329e91df Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 18:17:47 +0000 Subject: [PATCH 09/17] Cleanup: Despammed error.log --- src/event_stream.c | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index 7427c65e..276cfd84 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -271,7 +271,7 @@ static void event_stream_send_to_client(client_t *client) event_stream_clientstate_t *state = client->format_data; bool going = true; - ICECAST_LOG_INFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state); + ICECAST_LOG_DINFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state); do { if (!state->current_buffer) { @@ -371,14 +371,6 @@ static void *event_stream_thread_function(void *arg) thread_mutex_lock(&event_stream_event_mutex); running = alive; - { - event_stream_event_t *head = event_queue; - while (head) { - ICECAST_LOG_INFO("Event: % #H", head->rendered); - head = head->next; - } - } - thread_mutex_unlock(&event_stream_event_mutex); } while (running); From 0ff2a9a24ed2994427537de51fb06749c73b2644 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 18:23:22 +0000 Subject: [PATCH 10/17] Update: Improved logging in stats.c --- src/stats.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stats.c b/src/stats.c index 87deddef..ef95969c 100644 --- a/src/stats.c +++ b/src/stats.c @@ -528,7 +528,7 @@ static void process_source_event (stats_event_t *event) snode = (stats_source_t *)calloc(1,sizeof(stats_source_t)); if (snode == NULL) return; - ICECAST_LOG_DEBUG("new source stat %s", event->source); + ICECAST_LOG_DEBUG("new source stat %#H", event->source); snode->source = (char *)strdup(event->source); snode->stats_tree = avl_tree_new(_compare_stats, NULL); if (event->action == STATS_EVENT_HIDDEN) @@ -546,7 +546,7 @@ static void process_source_event (stats_event_t *event) return; /* adding node */ if (event->value) { - ICECAST_LOG_DEBUG("new node %s (%s)", event->name, event->value); + ICECAST_LOG_DEBUG("new node %H on %#H (% H)", event->name, event->source, event->value); node = (stats_node_t *)calloc(1,sizeof(stats_node_t)); node->name = (char *)strdup(event->name); node->value = (char *)strdup(event->value); From 12dbe9110aeffcc32464445a7b1504b9b164e8ca Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 18:35:24 +0000 Subject: [PATCH 11/17] Fix: Included missing header --- src/event_stream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/event_stream.c b/src/event_stream.c index 276cfd84..e177cc89 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -11,6 +11,7 @@ #endif #include +#include #include #include #include "icecasttypes.h" From 7965610b26ea132802393d374f7da834a99007fc Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 18:35:39 +0000 Subject: [PATCH 12/17] Feature: Include a "mount" key for events with a mount point set --- src/event_stream.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/event_stream.c b/src/event_stream.c index e177cc89..c03b07cb 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -445,6 +445,11 @@ static void event_stream_event_render(event_stream_event_t *event) } json_renderer_end(json); } + + if (event->mount) { + json_renderer_write_key(json, "mount", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, event->mount, JSON_RENDERER_FLAGS_NONE); + } json_renderer_end(json); body = json_renderer_finish(&json); From dbffcb6dc02f90d1e3f7853832f749f89a9d2241 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 19:59:43 +0000 Subject: [PATCH 13/17] Feature: Added support to send vorbis comments --- src/event_stream.c | 139 +++++++++++++++++++++++++++++++++++++++++++++ src/event_stream.h | 3 + src/format_mp3.c | 2 + src/format_ogg.c | 2 + 4 files changed, 146 insertions(+) diff --git a/src/event_stream.c b/src/event_stream.c index c03b07cb..e508373d 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "common/thread/thread.h" @@ -33,7 +34,9 @@ #include "event.h" #include "client.h" #include "connection.h" +#include "source.h" #include "errors.h" +#include "format_mp3.h" #include "logging.h" #define CATMODULE "event-stream" @@ -44,10 +47,12 @@ struct event_stream_event_tag { const char * uuid; const char * mount; + const char * source_instance_uuid; const char * rendered; size_t rendered_length; event_t *event; + vorbis_comment *vc; event_stream_event_t *next; }; @@ -95,6 +100,8 @@ static void event_stream_event_free(igloo_ro_t self) { event_stream_event_t *event = igloo_ro_to_type(self, event_stream_event_t); igloo_sp_unref(&(event->uuid), igloo_instance); + igloo_sp_unref(&(event->mount), igloo_instance); + igloo_sp_unref(&(event->source_instance_uuid), igloo_instance); igloo_sp_unref(&(event->rendered), igloo_instance); igloo_ro_unref(&(event->event)); igloo_ro_unref(&(event->next)); @@ -255,6 +262,12 @@ void event_stream_add_client(client_t *client) fserve_add_client_callback(client, event_stream_add_client_inner, NULL); } +static void event_stream_set_source(event_stream_event_t *event, source_t *source) +{ + igloo_sp_replace(source->mount, &(event->mount), igloo_instance); + igloo_sp_replace(source->instance_uuid, &(event->source_instance_uuid), igloo_instance); +} + void event_stream_emit_event(event_t *event) { event_stream_event_t *el = event_stream_event_new(); @@ -267,6 +280,22 @@ void event_stream_emit_event(event_t *event) event_stream_queue(el); } +void event_stream_emit_vc(source_t *source, vorbis_comment *vc) +{ + event_stream_event_t *el = event_stream_event_new(); + if (!el) + return; + + event_stream_set_source(el, source); + + /* we only have a temp reference to vc, so we set it, render, and then unset before we queue. */ + el->vc = vc; + event_stream_event_render(el); + el->vc = NULL; + + event_stream_queue(el); +} + static void event_stream_send_to_client(client_t *client) { event_stream_clientstate_t *state = client->format_data; @@ -393,6 +422,8 @@ static void event_stream_event_render(event_stream_event_t *event) string_renderer_t * renderer; json_renderer_t *json; char *body; + bool has_type = false; + bool has_crude = false; if (event->rendered) return; @@ -409,6 +440,9 @@ static void event_stream_event_render(event_stream_event_t *event) if (event->event) { event_t *uevent = event->event; + has_type = true; + has_crude = true; + json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE); json_renderer_write_string(json, "event", JSON_RENDERER_FLAGS_NONE); @@ -450,6 +484,111 @@ static void event_stream_event_render(event_stream_event_t *event) json_renderer_write_key(json, "mount", JSON_RENDERER_FLAGS_NONE); json_renderer_write_string(json, event->mount, JSON_RENDERER_FLAGS_NONE); } + + if (event->vc) { + if (!has_type) { + json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, "vc", JSON_RENDERER_FLAGS_NONE); + has_type = true; + } + + json_renderer_write_key(json, "vc", JSON_RENDERER_FLAGS_NONE); + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + { + /* ok, this part is tricky, we first need to figure out all the keys. + * we however cheat here by assuming a few things. + * TODO: Fix this. + */ + struct { + char name[64]; + size_t count; + } keys[64]; + memset(keys, 0, sizeof(keys)); + + /* it is an int in libvorbis, not a size_t */ + for (int i = 0; i < event->vc->comments; i++) { + const char *comment = event->vc->user_comments[i]; + const char *keyend = strchr(comment, '='); + bool found = false; + size_t keylen; + + if (!keyend) + continue; + keylen = keyend - comment; + if (keylen >= sizeof(keys->name)) + continue; + + for (size_t j = 0; j < (sizeof(keys)/sizeof(*keys)); j++) { + char *name = keys[j].name; + + if (*name) { + if (strncasecmp(comment, name, keylen) == 0) { + found = true; + keys[j].count++; + } + } + } + + if (found) + break; + + for (size_t j = 0; j < (sizeof(keys)/sizeof(*keys)); j++) { + char *name = keys[j].name; + + if (!*name) { + memcpy(name, comment, keylen); + name[keylen] = 0; + keys[j].count = 1; + igloo_cs_to_upper(name); + break; + } + } + } + + /* Now we have a list of all keys... */ + for (size_t j = 0; j < (sizeof(keys)/sizeof(*keys)); j++) { + const char *name = keys[j].name; + if (!*name) + continue; + + json_renderer_write_key(json, name, JSON_RENDERER_FLAGS_NONE); + json_renderer_begin(json, JSON_ELEMENT_TYPE_ARRAY); + for (size_t i = 0; i < keys[j].count; i++) { + const char *value = vorbis_comment_query(event->vc, name, i); + json_renderer_write_string(json, value, JSON_RENDERER_FLAGS_NONE); + } + json_renderer_end(json); + } + } + json_renderer_end(json); + + if (!has_crude) { + has_crude = true; + json_renderer_write_key(json, "crude", JSON_RENDERER_FLAGS_NONE); + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + { + static const char * display_title_keys[] = {"TITLE", MP3_METADATA_TITLE}; + const char *display_title = NULL; + for (size_t i = 0; i < (sizeof(display_title_keys)/sizeof(*display_title_keys)); i++) { + display_title = vorbis_comment_query(event->vc, display_title_keys[i], 0); + if (display_title) + break; + } + json_renderer_write_key(json, "display-title", JSON_RENDERER_FLAGS_NONE); + if (display_title) { + json_renderer_write_string(json, display_title, JSON_RENDERER_FLAGS_NONE); + } else { + json_renderer_write_null(json); + } + if (event->source_instance_uuid) { + json_renderer_write_key(json, "source-instance", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, event->source_instance_uuid, JSON_RENDERER_FLAGS_NONE); + } + } + json_renderer_end(json); + } + } + json_renderer_end(json); body = json_renderer_finish(&json); diff --git a/src/event_stream.h b/src/event_stream.h index f2ac6ffe..526892fb 100644 --- a/src/event_stream.h +++ b/src/event_stream.h @@ -9,6 +9,8 @@ #ifndef __EVENT_STREAM_H__ #define __EVENT_STREAM_H__ +#include + #include "icecasttypes.h" igloo_RO_FORWARD_TYPE(event_stream_event_t); @@ -18,5 +20,6 @@ void event_stream_shutdown(void); void event_stream_add_client(client_t *client); void event_stream_emit_event(event_t *event); +void event_stream_emit_vc(source_t *source, vorbis_comment *vc); #endif diff --git a/src/format_mp3.c b/src/format_mp3.c index 8af8178b..b7c62f83 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -39,6 +39,7 @@ #include "source.h" #include "client.h" #include "connection.h" +#include "event_stream.h" #include "stats.h" #include "format.h" @@ -206,6 +207,7 @@ static void filter_shoutcast_metadata (source_t *source, char *metadata, unsigne yp_touch (source->mount); free (p); playlist_push_track(source->history, &source->format->vc); + event_stream_emit_vc(source, &source->format->vc); } } while (0); } diff --git a/src/format_ogg.c b/src/format_ogg.c index 3ffaecdd..ce595d62 100644 --- a/src/format_ogg.c +++ b/src/format_ogg.c @@ -37,6 +37,7 @@ #include "stats.h" #include "playlist.h" +#include "event_stream.h" #include "event.h" #include "format.h" #include "format_ogg.h" @@ -332,6 +333,7 @@ static void update_comments(source_t *source) stats_event (source->mount, "display-title", title); playlist_push_track(source->history, &source->format->vc); + event_stream_emit_vc(source, &source->format->vc); codec = ogg_info->codecs; while (codec) From ce91c5b8914840e63ca5cf8db4d509bd7c2ee928 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Fri, 9 Feb 2024 10:06:46 +0000 Subject: [PATCH 14/17] Fix: Attach the client at the right spot --- src/event_stream.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index e508373d..60e75167 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -221,13 +221,16 @@ static void event_stream_add_client_inner(client_t *client, void *ud) event = next; next = event->next; - if (last_event_id && strcmp(event->uuid, last_event_id) == 0 && next) { - event = next; + if (last_event_id && strcmp(event->uuid, last_event_id) == 0) { break; } } igloo_ro_ref(event, &(state->current_event), event_stream_event_t); + + /* emulate the the state of us just being done */ + state->current_buffer = ""; + state->todo = 0; } thread_mutex_unlock(&event_stream_event_mutex); From 264507627547f69a1275c2e1ed25ebb2c0c0e36e Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Fri, 9 Feb 2024 10:09:45 +0000 Subject: [PATCH 15/17] Update: Updated src/common/ --- src/common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common b/src/common index 85d54c53..07df2860 160000 --- a/src/common +++ b/src/common @@ -1 +1 @@ -Subproject commit 85d54c53de2e625192ded6731e371b586c4fd677 +Subproject commit 07df2860f838e8e523a1bafe69680ebc6108b3cc From d76359c66da1e7f17c229f3862a24daee04ad609 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 18 May 2024 09:12:47 +0000 Subject: [PATCH 16/17] Update: Disallow access to the event stream by default --- src/acl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acl.c b/src/acl.c index fc8c60dd..908ffe64 100644 --- a/src/acl.c +++ b/src/acl.c @@ -111,7 +111,7 @@ acl_t *acl_new(void) acl_set_method_str(ret, ACL_POLICY_ALLOW, "get,options"); acl_set_admin_str(ret, ACL_POLICY_DENY, "*"); - acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,eventstream,publicstats,publicstats.json"); + acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,publicstats,publicstats.json"); acl_set_web_policy(ret, ACL_POLICY_ALLOW); From 3feceeb74839b0b6f4dee1736223324972e67b5b Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Sat, 18 May 2024 09:13:12 +0000 Subject: [PATCH 17/17] Update: Renamed /admin/eventstream to /admin/eventfeed --- src/admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/admin.c b/src/admin.c index 958d55b4..4d37cc22 100644 --- a/src/admin.c +++ b/src/admin.c @@ -150,7 +150,7 @@ #define DEFAULT_RAW_REQUEST "" #define DEFAULT_HTML_REQUEST "" #define BUILDM3U_RAW_REQUEST "buildm3u" -#define EVENTSTREAM_RAW_REQUEST "eventstream" +#define EVENTSTREAM_RAW_REQUEST "eventfeed" typedef struct { size_t listeners;