From 11ae1e87df39ad9ce4e8c87be632ce3b8b4cce02 Mon Sep 17 00:00:00 2001
From: Philipp Schafft <phschafft@de.loewenfelsen.net>
Date: Wed, 24 Jan 2024 16:06:32 +0000
Subject: [PATCH] Feature: Added most basic event stream implementation

---
 src/acl.c          |   2 +-
 src/admin.c        |  10 ++
 src/event.c        |  17 +--
 src/event_stream.c | 355 ++++++++++++++++++++++++++++++++++++++++++++-
 src/event_stream.h |   8 +
 src/icecasttypes.h |   5 +
 src/main.c         |   3 +
 7 files changed, 383 insertions(+), 17 deletions(-)

diff --git a/src/acl.c b/src/acl.c
index 908ffe64..fc8c60dd 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -111,7 +111,7 @@ acl_t *acl_new(void)
     acl_set_method_str(ret, ACL_POLICY_ALLOW, "get,options");
 
     acl_set_admin_str(ret, ACL_POLICY_DENY, "*");
-    acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,publicstats,publicstats.json");
+    acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,eventstream,publicstats,publicstats.json");
 
     acl_set_web_policy(ret, ACL_POLICY_ALLOW);
 
diff --git a/src/admin.c b/src/admin.c
index 825d3bd5..958d55b4 100644
--- a/src/admin.c
+++ b/src/admin.c
@@ -53,6 +53,7 @@
 #include "listensocket.h"
 #include "refbuf.h"
 #include "client.h"
+#include "event_stream.h"
 #include "source.h"
 #include "global.h"
 #include "stats.h"
@@ -149,6 +150,7 @@
 #define DEFAULT_RAW_REQUEST                 ""
 #define DEFAULT_HTML_REQUEST                ""
 #define BUILDM3U_RAW_REQUEST                "buildm3u"
