1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-11-03 04:17:17 -05:00

Feature: Added most basic event stream implementation

This commit is contained in:
Philipp Schafft 2024-01-24 16:06:32 +00:00
parent ae0e8e0183
commit 11ae1e87df
7 changed files with 383 additions and 17 deletions

View File

@ -111,7 +111,7 @@ acl_t *acl_new(void)
acl_set_method_str(ret, ACL_POLICY_ALLOW, "get,options");
acl_set_admin_str(ret, ACL_POLICY_DENY, "*");
acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,publicstats,publicstats.json");
acl_set_admin_str(ret, ACL_POLICY_ALLOW, "buildm3u,eventstream,publicstats,publicstats.json");
acl_set_web_policy(ret, ACL_POLICY_ALLOW);

View File

@ -53,6 +53,7 @@
#include "listensocket.h"
#include "refbuf.h"
#include "client.h"
#include "event_stream.h"
#include "source.h"
#include "global.h"
#include "stats.h"
@ -149,6 +150,7 @@
#define DEFAULT_RAW_REQUEST ""
#define DEFAULT_HTML_REQUEST ""
#define BUILDM3U_RAW_REQUEST "buildm3u"
#define EVENTSTREAM_RAW_REQUEST "eventstream"
typedef struct {
size_t listeners;
@ -179,6 +181,7 @@ static void command_dumpfile_control (client_t *client, source_t *source, adm
static void command_manageauth (client_t *client, source_t *source, admin_format_t response);
static void command_updatemetadata (client_t *client, source_t *source, admin_format_t response);
static void command_buildm3u (client_t *client, source_t *source, admin_format_t response);
static void command_eventstream (client_t *client, source_t *source, admin_format_t response);
static void command_show_log (client_t *client, source_t *source, admin_format_t response);
static void command_mark_log (client_t *client, source_t *source, admin_format_t response);
static void command_dashboard (client_t *client, source_t *source, admin_format_t response);
@ -232,6 +235,7 @@ static const admin_command_handler_t handlers[] = {
{ UPDATEMETADATA_HTML_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_updatemetadata, NULL},
{ UPDATEMETADATA_JSON_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_updatemetadata, NULL},
{ BUILDM3U_RAW_REQUEST, ADMINTYPE_MOUNT, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_buildm3u, NULL},
{ EVENTSTREAM_RAW_REQUEST, ADMINTYPE_HYBRID, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_eventstream, NULL},
{ SHOWLOG_RAW_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_RAW, ADMINSAFE_SAFE, command_show_log, NULL},
{ SHOWLOG_HTML_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_HTML, ADMINSAFE_SAFE, command_show_log, NULL},
{ SHOWLOG_JSON_REQUEST, ADMINTYPE_GENERAL, ADMIN_FORMAT_JSON, ADMINSAFE_SAFE, command_show_log, NULL},
@ -1097,6 +1101,12 @@ static void command_buildm3u(client_t *client, source_t *source, admin_format_t
client_send_buffer(client, 200, "audio/x-mpegurl", NULL, buffer, -1, "Content-Disposition: attachment; filename=listen.m3u\r\n");
}
static void command_eventstream (client_t *client, source_t *source, admin_format_t response)
{
(void)source, (void)response;
event_stream_add_client(client);
}
xmlNodePtr admin_add_role_to_authentication(auth_t *auth, xmlNodePtr parent)
{
xmlNodePtr rolenode = xmlNewChild(parent, NULL, XMLSTR("role"), NULL);

View File

@ -21,6 +21,7 @@
#include <igloo/sp.h>
#include "event.h"
#include "event_stream.h"
#include "fastevent.h"
#include "logging.h"
#include "string_renderer.h"
@ -426,6 +427,7 @@ void event_registration_push(event_registration_t **er, event_registration_t *ta
/* event signaling */
void event_emit(event_t *event) {
fastevent_emit(FASTEVENT_TYPE_SLOWEVENT, FASTEVENT_FLAG_NONE, FASTEVENT_DATATYPE_EVENT, event);
event_stream_emit_event(event);
thread_mutex_lock(&event_lock);
event_push(&event_queue, event);
thread_mutex_unlock(&event_lock);
@ -476,21 +478,6 @@ void event_emit_va(const char *trigger, ...) {
event_push_reglist(event, mount->event);
config_release_config();
/* This isn't perfectly clean but is an important speedup:
* If first element of reglist is NULL none of the above pushed in
* some registrations. If there are no registrations we can just drop
* this event now and here.
* We do this before inserting all the data into the object to avoid
* all the strdups() and stuff in case they aren't needed.
*/
#ifndef FASTEVENT_ENABLED
if (event->reglist[0] == NULL) {
/* we have no registrations, drop this event. */
igloo_ro_unref(&event);
return;
}
#endif
if (uri)
extra_add(event, EVENT_EXTRA_KEY_URI, uri);

View File

@ -10,14 +10,367 @@
#include <config.h>
#endif
#include <igloo/ro.h>
#include <stdbool.h>
#include <string.h>
#include <igloo/typedef.h>
#include "icecasttypes.h"
#include <igloo/error.h>
#include <igloo/ro.h>
#include <igloo/sp.h>
#include <igloo/uuid.h>
#include "common/thread/thread.h"
#include "common/avl/avl.h"
#include "event_stream.h"
#include "string_renderer.h"
#include "json.h"
#include "util.h"
#include "fserve.h"
#include "global.h"
#include "event.h"
#include "client.h"
#include "connection.h"
#include "errors.h"
#include "logging.h"
#define CATMODULE "event-stream"
struct event_stream_event_tag {
igloo_ro_full_t __parent;
const char * uuid;
const char * rendered;
size_t rendered_length;
event_t *event;
event_stream_event_t *next;
};
typedef struct {
const char *current_buffer;
size_t todo;
event_stream_event_t *current_event;
} event_stream_clientstate_t;
static void event_stream_event_free(igloo_ro_t self);
static void *event_stream_thread_function(void *arg);
static void event_stream_event_render(event_stream_event_t *event);
igloo_RO_PUBLIC_TYPE(event_stream_event_t, igloo_ro_full_t,
igloo_RO_TYPEDECL_FREE(event_stream_event_free),
);
static mutex_t event_stream_event_mutex; // protects: event_queue, event_queue_next, event_stream_thread, alive
static event_stream_event_t *event_queue;
static event_stream_event_t **event_queue_next = &event_queue;
static thread_type *event_stream_thread;
static cond_t event_stream_cond;
static avl_tree *client_tree;
static bool alive;
static void event_stream_clientstate_free(client_t *client)
{
event_stream_clientstate_t *state = client->format_data;
client->format_data = NULL;
if (!state)
return;
igloo_ro_unref(&(state->current_event));
free(state);
}
static void event_stream_event_free(igloo_ro_t self)
{
event_stream_event_t *event = igloo_ro_to_type(self, event_stream_event_t);
igloo_sp_unref(&(event->uuid), igloo_instance);
igloo_sp_unref(&(event->rendered), igloo_instance);
igloo_ro_unref(&(event->event));
igloo_ro_unref(&(event->next));
}
static event_stream_event_t * event_stream_event_new(void)
{
event_stream_event_t *event;
if (igloo_ro_new_raw(&event, event_stream_event_t, igloo_instance) != igloo_ERROR_NONE)
return NULL;
if (igloo_uuid_new_random_sp(&(event->uuid), igloo_instance) != igloo_ERROR_NONE) {
igloo_ro_unref(&event);
return NULL;
}
return event;
}
static void event_stream_queue(event_stream_event_t *event)
{
event_stream_event_render(event);
thread_mutex_lock(&event_stream_event_mutex);
*event_queue_next = event;
event_queue_next = &(event->next);
thread_mutex_unlock(&event_stream_event_mutex);
thread_cond_broadcast(&event_stream_cond);
ICECAST_LOG_INFO("event queued");
}
static int _free_client(void *key)
{
client_t *client = (client_t *)key;
client_destroy(client);
return 1;
}
void event_stream_initialise(void)
{
thread_mutex_create(&event_stream_event_mutex);
thread_cond_create(&event_stream_cond);
client_tree = avl_tree_new(client_compare, NULL);
alive = true;
}
void event_stream_shutdown(void)
{
thread_mutex_lock(&event_stream_event_mutex);
alive = false;
thread_mutex_unlock(&event_stream_event_mutex);
thread_cond_broadcast(&event_stream_cond);
if (event_stream_thread)
thread_join(event_stream_thread);
avl_tree_free(client_tree, _free_client);
thread_mutex_lock(&event_stream_event_mutex);
igloo_ro_unref(&event_queue);
thread_mutex_unlock(&event_stream_event_mutex);
thread_mutex_destroy(&event_stream_event_mutex);
thread_cond_destroy(&event_stream_cond);
}
void event_stream_add_client_inner(client_t *client, void *ud)
{
event_stream_clientstate_t *state = calloc(1, sizeof(event_stream_clientstate_t));
(void)ud;
if (!state) {
client_destroy(client);
return;
}
thread_mutex_lock(&event_stream_event_mutex);
igloo_ro_ref(event_queue, &(state->current_event), event_stream_event_t);
thread_mutex_unlock(&event_stream_event_mutex);
client->format_data = state;
client->free_client_data = event_stream_clientstate_free;
avl_tree_wlock(client_tree);
avl_insert(client_tree, client);
avl_tree_unlock(client_tree);
thread_mutex_lock(&event_stream_event_mutex);
if (!event_stream_thread) {
event_stream_thread = thread_create("Event Stream Thread", event_stream_thread_function, (void *)NULL, THREAD_ATTACHED);
}
thread_mutex_unlock(&event_stream_event_mutex);
}
void event_stream_add_client(client_t *client)
{
ssize_t len = util_http_build_header(client->refbuf->data, PER_CLIENT_REFBUF_SIZE, 0,
0, 200, NULL,
"text/event-stream", NULL,
"", NULL, client
);
if (len < 1 || len > PER_CLIENT_REFBUF_SIZE)
return;
client->refbuf->len = len;
fserve_add_client_callback(client, event_stream_add_client_inner, NULL);
}
void event_stream_emit_event(event_t *event)
{
event_stream_event_t *el = event_stream_event_new();
if (!el)
return;
igloo_ro_ref_replace(event, &(el->event), event_t);
event_stream_queue(el);
}
static void event_stream_send_to_client(client_t *client)
{
event_stream_clientstate_t *state = client->format_data;
bool going = true;
ICECAST_LOG_INFO("Sending to client %p {.con.id = %llu}, with state %p", client, (long long unsigned int)client->con->id, state);
do {
if (!state->current_buffer) {
state->current_buffer = state->current_event->rendered;
state->todo = state->current_event->rendered_length;
}
if (state->todo) {
ssize_t done = client_send_bytes(client, state->current_buffer, state->todo);
if (done < 0) {
return;
} else {
state->todo -= done;
state->current_buffer += done;
}
}
if (!state->todo) {
if (state->current_event->next) {
igloo_ro_ref_replace(state->current_event->next, &(state->current_event), event_stream_event_t);
state->current_buffer = NULL;
} else {
going = false;
}
}
} while (going);
}
static void *event_stream_thread_function(void *arg)
{
bool running = true;
ICECAST_LOG_INFO("Good morning!");
do {
thread_cond_wait(&event_stream_cond);
ICECAST_LOG_INFO("Tick!");
{
avl_tree_wlock(client_tree);
avl_node *next;
next = avl_get_first(client_tree);
while (next) {
avl_node *node = next;
client_t *client = node->key;
next = avl_get_next(next);
event_stream_send_to_client(client);
if (client->con->error) {
avl_delete(client_tree, (void *) client, _free_client);
}
}
avl_tree_unlock(client_tree);
}
thread_mutex_lock(&event_stream_event_mutex);
running = alive;
{
event_stream_event_t *head = event_queue;
while (head) {
ICECAST_LOG_INFO("Event: % #H", head->rendered);
head = head->next;
}
}
thread_mutex_unlock(&event_stream_event_mutex);
} while (running);
ICECAST_LOG_INFO("Good evening!");
return NULL;
}
static void add_number(json_renderer_t *json, const char *key, intmax_t val)
{
if (val < 1)
return;
json_renderer_write_key(json, key, JSON_RENDERER_FLAGS_NONE);
json_renderer_write_uint(json, val);
}
static void event_stream_event_render(event_stream_event_t *event)
{
string_renderer_t * renderer;
json_renderer_t *json;
char *body;
if (event->rendered)
return;
if (igloo_ro_new(&renderer, string_renderer_t, igloo_instance) != igloo_ERROR_NONE)
return;
json = json_renderer_create(JSON_RENDERER_FLAGS_NONE);
if (!json) {
igloo_ro_unref(&renderer);
return;
}
json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT);
if (event->event) {
event_t *uevent = event->event;
json_renderer_write_key(json, "type", JSON_RENDERER_FLAGS_NONE);
json_renderer_write_string(json, "event", JSON_RENDERER_FLAGS_NONE);
json_renderer_write_key(json, "crude", JSON_RENDERER_FLAGS_NONE);
json_renderer_begin(json, JSON_ELEMENT_TYPE_OBJECT);
{
static const event_extra_key_t key_list[] = {
EVENT_EXTRA_KEY_URI,
EVENT_EXTRA_KEY_SOURCE_MEDIA_TYPE,
EVENT_EXTRA_KEY_SOURCE_INSTANCE_UUID,
EVENT_EXTRA_KEY_CONNECTION_IP,
EVENT_EXTRA_KEY_CLIENT_ROLE,
EVENT_EXTRA_KEY_CLIENT_USERNAME,
EVENT_EXTRA_KEY_CLIENT_USERAGENT,
EVENT_EXTRA_KEY_DUMPFILE_FILENAME,
EVENT_EXTRA_LIST_END
};
json_renderer_write_key(json, "trigger", JSON_RENDERER_FLAGS_NONE);
json_renderer_write_string(json, uevent->trigger, JSON_RENDERER_FLAGS_NONE);
for (size_t i = 0; key_list[i] != EVENT_EXTRA_LIST_END; i++) {
const char *value = event_extra_get(uevent, key_list[i]);
if (!value)
continue;
json_renderer_write_key(json, event_extra_key_name(key_list[i]), JSON_RENDERER_FLAGS_NONE);
json_renderer_write_string(json, value, JSON_RENDERER_FLAGS_NONE);
}
add_number(json, "connection-id", uevent->connection_id);
add_number(json, "connection-time", uevent->connection_time);
add_number(json, "client-admin-command", uevent->client_admin_command);
}
json_renderer_end(json);
}
json_renderer_end(json);
body = json_renderer_finish(&json);
string_renderer_start_list(renderer, "\r\n", ": ", false, false, STRING_RENDERER_ENCODING_PLAIN);
string_renderer_add_kv(renderer, "id", event->uuid);
string_renderer_add_kv(renderer, "data", body);
string_renderer_end_list(renderer);
string_renderer_add_string(renderer, "\r\n\r\n");
free(body);
igloo_sp_replace(string_renderer_to_string_zero_copy(renderer), &(event->rendered), igloo_instance);
event->rendered_length = strlen(event->rendered);
igloo_ro_unref(&renderer);
}

View File

@ -11,4 +11,12 @@
#include "icecasttypes.h"
igloo_RO_FORWARD_TYPE(event_stream_event_t);
void event_stream_initialise(void);
void event_stream_shutdown(void);
void event_stream_add_client(client_t *client);
void event_stream_emit_event(event_t *event);
#endif

View File

@ -143,6 +143,10 @@ typedef struct geoip_db_tag geoip_db_t;
typedef struct event_tag event_t;
typedef struct event_registration_tag event_registration_t;
/* ---[ event_stream.[ch] ]--- */
typedef struct event_stream_event_tag event_stream_event_t;
/* ---[ refobject.[ch] ]--- */
typedef struct refobject_base_tag refobject_base_t;
@ -167,6 +171,7 @@ typedef void * refobject_t;
igloo_RO_TYPE(geoip_db_t) \
igloo_RO_TYPE(event_t) \
igloo_RO_TYPE(event_registration_t) \
igloo_RO_TYPE(event_stream_event_t) \
igloo_RO_TYPE(module_t) \
igloo_RO_TYPE(module_container_t)

View File

@ -88,6 +88,7 @@
#include "yp.h"
#include "auth.h"
#include "event.h"
#include "event_stream.h"
#include "ping.h"
#include "listensocket.h"
#include "fastevent.h"
@ -186,6 +187,7 @@ static void initialize_subsystems(void)
static void shutdown_subsystems(void)
{
ICECAST_LOG_DEBUG("Shuting down subsystems...");
event_stream_shutdown();
event_shutdown();
fserve_shutdown();
refbuf_shutdown();
@ -785,6 +787,7 @@ int main(int argc, char **argv)
slave_initialize();
auth_initialise ();
event_initialise();
event_stream_initialise();
event_emit_global("icecast-start");
_server_proc();