diff --git a/src/acl.c b/src/acl.c index 908ffe64..fc8c60dd 100644 --- a/src/acl.c +++ b/src/acl.c @@ -111,7 +111,7 @@ acl_t *acl_new(void) acl_set_method_str(ret, ACL_POLICY_ALLOW, "get,options"); acl_set_admin_str(ret, ACL_POLICY_DENY, "*"); - acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,publicstats,publicstats.json"); + acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,eventstream,publicstats,publicstats.json"); acl_set_web_policy(ret, ACL_POLICY_ALLOW); diff --git a/src/admin.c b/src/admin.c index 825d3bd5..958d55b4 100644 --- a/src/admin.c +++ b/src/admin.c @@ -53,6 +53,7 @@ #include "listensocket.h" #include "refbuf.h" #include "client.h" +#include "event_stream.h" #include "source.h" #include "global.h" #include "stats.h" @@ -149,6 +150,7 @@ #define DEFAULT_RAW_REQUEST "" #define DEFAULT_HTML_REQUEST "" #define BUILDM3U_RAW_REQUEST "buildm3u" +#define EVENTSTREAM_RAW_REQUEST "eventstream" typedef struct { size_t listeners; @@ -179,6 +181,7 @@ static void command_dumpfile_control (client_t *client, source_t *source, adm static void command_manageauth (client_t *client, source_t *source, admin_format_t response); static void command_updatemetadata (client_t *client, source_t *source, admin_format_t response); static void command_buildm3u (client_t *client, source_t *source, admin_format_t response); +static void command_eventstream (client_t *client, source_t *source, admin_format_t response); static void command_show_log (client_t *client, source_t *source, admin_format_t response); static void command_mark_log (client_t *client, source_t *source, admin_format_t response); static void command_dashboard (client_t *client, source_t *source, admin_format_t response); @@ -232,6 +235,7 @@ static const admin_command_handler_t handlers[] = { { UPDATEMETADATA_HTML_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_updatemetadata, NULL}, { UPDATEMETADATA_JSON_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_updatemetadata, NULL}, { BUILDM3U_RAW_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_buildm3u, NULL}, + { EVENTSTREAM_RAW_REQUEST, ADMINTYPE_HYBRID, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_eventstream, NULL}, { SHOWLOG_RAW_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_show_log, NULL}, { SHOWLOG_HTML_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_show_log, NULL}, { SHOWLOG_JSON_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_show_log, NULL}, @@ -1097,6 +1101,12 @@ static void command_buildm3u(client_t *client, source_t *source, admin_format_t client_send_buffer(client, 200, "audio/x-mpegurl", NULL, buffer, -1, "Content-Disposition: attachment; filename=listen.m3u\r\n"); } +static void command_eventstream (client_t *client, source_t *source, admin_format_t response) +{ + (void)source, (void)response; + event_stream_add_client(client); +} + xmlNodePtr admin_add_role_to_authentication(auth_t *auth, xmlNodePtr parent) { xmlNodePtr rolenode = xmlNewChild(parent, NULL, XMLSTR("role"), NULL); diff --git a/src/event.c b/src/event.c index daa737bc..a675f71b 100644 --- a/src/event.c +++ b/src/event.c @@ -21,6 +21,7 @@ #include #include "event.h" +#include "event_stream.h" #include "fastevent.h" #include "logging.h" #include "string_renderer.h" @@ -426,6 +427,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta /* event signaling */ void event_emit(event_t *event) { fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event); + event_stream_emit_event(event); thread_mutex_lock(&event_lock); event_push(&event_queue, event); thread_mutex_unlock(&event_lock); @@ -476,21 +478,6 @@ void event_emit_va(const char *trigger, ...) { event_push_reglist(event, mount->event); config_release_config(); - /* This isn't perfectly clean but is an important speedup: - * If first element of reglist is NULL none of the above pushed in - * some registrations. If there are no registrations we can just drop - * this event now and here. - * We do this before inserting all the data into the object to avoid - * all the strdups() and stuff in case they aren't needed. - */ -#ifndef FASTEVENT_ENABLED - if (event->reglist[0] == NULL) { - /* we have no registrations, drop this event. */ - igloo_ro_unref(&event); - return; - } -#endif - if (uri) extra_add(event, EVENT_EXTRA_KEY_URI, uri); diff --git a/src/event_stream.c b/src/event_stream.c index 50ff4a23..ed746cdc 100644 --- a/src/event_stream.c +++ b/src/event_stream.c @@ -10,14 +10,367 @@ #include #endif -#include +#include +#include +#include +#include "icecasttypes.h" + #include +#include +#include +#include + +#include "common/thread/thread.h" +#include "common/avl/avl.h" #include "event_stream.h" #include "string_renderer.h" #include "json.h" +#include "util.h" +#include "fserve.h" #include "global.h" +#include "event.h" +#include "client.h" +#include "connection.h" #include "errors.h" #include "logging.h" #define CATMODULE "event-stream" +struct event_stream_event_tag { + igloo_ro_full_t __parent; + + const char * uuid; + const char * rendered; + size_t rendered_length; + + event_t *event; + + event_stream_event_t *next; +}; + +typedef struct { + const char *current_buffer; + size_t todo; + event_stream_event_t *current_event; +} event_stream_clientstate_t; + +static void event_stream_event_free(igloo_ro_t self); +static void *event_stream_thread_function(void *arg); +static void event_stream_event_render(event_stream_event_t *event); + +igloo_RO_PUBLIC_TYPE(event_stream_event_t, igloo_ro_full_t, + igloo_RO_TYPEDECL_FREE(event_stream_event_free), + ); + +static mutex_t event_stream_event_mutex; // protects: event_queue, event_queue_next, event_stream_thread, alive +static event_stream_event_t *event_queue; +static event_stream_event_t **event_queue_next = &event_queue; +static thread_type *event_stream_thread; +static cond_t event_stream_cond; +static avl_tree *client_tree; +static bool alive; + +static void event_stream_clientstate_free(client_t *client) +{ + event_stream_clientstate_t *state = client->format_data; + client->format_data = NULL; + + if (!state) + return; + + igloo_ro_unref(&(state->current_event)); + + free(state); +} + +static void event_stream_event_free(igloo_ro_t self) +{ + event_stream_event_t *event = igloo_ro_to_type(self, event_stream_event_t); + igloo_sp_unref(&(event->uuid), igloo_instance); + igloo_sp_unref(&(event->rendered), igloo_instance); + igloo_ro_unref(&(event->event)); + igloo_ro_unref(&(event->next)); +} + +static event_stream_event_t * event_stream_event_new(void) +{ + event_stream_event_t *event; + + if (igloo_ro_new_raw(&event, event_stream_event_t, igloo_instance) != igloo_ERROR_NONE) + return NULL; + + if (igloo_uuid_new_random_sp(&(event->uuid), igloo_instance) != igloo_ERROR_NONE) { + igloo_ro_unref(&event); + return NULL; + } + + return event; +} + +static void event_stream_queue(event_stream_event_t *event) +{ + event_stream_event_render(event); + + thread_mutex_lock(&event_stream_event_mutex); + *event_queue_next = event; + event_queue_next = &(event->next); + thread_mutex_unlock(&event_stream_event_mutex); + + thread_cond_broadcast(&event_stream_cond); + ICECAST_LOG_INFO("event queued"); +} + +static int _free_client(void *key) +{ + client_t *client = (client_t *)key; + client_destroy(client); + return 1; +} + +void event_stream_initialise(void) +{ + thread_mutex_create(&event_stream_event_mutex); + thread_cond_create(&event_stream_cond); + client_tree = avl_tree_new(client_compare, NULL); + alive = true; +} + +void event_stream_shutdown(void) +{ + thread_mutex_lock(&event_stream_event_mutex); + alive = false; + thread_mutex_unlock(&event_stream_event_mutex); + + thread_cond_broadcast(&event_stream_cond); + + if (event_stream_thread) + thread_join(event_stream_thread); + + avl_tree_free(client_tree, _free_client); + + thread_mutex_lock(&event_stream_event_mutex); + igloo_ro_unref(&event_queue); + thread_mutex_unlock(&event_stream_event_mutex); + + thread_mutex_destroy(&event_stream_event_mutex); + thread_cond_destroy(&event_stream_cond); +} + +void event_stream_add_client_inner(client_t *client, void *ud) +{ + event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t)); + + (void)ud; + + if (!state) { + client_destroy(client); + return; + } + + thread_mutex_lock(&event_stream_event_mutex); + igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t); + thread_mutex_unlock(&event_stream_event_mutex); + + client->format_data = state; + client->free_client_data = event_stream_clientstate_free; + + avl_tree_wlock(client_tree); + avl_insert(client_tree, client); + avl_tree_unlock(client_tree); + + thread_mutex_lock(&event_stream_event_mutex); + if (!event_stream_thread) { + event_stream_thread = thread_create("Event Stream Thread", event_stream_thread_function, (void *)NULL, THREAD_ATTACHED); + } + thread_mutex_unlock(&event_stream_event_mutex); +} + +void event_stream_add_client(client_t *client) +{ + ssize_t len = util_http_build_header(client->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0, + 0, 200, NULL, + "text/event-stream", NULL, + "", NULL, client + ); + if (len < 1 || len > PER_CLIENT_REFBUF_SIZE) + return; + + client->refbuf->len = len; + + fserve_add_client_callback(client, event_stream_add_client_inner, NULL); +} + +void event_stream_emit_event(event_t *event) +{ + event_stream_event_t *el = event_stream_event_new(); + if (!el) + return; + + igloo_ro_ref_replace(event, &(el->event), event_t); + + event_stream_queue(el); +} + +static void event_stream_send_to_client(client_t *client) +{ + event_stream_clientstate_t *state = client->format_data; + bool going = true; + + ICECAST_LOG_INFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state); + + do { + if (!state->current_buffer) { + state->current_buffer = state->current_event->rendered; + state->todo = state->current_event->rendered_length; + } + + if (state->todo) { + ssize_t done = client_send_bytes(client, state->current_buffer, state->todo); + if (done < 0) { + return; + } else { + state->todo -= done; + state->current_buffer += done; + } + } + + if (!state->todo) { + if (state->current_event->next) { + igloo_ro_ref_replace(state->current_event->next, &(state->current_event), event_stream_event_t); + state->current_buffer = NULL; + } else { + going = false; + } + } + } while (going); +} + +static void *event_stream_thread_function(void *arg) +{ + bool running = true; + + ICECAST_LOG_INFO("Good morning!"); + + do { + thread_cond_wait(&event_stream_cond); + ICECAST_LOG_INFO("Tick!"); + + { + avl_tree_wlock(client_tree); + avl_node *next; + + next = avl_get_first(client_tree); + while (next) { + avl_node *node = next; + client_t *client = node->key; + + next = avl_get_next(next); + + event_stream_send_to_client(client); + if (client->con->error) { + avl_delete(client_tree, (void *) client, _free_client); + } + } + avl_tree_unlock(client_tree); + } + + thread_mutex_lock(&event_stream_event_mutex); + running = alive; + + { + event_stream_event_t *head = event_queue; + while (head) { + ICECAST_LOG_INFO("Event: % #H", head->rendered); + head = head->next; + } + } + + thread_mutex_unlock(&event_stream_event_mutex); + } while (running); + + ICECAST_LOG_INFO("Good evening!"); + return NULL; +} + +static void add_number(json_renderer_t *json, const char *key, intmax_t val) +{ + if (val < 1) + return; + + json_renderer_write_key(json, key, JSON_RENDERER_FLAGS_NONE); + json_renderer_write_uint(json, val); +} + +static void event_stream_event_render(event_stream_event_t *event) +{ + string_renderer_t * renderer; + json_renderer_t *json; + char *body; + + if (event->rendered) + return; + + if (igloo_ro_new(&renderer, string_renderer_t, igloo_instance) != igloo_ERROR_NONE) + return; + json = json_renderer_create(JSON_RENDERER_FLAGS_NONE); + if (!json) { + igloo_ro_unref(&renderer); + return; + } + + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + if (event->event) { + event_t *uevent = event->event; + + json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, "event", JSON_RENDERER_FLAGS_NONE); + + json_renderer_write_key(json, "crude", JSON_RENDERER_FLAGS_NONE); + json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT); + { + static const event_extra_key_t key_list[] = { + EVENT_EXTRA_KEY_URI, + EVENT_EXTRA_KEY_SOURCE_MEDIA_TYPE, + EVENT_EXTRA_KEY_SOURCE_INSTANCE_UUID, + EVENT_EXTRA_KEY_CONNECTION_IP, + EVENT_EXTRA_KEY_CLIENT_ROLE, + EVENT_EXTRA_KEY_CLIENT_USERNAME, + EVENT_EXTRA_KEY_CLIENT_USERAGENT, + EVENT_EXTRA_KEY_DUMPFILE_FILENAME, + EVENT_EXTRA_LIST_END + }; + json_renderer_write_key(json, "trigger", JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, uevent->trigger, JSON_RENDERER_FLAGS_NONE); + + for (size_t i = 0; key_list[i] != EVENT_EXTRA_LIST_END; i++) { + const char *value = event_extra_get(uevent, key_list[i]); + + if (!value) + continue; + + json_renderer_write_key(json, event_extra_key_name(key_list[i]), JSON_RENDERER_FLAGS_NONE); + json_renderer_write_string(json, value, JSON_RENDERER_FLAGS_NONE); + } + + add_number(json, "connection-id", uevent->connection_id); + add_number(json, "connection-time", uevent->connection_time); + add_number(json, "client-admin-command", uevent->client_admin_command); + } + json_renderer_end(json); + } + json_renderer_end(json); + + body = json_renderer_finish(&json); + + string_renderer_start_list(renderer, "\r\n", ": ", false, false, STRING_RENDERER_ENCODING_PLAIN); + string_renderer_add_kv(renderer, "id", event->uuid); + string_renderer_add_kv(renderer, "data", body); + string_renderer_end_list(renderer); + string_renderer_add_string(renderer, "\r\n\r\n"); + + free(body); + + igloo_sp_replace(string_renderer_to_string_zero_copy(renderer), &(event->rendered), igloo_instance); + event->rendered_length = strlen(event->rendered); + igloo_ro_unref(&renderer); +} diff --git a/src/event_stream.h b/src/event_stream.h index 8928dd69..f2ac6ffe 100644 --- a/src/event_stream.h +++ b/src/event_stream.h @@ -11,4 +11,12 @@ #include "icecasttypes.h" +igloo_RO_FORWARD_TYPE(event_stream_event_t); + +void event_stream_initialise(void); +void event_stream_shutdown(void); + +void event_stream_add_client(client_t *client); +void event_stream_emit_event(event_t *event); + #endif diff --git a/src/icecasttypes.h b/src/icecasttypes.h index dc30b8d8..6a60b28c 100644 --- a/src/icecasttypes.h +++ b/src/icecasttypes.h @@ -143,6 +143,10 @@ typedef struct geoip_db_tag geoip_db_t; typedef struct event_tag event_t; typedef struct event_registration_tag event_registration_t; +/* ---[ event_stream.[ch] ]--- */ + +typedef struct event_stream_event_tag event_stream_event_t; + /* ---[ refobject.[ch] ]--- */ typedef struct refobject_base_tag refobject_base_t; @@ -167,6 +171,7 @@ typedef void * refobject_t; igloo_RO_TYPE(geoip_db_t) \ igloo_RO_TYPE(event_t) \ igloo_RO_TYPE(event_registration_t) \ + igloo_RO_TYPE(event_stream_event_t) \ igloo_RO_TYPE(module_t) \ igloo_RO_TYPE(module_container_t) diff --git a/src/main.c b/src/main.c index e021b3fd..842cf7c2 100644 --- a/src/main.c +++ b/src/main.c @@ -88,6 +88,7 @@ #include "yp.h" #include "auth.h" #include "event.h" +#include "event_stream.h" #include "ping.h" #include "listensocket.h" #include "fastevent.h" @@ -186,6 +187,7 @@ static void initialize_subsystems(void) static void shutdown_subsystems(void) { ICECAST_LOG_DEBUG("Shuting down subsystems..."); + event_stream_shutdown(); event_shutdown(); fserve_shutdown(); refbuf_shutdown(); @@ -785,6 +787,7 @@ int main(int argc, char **argv) slave_initialize(); auth_initialise (); event_initialise(); + event_stream_initialise(); event_emit_global("icecast-start"); _server_proc();