From 4bc4992bd40bd75b4b7007393e22d2b74f5dc946 Mon Sep 17 00:00:00 2001 From: Philipp Schafft Date: Thu, 25 Jan 2024 14:23:32 +0000 Subject: [PATCH] Feature: Filter events based on mount point and being global --- src/event_stream.c | 47 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/src/event_stream.c b/src/event_stream.c index 78b5c563..5068d27e 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -42,6 +42,7 @@ struct event_stream_event_tag { bool removed; // removed from the queue, clients referencing this are fallen too far behind const char * uuid; + const char * mount; const char * rendered; size_t rendered_length; @@ -51,9 +52,12 @@ struct event_stream_event_tag { }; typedef struct { + const char *mount; const char *current_buffer; - size_t todo; event_stream_event_t *current_event; + size_t todo; + bool events_global; + bool events_any_mount; } event_stream_clientstate_t; static void event_stream_event_free(igloo_ro_t self); @@ -81,6 +85,7 @@ static void event_stream_clientstate_free(client_t *client) return; igloo_ro_unref(&(state->current_event)); + igloo_sp_unref(&(state->mount), igloo_instance); free(state); } @@ -158,9 +163,30 @@ void event_stream_shutdown(void) thread_cond_destroy(&event_stream_cond); } -void event_stream_add_client_inner(client_t *client, void *ud) +static bool event_stream_match_event_with_client(client_t *client, event_stream_event_t *event) +{ + event_stream_clientstate_t *state = client->format_data; + + if (event->mount) { + if (!state->events_any_mount) { + if (!state->mount) + return false; + if (strcmp(state->mount, event->mount) != 0) + return false; + } + } else { + if (!state->events_global) + return false; + } + + return true; +} + +static void event_stream_add_client_inner(client_t *client, void *ud) { event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t)); + const char *mount = httpp_get_param(client->parser, "mount"); + const char *request_global = httpp_get_param(client->parser, "request-global"); (void)ud; @@ -169,6 +195,14 @@ void event_stream_add_client_inner(client_t *client, void *ud) return; } + if (mount) + igloo_sp_replace(mount, &(state->mount), igloo_instance); + + state->events_any_mount = !mount; + + if (request_global) + igloo_cs_to_bool(request_global, &(state->events_global)); + thread_mutex_lock(&event_stream_event_mutex); igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t); thread_mutex_unlock(&event_stream_event_mutex); @@ -211,6 +245,7 @@ void event_stream_emit_event(event_t *event) return; igloo_ro_ref_replace(event, &(el->event), event_t); + igloo_sp_replace(event_extra_get(event, EVENT_EXTRA_KEY_URI), &(el->mount), igloo_instance); event_stream_queue(el); } @@ -224,8 +259,12 @@ static void event_stream_send_to_client(client_t *client) do { if (!state->current_buffer) { - state->current_buffer = state->current_event->rendered; - state->todo = state->current_event->rendered_length; + if (event_stream_match_event_with_client(client, state->current_event)) { + state->current_buffer = state->current_event->rendered; + state->todo = state->current_event->rendered_length; + } else { + state->todo = 0; + } } if (state->todo) {