+#define EVENTSTREAM_RAW_REQUEST             "eventstream"
 
 typedef struct {
     size_t listeners;
@@ -179,6 +181,7 @@ static void command_dumpfile_control    (client_t *client, source_t *source, adm
 static void command_manageauth          (client_t *client, source_t *source, admin_format_t response);
 static void command_updatemetadata      (client_t *client, source_t *source, admin_format_t response);
 static void command_buildm3u            (client_t *client, source_t *source, admin_format_t response);
+static void command_eventstream         (client_t *client, source_t *source, admin_format_t response);
 static void command_show_log            (client_t *client, source_t *source, admin_format_t response);
 static void command_mark_log            (client_t *client, source_t *source, admin_format_t response);
 static void command_dashboard           (client_t *client, source_t *source, admin_format_t response);
@@ -232,6 +235,7 @@ static const admin_command_handler_t handlers[] = {
     { UPDATEMETADATA_HTML_REQUEST,          ADMINTYPE_MOUNT,        ADMIN_FORMAT_HTML,          ADMINSAFE_SAFE,     command_updatemetadata, NULL},
     { UPDATEMETADATA_JSON_REQUEST,          ADMINTYPE_MOUNT,        ADMIN_FORMAT_JSON,          ADMINSAFE_SAFE,     command_updatemetadata, NULL},
     { BUILDM3U_RAW_REQUEST,                 ADMINTYPE_MOUNT,        ADMIN_FORMAT_RAW,           ADMINSAFE_SAFE,     command_buildm3u, NULL},
+    { EVENTSTREAM_RAW_REQUEST,              ADMINTYPE_HYBRID,       ADMIN_FORMAT_RAW,           ADMINSAFE_SAFE,     command_eventstream, NULL},
     { SHOWLOG_RAW_REQUEST,                  ADMINTYPE_GENERAL,      ADMIN_FORMAT_RAW,           ADMINSAFE_SAFE,     command_show_log, NULL},
     { SHOWLOG_HTML_REQUEST,                 ADMINTYPE_GENERAL,      ADMIN_FORMAT_HTML,          ADMINSAFE_SAFE,     command_show_log, NULL},
     { SHOWLOG_JSON_REQUEST,                 ADMINTYPE_GENERAL,      ADMIN_FORMAT_JSON,          ADMINSAFE_SAFE,     command_show_log, NULL},
@@ -1097,6 +1101,12 @@ static void command_buildm3u(client_t *client, source_t *source, admin_format_t
     client_send_buffer(client, 200, "audio/x-mpegurl", NULL, buffer, -1, "Content-Disposition: attachment; filename=listen.m3u\r\n");
 }
 
+static void command_eventstream         (client_t *client, source_t *source, admin_format_t response)
+{
+    (void)source, (void)response;
+    event_stream_add_client(client);
+}
+
 xmlNodePtr admin_add_role_to_authentication(auth_t *auth, xmlNodePtr parent)
 {
     xmlNodePtr rolenode = xmlNewChild(parent, NULL, XMLSTR("role"), NULL);
diff --git a/src/event.c b/src/event.c
index daa737bc..a675f71b 100644
--- a/src/event.c
+++ b/src/event.c
@@ -21,6 +21,7 @@
 #include <igloo/sp.h>
 
 #include "event.h"
+#include "event_stream.h"
 #include "fastevent.h"
 #include "logging.h"
 #include "string_renderer.h"
@@ -426,6 +427,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta
 /* event signaling */
 void event_emit(event_t *event) {
     fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event);
+    event_stream_emit_event(event);
     thread_mutex_lock(&event_lock);
     event_push(&event_queue, event);
     thread_mutex_unlock(&event_lock);
@@ -476,21 +478,6 @@ void event_emit_va(const char *trigger, ...) {
         event_push_reglist(event, mount->event);
     config_release_config();
 
-    /* This isn't perfectly clean but is an important speedup:
-     * If first element of reglist is NULL none of the above pushed in
-     * some registrations. If there are no registrations we can just drop
-     * this event now and here.
-     * We do this before inserting all the data into the object to avoid
-     * all the strdups() and stuff in case they aren't needed.
-     */
-#ifndef FASTEVENT_ENABLED
-    if (event->reglist[0] == NULL) {
-        /* we have no registrations, drop this event. */
-        igloo_ro_unref(&event);
-        return;
-    }
-#endif
-
     if (uri)
         extra_add(event, EVENT_EXTRA_KEY_URI, uri);
 
diff --git a/src/event_stream.c b/src/event_stream.c
index 50ff4a23..ed746cdc 100644
--- a/src/event_stream.c
+++ b/src/event_stream.c
@@ -10,14 +10,367 @@
 #include <config.h>
 #endif
 
-#include <igloo/ro.h>
+#include <stdbool.h>
+#include <string.h>
+#include <igloo/typedef.h>
+#include "icecasttypes.h"
+
 #include <igloo/error.h>
+#include <igloo/ro.h>
+#include <igloo/sp.h>
+#include <igloo/uuid.h>
+
+#include "common/thread/thread.h"
+#include "common/avl/avl.h"
 
 #include "event_stream.h"
 #include "string_renderer.h"
 #include "json.h"
+#include "util.h"
+#include "fserve.h"
 #include "global.h"
+#include "event.h"
+#include "client.h"
+#include "connection.h"
 #include "errors.h"
 #include "logging.h"
 #define CATMODULE "event-stream"
 
+struct event_stream_event_tag {
+    igloo_ro_full_t __parent;
+
+    const char * uuid;
+    const char * rendered;
+    size_t rendered_length;
+
+    event_t *event;
+
+    event_stream_event_t *next;
+};
+
+typedef struct {
+    const char *current_buffer;
+    size_t todo;
+    event_stream_event_t *current_event;
+} event_stream_clientstate_t;
+
+static void event_stream_event_free(igloo_ro_t self);
+static void *event_stream_thread_function(void *arg);
+static void event_stream_event_render(event_stream_event_t *event);
+
+igloo_RO_PUBLIC_TYPE(event_stream_event_t, igloo_ro_full_t,
+        igloo_RO_TYPEDECL_FREE(event_stream_event_free),
+        );
+
+static mutex_t                  event_stream_event_mutex; // protects: event_queue, event_queue_next, event_stream_thread, alive
+static event_stream_event_t    *event_queue;
+static event_stream_event_t   **event_queue_next = &event_queue;
+static thread_type             *event_stream_thread;
+static cond_t                   event_stream_cond;
+static avl_tree                *client_tree;
+static bool                     alive;
+
+static void event_stream_clientstate_free(client_t *client)
+{
+    event_stream_clientstate_t *state = client->format_data;
+    client->format_data = NULL;
+
+    if (!state)
+        return;
+
+    igloo_ro_unref(&(state->current_event));
+
+    free(state);
+}
+
+static void event_stream_event_free(igloo_ro_t self)
+{
+    event_stream_event_t *event = igloo_ro_to_type(self, event_stream_event_t);
+    igloo_sp_unref(&(event->uuid), igloo_instance);
+    igloo_sp_unref(&(event->rendered), igloo_instance);
+    igloo_ro_unref(&(event->event));
+    igloo_ro_unref(&(event->next));
+}
+
+static event_stream_event_t * event_stream_event_new(void)
+{
+    event_stream_event_t *event;
+
+    if (igloo_ro_new_raw(&event, event_stream_event_t, igloo_instance) != igloo_ERROR_NONE)
+        return NULL;
+
+    if (igloo_uuid_new_random_sp(&(event->uuid), igloo_instance) != igloo_ERROR_NONE) {
+        igloo_ro_unref(&event);
+        return NULL;
+    }
+
+    return event;
+}
+
+static void event_stream_queue(event_stream_event_t *event)
+{
+    event_stream_event_render(event);
+
+    thread_mutex_lock(&event_stream_event_mutex);
+    *event_queue_next = event;
+    event_queue_next = &(event->next);
+    thread_mutex_unlock(&event_stream_event_mutex);
+
+    thread_cond_broadcast(&event_stream_cond);
+    ICECAST_LOG_INFO("event queued");
+}
+
+static int _free_client(void *key)
+{
+    client_t *client = (client_t *)key;
+    client_destroy(client);
+    return 1;
+}
+
+void event_stream_initialise(void)
+{
+    thread_mutex_create(&event_stream_event_mutex);
+    thread_cond_create(&event_stream_cond);
+    client_tree = avl_tree_new(client_compare, NULL);
+    alive = true;
+}
+
+void event_stream_shutdown(void)
+{
+    thread_mutex_lock(&event_stream_event_mutex);
+    alive = false;
+    thread_mutex_unlock(&event_stream_event_mutex);
+
+    thread_cond_broadcast(&event_stream_cond);
+
+    if (event_stream_thread)
+        thread_join(event_stream_thread);
+
+    avl_tree_free(client_tree, _free_client);
+
+    thread_mutex_lock(&event_stream_event_mutex);
+    igloo_ro_unref(&event_queue);
+    thread_mutex_unlock(&event_stream_event_mutex);
+
+    thread_mutex_destroy(&event_stream_event_mutex);
+    thread_cond_destroy(&event_stream_cond);
+}
+
+void event_stream_add_client_inner(client_t *client, void *ud)
+{
+    event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t));
+
+    (void)ud;
+
+    if (!state) {
+        client_destroy(client);
+        return;
+    }
+
+    thread_mutex_lock(&event_stream_event_mutex);
+    igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t);
+    thread_mutex_unlock(&event_stream_event_mutex);
+
+    client->format_data = state;
+    client->free_client_data = event_stream_clientstate_free;
+
+    avl_tree_wlock(client_tree);
+    avl_insert(client_tree, client);
+    avl_tree_unlock(client_tree);
+
+    thread_mutex_lock(&event_stream_event_mutex);
+    if (!event_stream_thread) {
+        event_stream_thread = thread_create("Event Stream Thread", event_stream_thread_function, (void *)NULL, THREAD_ATTACHED);
+    }
+    thread_mutex_unlock(&event_stream_event_mutex);
+}
+
+void event_stream_add_client(client_t *client)
+{
+    ssize_t len = util_http_build_header(client->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0,
+            0, 200, NULL,
+            "text/event-stream", NULL,
+            "", NULL, client
+            );
+    if (len < 1 || len > PER_CLIENT_REFBUF_SIZE)
+        return;
+
+    client->refbuf->len = len;
+
+    fserve_add_client_callback(client, event_stream_add_client_inner, NULL);
+}
+
+void event_stream_emit_event(event_t *event)
+{
+    event_stream_event_t *el = event_stream_event_new();
+    if (!el)
+        return;
+
+    igloo_ro_ref_replace(event, &(el->event), event_t);
+
+    event_stream_queue(el);
+}
+
+static void event_stream_send_to_client(client_t *client)
+{
+    event_stream_clientstate_t *state = client->format_data;
+    bool going = true;
+
+    ICECAST_LOG_INFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state);
+
+    do {
+        if (!state->current_buffer) {
+            state->current_buffer = state->current_event->rendered;
+            state->todo           = state->current_event->rendered_length;
+        }
+
+        if (state->todo) {
+            ssize_t done = client_send_bytes(client, state->current_buffer, state->todo);
+            if (done < 0) {
+                return;
+            } else {
+                state->todo -= done;
+                state->current_buffer += done;
+            }
+        }
+
+        if (!state->todo) {
+            if (state->current_event->next) {
+                igloo_ro_ref_replace(state->current_event->next, &(state->current_event), event_stream_event_t);
+                state->current_buffer = NULL;
+            } else {
+                going = false;
+            }
+        }
+    } while (going);
+}
+
+static void *event_stream_thread_function(void *arg)
+{
+    bool running = true;
+
+    ICECAST_LOG_INFO("Good morning!");
+
+    do {
+        thread_cond_wait(&event_stream_cond);
+        ICECAST_LOG_INFO("Tick!");
+
+        {
+            avl_tree_wlock(client_tree);
+            avl_node *next;
+
+            next = avl_get_first(client_tree);
+            while (next) {
+                avl_node *node = next;
+                client_t *client = node->key;
+
+                next = avl_get_next(next);
+
+                event_stream_send_to_client(client);
+                if (client->con->error) {
+                    avl_delete(client_tree, (void *) client, _free_client);
+                }
+            }
+            avl_tree_unlock(client_tree);
+        }
+
+        thread_mutex_lock(&event_stream_event_mutex);
+        running = alive;
+
+        {
+            event_stream_event_t *head = event_queue;
+            while (head) {
+                ICECAST_LOG_INFO("Event: % #H", head->rendered);
+                head = head->next;
+            }
+        }
+
+        thread_mutex_unlock(&event_stream_event_mutex);
+    } while (running);
+
+    ICECAST_LOG_INFO("Good evening!");
+    return NULL;
+}
+
+static void add_number(json_renderer_t *json, const char *key, intmax_t val)
+{
+    if (val < 1)
+        return;
+
+    json_renderer_write_key(json, key, JSON_RENDERER_FLAGS_NONE);
+    json_renderer_write_uint(json, val);
+}
+
+static void event_stream_event_render(event_stream_event_t *event)
+{
+    string_renderer_t * renderer;
+    json_renderer_t *json;
+    char *body;
+
+    if (event->rendered)
+        return;
+
+    if (igloo_ro_new(&renderer, string_renderer_t, igloo_instance) != igloo_ERROR_NONE)
+        return;
+    json = json_renderer_create(JSON_RENDERER_FLAGS_NONE);
+    if (!json) {
+        igloo_ro_unref(&renderer);
+        return;
+    }
+
+    json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT);
+    if (event->event) {
+        event_t *uevent = event->event;
+
+        json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE);
+        json_renderer_write_string(json, "event", JSON_RENDERER_FLAGS_NONE);
+
+        json_renderer_write_key(json, "crude", JSON_RENDERER_FLAGS_NONE);
+        json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT);
+        {
+            static const event_extra_key_t key_list[] = {
+                EVENT_EXTRA_KEY_URI,
+                EVENT_EXTRA_KEY_SOURCE_MEDIA_TYPE,
+                EVENT_EXTRA_KEY_SOURCE_INSTANCE_UUID,
+                EVENT_EXTRA_KEY_CONNECTION_IP,
+                EVENT_EXTRA_KEY_CLIENT_ROLE,
+                EVENT_EXTRA_KEY_CLIENT_USERNAME,
+                EVENT_EXTRA_KEY_CLIENT_USERAGENT,
+                EVENT_EXTRA_KEY_DUMPFILE_FILENAME,
+                EVENT_EXTRA_LIST_END
+            };
+            json_renderer_write_key(json, "trigger", JSON_RENDERER_FLAGS_NONE);
+            json_renderer_write_string(json, uevent->trigger, JSON_RENDERER_FLAGS_NONE);
+
+            for (size_t i = 0; key_list[i] != EVENT_EXTRA_LIST_END; i++) {
+                const char *value = event_extra_get(uevent, key_list[i]);
+
+                if (!value)
+                    continue;
+
+                json_renderer_write_key(json, event_extra_key_name(key_list[i]), JSON_RENDERER_FLAGS_NONE);
+                json_renderer_write_string(json, value, JSON_RENDERER_FLAGS_NONE);
+            }
+
+            add_number(json, "connection-id", uevent->connection_id);
+            add_number(json, "connection-time", uevent->connection_time);
+            add_number(json, "client-admin-command", uevent->client_admin_command);
+        }
+        json_renderer_end(json);
+    }
+    json_renderer_end(json);
+
+    body = json_renderer_finish(&json);
+
+    string_renderer_start_list(renderer, "\r\n", ": ", false, false, STRING_RENDERER_ENCODING_PLAIN);
+    string_renderer_add_kv(renderer, "id", event->uuid);
+    string_renderer_add_kv(renderer, "data", body);
+    string_renderer_end_list(renderer);
+    string_renderer_add_string(renderer, "\r\n\r\n");
+
+    free(body);
+
+    igloo_sp_replace(string_renderer_to_string_zero_copy(renderer), &(event->rendered), igloo_instance);
+    event->rendered_length = strlen(event->rendered);
+    igloo_ro_unref(&renderer);
+}
diff --git a/src/event_stream.h b/src/event_stream.h
index 8928dd69..f2ac6ffe 100644
--- a/src/event_stream.h
+++ b/src/event_stream.h
@@ -11,4 +11,12 @@
 
 #include "icecasttypes.h"
 
+igloo_RO_FORWARD_TYPE(event_stream_event_t);
+
+void event_stream_initialise(void);
+void event_stream_shutdown(void);
+
+void event_stream_add_client(client_t *client);
+void event_stream_emit_event(event_t *event);
+
 #endif
diff --git a/src/icecasttypes.h b/src/icecasttypes.h
index dc30b8d8..6a60b28c 100644
--- a/src/icecasttypes.h
+++ b/src/icecasttypes.h
@@ -143,6 +143,10 @@ typedef struct geoip_db_tag geoip_db_t;
 typedef struct event_tag event_t;
 typedef struct event_registration_tag event_registration_t;
 
+/* ---[ event_stream.[ch] ]--- */
+
+typedef struct event_stream_event_tag event_stream_event_t;
+
 /* ---[ refobject.[ch] ]--- */
 
 typedef struct refobject_base_tag refobject_base_t;
@@ -167,6 +171,7 @@ typedef void * refobject_t;
     igloo_RO_TYPE(geoip_db_t) \
     igloo_RO_TYPE(event_t) \
     igloo_RO_TYPE(event_registration_t) \
+    igloo_RO_TYPE(event_stream_event_t) \
     igloo_RO_TYPE(module_t) \
     igloo_RO_TYPE(module_container_t)
 
diff --git a/src/main.c b/src/main.c
index e021b3fd..842cf7c2 100644
--- a/src/main.c
+++ b/src/main.c
@@ -88,6 +88,7 @@
 #include "yp.h"
 #include "auth.h"
 #include "event.h"
+#include "event_stream.h"
 #include "ping.h"
 #include "listensocket.h"
 #include "fastevent.h"
@@ -186,6 +187,7 @@ static void initialize_subsystems(void)
 static void shutdown_subsystems(void)
 {
     ICECAST_LOG_DEBUG("Shuting down subsystems...");
+    event_stream_shutdown();
     event_shutdown();
     fserve_shutdown();
     refbuf_shutdown();
@@ -785,6 +787,7 @@ int main(int argc, char **argv)
     slave_initialize();
     auth_initialise ();
     event_initialise();
+    event_stream_initialise();
 
     event_emit_global("icecast-start");
     _server_proc();