diff --git a/src/source.c b/src/source.c index fcdd016b..f3ddc504 100644 --- a/src/source.c +++ b/src/source.c @@ -745,7 +745,7 @@ static void source_shutdown (source_t *source) /* delete this sources stats */ stats_event_dec(NULL, "sources"); - stats_event(source->mount, "listeners", NULL); + stats_event(source->mount, NULL, NULL); /* we don't remove the source from the tree here, it may be a relay and therefore reserved */ diff --git a/src/stats.c b/src/stats.c index b83e0d5a..0989b6f8 100644 --- a/src/stats.c +++ b/src/stats.c @@ -35,11 +35,20 @@ #include "client.h" #include "stats.h" #include "xslt.h" +#define CATMODULE "stats" +#include "logging.h" #ifdef _WIN32 #define vsnprintf _vsnprintf +#define snprintf _snprintf #endif +#define STATS_EVENT_SET 0 +#define STATS_EVENT_INC 1 +#define STATS_EVENT_DEC 2 +#define STATS_EVENT_ADD 3 +#define STATS_EVENT_REMOVE 4 + typedef struct _event_listener_tag { stats_event_t **queue; @@ -48,20 +57,17 @@ typedef struct _event_listener_tag struct _event_listener_tag *next; } event_listener_t; -int _stats_running = 0; -thread_type *_stats_thread_id; -int _stats_threads = 0; +static volatile int _stats_running = 0; +static thread_type *_stats_thread_id; +static volatile int _stats_threads = 0; -stats_t _stats; -mutex_t _stats_mutex; +static stats_t _stats; +static mutex_t _stats_mutex; -stats_event_t *_global_event_queue; +static volatile stats_event_t *_global_event_queue; mutex_t _global_event_mutex; -cond_t _event_signal_cond; - -event_listener_t *_event_listeners; - +static volatile event_listener_t *_event_listeners; static void *_stats_thread(void *arg); @@ -74,6 +80,46 @@ static stats_node_t *_find_node(avl_tree *tree, char *name); static stats_source_t *_find_source(avl_tree *tree, char *source); static void _free_event(stats_event_t *event); + +/* simple helper function for creating an event */ +static stats_event_t *build_event (const char *source, const char *name, const char *value) +{ + stats_event_t *event; + + event = (stats_event_t *)calloc(1, sizeof(stats_event_t)); + if (event) + { + if (source) + event->source = (char *)strdup(source); + if (name) + event->name = (char *)strdup(name); + if (value) + event->value = (char *)strdup(value); + else + event->action = STATS_EVENT_REMOVE; + } + return event; +} + +static void queue_global_event (stats_event_t *event) +{ + thread_mutex_lock(&_global_event_mutex); + if (_global_event_queue == NULL) + { + _global_event_queue = event; + } + else + { + stats_event_t *node = (stats_event_t *)_global_event_queue; + while (node->next) + node = node->next; + node->next = event; + } + /* DEBUG3("event added (%s, %s, %s)", event->source, + event->name, event->value); */ + thread_mutex_unlock(&_global_event_mutex); +} + void stats_initialize() { _event_listeners = NULL; @@ -85,9 +131,6 @@ void stats_initialize() /* set up global mutex */ thread_mutex_create(&_stats_mutex); - /* set up event signaler */ - thread_cond_create(&_event_signal_cond); - /* set up stats queues */ _global_event_queue = NULL; thread_mutex_create(&_global_event_mutex); @@ -116,19 +159,18 @@ void stats_shutdown() n = _stats_threads; thread_mutex_unlock(&_stats_mutex); } while (n > 0); + INFO0("stats thread finished"); /* free the queues */ /* destroy the queue mutexes */ thread_mutex_destroy(&_global_event_mutex); - /* tear it all down */ - thread_cond_destroy(&_event_signal_cond); thread_mutex_destroy(&_stats_mutex); avl_tree_free(_stats.source_tree, _free_source_stats); avl_tree_free(_stats.global_tree, _free_stats); - event = _global_event_queue; + event = (stats_event_t *)_global_event_queue; while(event) { if(event->source) free(event->source); @@ -155,43 +197,36 @@ stats_t *stats_get_stats() return NULL; } -void stats_event(char *source, char *name, char *value) +/* simple name=tag stat create/update */ +void stats_event(const char *source, const char *name, const char *value) { - stats_event_t *node; stats_event_t *event; - if (name == NULL || strcmp(name, "") == 0) return; - - /* build event */ - event = (stats_event_t *)malloc(sizeof(stats_event_t)); - event->source = NULL; - if (source != NULL) event->source = (char *)strdup(source); - event->name = (char *)strdup(name); - event->value = NULL; - event->next = NULL; - if (value != NULL) event->value = (char *)strdup(value); - - /* queue event */ - thread_mutex_lock(&_global_event_mutex); - if (_global_event_queue == NULL) { - _global_event_queue = event; - } else { - node = _global_event_queue; - while (node->next) node = node->next; - node->next = event; - } - thread_mutex_unlock(&_global_event_mutex); + event = build_event (source, name, value); + if (event) + queue_global_event (event); } -void stats_event_args(char *source, char *name, char *format, ...) + +/* printf style formatting for stat create/update */ +void stats_event_args(const char *source, char *name, char *format, ...) { char buf[1024]; va_list val; - + int ret; + + if (name == NULL) + return; va_start(val, format); - vsnprintf(buf, 1024, format, val); + ret = vsnprintf(buf, 1024, format, val); va_end(val); + if (ret < 0 || (unsigned int)ret >= sizeof (buf)) + { + WARN2 ("problem with formatting %s stat %s", + source==NULL ? "global" : source, name); + return; + } stats_event(source, name, buf); } @@ -223,56 +258,42 @@ char *stats_get_value(char *source, char *name) { return(_get_stats(source, name)); } -void stats_event_inc(char *source, char *name) -{ - char *old_value; - int new_value; - - old_value = _get_stats(source, name); - if (old_value != NULL) { - new_value = atoi(old_value); - free(old_value); - new_value++; - } else { - new_value = 1; - } - stats_event_args(source, name, "%d", new_value); +/* increase the value in the provided stat by 1 */ +void stats_event_inc(const char *source, const char *name) +{ + stats_event_t *event = build_event (source, name, NULL); + /* DEBUG2("%s on %s", name, source==NULL?"global":source); */ + if (event) + { + event->action = STATS_EVENT_INC; + queue_global_event (event); + } } -void stats_event_add(char *source, char *name, unsigned long value) +void stats_event_add(const char *source, const char *name, unsigned long value) { - char *old_value; - unsigned long new_value; - - old_value = _get_stats(source, name); - if (old_value != NULL) { - new_value = atol(old_value); - free(old_value); - new_value += value; - } else { - new_value = value; + stats_event_t *event = build_event (source, name, NULL); + /* DEBUG2("%s on %s", name, source==NULL?"global":source); */ + if (event) + { + event->value = malloc (16); + snprintf (event->value, 16, "%ld", value); + event->action = STATS_EVENT_ADD; + queue_global_event (event); } - - stats_event_args(source, name, "%ld", new_value); } -void stats_event_dec(char *source, char *name) +/* decrease the value in the provided stat by 1 */ +void stats_event_dec(const char *source, const char *name) { - char *old_value; - int new_value; - - old_value = _get_stats(source, name); - if (old_value != NULL) { - new_value = atoi(old_value); - free(old_value); - new_value--; - if (new_value < 0) new_value = 0; - } else { - new_value = 0; + /* DEBUG2("%s on %s", name, source==NULL?"global":source); */ + stats_event_t *event = build_event (source, name, NULL); + if (event) + { + event->action = STATS_EVENT_DEC; + queue_global_event (event); } - - stats_event_args(source, name, "%d", new_value); } /* note: you must call this function only when you have exclusive access @@ -330,12 +351,13 @@ static stats_source_t *_find_source(avl_tree *source_tree, char *source) static stats_event_t *_copy_event(stats_event_t *event) { - stats_event_t *copy = (stats_event_t *)malloc(sizeof(stats_event_t)); + stats_event_t *copy = (stats_event_t *)calloc(1, sizeof(stats_event_t)); if (event->source) copy->source = (char *)strdup(event->source); else copy->source = NULL; - copy->name = (char *)strdup(event->name); + if (event->name) + copy->name = (char *)strdup(event->name); if (event->value) copy->value = (char *)strdup(event->value); else @@ -345,103 +367,167 @@ static stats_event_t *_copy_event(stats_event_t *event) return copy; } + +/* helper to apply specialised changes to a stats node */ +static void modify_node_event (stats_node_t *node, stats_event_t *event) +{ + char *str; + + if (event->action != STATS_EVENT_SET) + { + int value = 0; + + switch (event->action) + { + case STATS_EVENT_INC: + value = atoi (node->value)+1; + break; + case STATS_EVENT_DEC: + value = atoi (node->value)-1; + break; + case STATS_EVENT_ADD: + value = atoi (node->value)+atoi (event->value); + break; + default: + break; + } + str = malloc (16); + snprintf (str, 16, "%d", value); + } + else + str = (char *)strdup (event->value); + free (node->value); + node->value = str; + /* DEBUG3 ("update node %s \"%s\" (%d)", node->name, node->value, event->action); */ +} + + +static void process_global_event (stats_event_t *event) +{ + stats_node_t *node; + + /* DEBUG3("global event %s %s %d", event->name, event->value, event->action); */ + if (event->action == STATS_EVENT_REMOVE) + { + /* we're deleting */ + node = _find_node(_stats.global_tree, event->name); + if (node != NULL) + avl_delete(_stats.global_tree, (void *)node, _free_stats); + return; + } + node = _find_node(_stats.global_tree, event->name); + if (node) + { + modify_node_event (node, event); + } + else + { + /* add node */ + node = (stats_node_t *)calloc(1, sizeof(stats_node_t)); + node->name = (char *)strdup(event->name); + node->value = (char *)strdup(event->value); + + avl_insert(_stats.global_tree, (void *)node); + } +} + + +static void process_source_event (stats_event_t *event) +{ + stats_source_t *snode = _find_source(_stats.source_tree, event->source); + if (snode == NULL) + { + if (event->action == STATS_EVENT_REMOVE) + return; + snode = (stats_source_t *)calloc(1,sizeof(stats_source_t)); + if (snode == NULL) + return; + DEBUG1 ("new source stat %s", event->source); + snode->source = (char *)strdup(event->source); + snode->stats_tree = avl_tree_new(_compare_stats, NULL); + + avl_insert(_stats.source_tree, (void *)snode); + } + if (event->name) + { + stats_node_t *node = _find_node(snode->stats_tree, event->name); + if (node == NULL) + { + if (event->action == STATS_EVENT_REMOVE) + return; + /* adding node */ + if (event->value) + { + DEBUG2 ("new node %s (%s)", event->name, event->value); + node = (stats_node_t *)calloc(1,sizeof(stats_node_t)); + node->name = (char *)strdup(event->name); + node->value = (char *)strdup(event->value); + + avl_insert(snode->stats_tree, (void *)node); + } + return; + } + if (event->action == STATS_EVENT_REMOVE) + { + DEBUG1 ("delete node %s", event->name); + avl_delete(snode->stats_tree, (void *)node, _free_stats); + return; + } + modify_node_event (node, event); + return; + } + if (event->action == STATS_EVENT_REMOVE) + { + DEBUG1 ("delete source node %s", event->source); + avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); + } +} + + static void *_stats_thread(void *arg) { stats_event_t *event; stats_event_t *copy; - stats_node_t *node; - stats_node_t *anode; - stats_source_t *snode; - stats_source_t *asnode; event_listener_t *listener; - avl_node *avlnode; + stats_event (NULL, "server", ICECAST_VERSION_STRING); + + /* global currently active stats */ + stats_event (NULL, "clients", "0"); + stats_event (NULL, "connections", "0"); + stats_event (NULL, "sources", "0"); + stats_event (NULL, "stats", "0"); + + /* global accumulating stats */ + stats_event (NULL, "client_connections", "0"); + stats_event (NULL, "source_client_connections", "0"); + stats_event (NULL, "source_relay_connections", "0"); + stats_event (NULL, "source_total_connections", "0"); + stats_event (NULL, "stats_connections", "0"); + + INFO0 ("stats thread started"); while (_stats_running) { - thread_mutex_lock(&_global_event_mutex); if (_global_event_queue != NULL) { /* grab the next event from the queue */ - event = _global_event_queue; + thread_mutex_lock(&_global_event_mutex); + event = (stats_event_t *)_global_event_queue; _global_event_queue = event->next; + thread_mutex_unlock(&_global_event_mutex); + event->next = NULL; thread_mutex_unlock(&_global_event_mutex); thread_mutex_lock(&_stats_mutex); - if (event->source == NULL) { - /* we have a global event */ - if (event->value != NULL) { - /* adding/updating */ - node = _find_node(_stats.global_tree, event->name); - if (node == NULL) { - /* add node */ - anode = (stats_node_t *)malloc(sizeof(stats_node_t)); - anode->name = (char *)strdup(event->name); - anode->value = (char *)strdup(event->value); - avl_insert(_stats.global_tree, (void *)anode); - } else { - /* update node */ - free(node->value); - node->value = (char *)strdup(event->value); - } - - } else { - /* we're deleting */ - node = _find_node(_stats.global_tree, event->name); - if (node != NULL) - avl_delete(_stats.global_tree, (void *)node, _free_stats); - } - } else { - /* we have a source event */ - - snode = _find_source(_stats.source_tree, event->source); - if (snode != NULL) { - /* this is a source we already have a tree for */ - if (event->value != NULL) { - /* we're add/updating */ - node = _find_node(snode->stats_tree, event->name); - if (node == NULL) { - /* adding node */ - anode = (stats_node_t *)malloc(sizeof(stats_node_t)); - anode->name = (char *)strdup(event->name); - anode->value = (char *)strdup(event->value); - - avl_insert(snode->stats_tree, (void *)anode); - } else { - /* updating node */ - free(node->value); - node->value = (char *)strdup(event->value); - } - } else { - /* we're deleting */ - node = _find_node(snode->stats_tree, event->name); - if (node != NULL) { - avl_delete(snode->stats_tree, (void *)node, _free_stats); - - avlnode = avl_get_first(snode->stats_tree); - if (avlnode == NULL) { - avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); - } - } - } - } else { - /* this is a new source */ - asnode = (stats_source_t *)malloc(sizeof(stats_source_t)); - asnode->source = (char *)strdup(event->source); - asnode->stats_tree = avl_tree_new(_compare_stats, NULL); - - anode = (stats_node_t *)malloc(sizeof(stats_node_t)); - anode->name = (char *)strdup(event->name); - anode->value = (char *)strdup(event->value); - - avl_insert(asnode->stats_tree, (void *)anode); - - avl_insert(_stats.source_tree, (void *)asnode); - } - } + /* check if we are dealing with a global or source event */ + if (event->source == NULL) + process_global_event (event); + else + process_source_event (event); /* now we have an event that's been processed into the running stats */ /* this event should get copied to event listeners' queues */ - listener = _event_listeners; + listener = (event_listener_t *)_event_listeners; while (listener) { copy = _copy_event(event); thread_mutex_lock(listener->mutex); @@ -450,23 +536,17 @@ static void *_stats_thread(void *arg) listener = listener->next; } - thread_cond_broadcast(&_event_signal_cond); /* now we need to destroy the event */ _free_event(event); thread_mutex_unlock(&_stats_mutex); continue; - } else { - thread_mutex_unlock(&_global_event_mutex); } thread_sleep(300000); } - /* wake the other threads so they can shut down cleanly */ - thread_cond_broadcast(&_event_signal_cond); - return NULL; } @@ -483,7 +563,7 @@ static void _register_listener(stats_event_t **queue, mutex_t *mutex) if (_event_listeners == NULL) { _event_listeners = evli; } else { - node = _event_listeners; + node = (event_listener_t *)_event_listeners; while (node->next) node = node->next; node->next = evli; } @@ -499,6 +579,7 @@ static stats_event_t *_make_event_from_node(stats_node_t *node, char *source) event->source = NULL; event->name = (char *)strdup(node->name); event->value = (char *)strdup(node->value); + event->action = STATS_EVENT_SET; event->next = NULL; return event; @@ -535,7 +616,10 @@ static int _send_event_to_client(stats_event_t *event, connection_t *con) int ret; /* send data to the client!!!! */ - ret = sock_write(con->sock, "EVENT %s %s %s\n", (event->source != NULL) ? event->source : "global", event->name, event->value ? event->value : "null"); + ret = sock_write(con->sock, "EVENT %s %s %s\n", + (event->source != NULL) ? event->source : "global", + event->name ? event->name : "null", + event->value ? event->value : "null"); return (ret == -1) ? 0 : 1; } @@ -649,7 +733,7 @@ void *stats_connection(void *arg) _free_event(event); } else { thread_mutex_unlock(&local_event_mutex); - thread_cond_wait(&_event_signal_cond); + thread_sleep (500000); continue; } @@ -806,8 +890,7 @@ void stats_sendxml(client_t *client) if (bytes > 0) client->con->sent_bytes += bytes; else goto send_error; - bytes = sock_write_bytes(client->con->sock, buff, len); - if (bytes > 0) client->con->sent_bytes += bytes; + bytes = client_send_bytes (client, buff, (unsigned)len); send_error: while (src_nodes) { diff --git a/src/stats.h b/src/stats.h index 894af3df..2cb326fe 100644 --- a/src/stats.h +++ b/src/stats.h @@ -38,6 +38,7 @@ typedef struct _stats_event_tag char *source; char *name; char *value; + int action; struct _stats_event_tag *next; } stats_event_t; @@ -77,11 +78,11 @@ void stats_shutdown(); stats_t *stats_get_stats(); -void stats_event(char *source, char *name, char *value); -void stats_event_args(char *source, char *name, char *format, ...); -void stats_event_inc(char *source, char *name); -void stats_event_add(char *source, char *name, unsigned long value); -void stats_event_dec(char *source, char *name); +void stats_event(const char *source, const char *name, const char *value); +void stats_event_args(const char *source, char *name, char *format, ...); +void stats_event_inc(const char *source, const char *name); +void stats_event_add(const char *source, const char *name, unsigned long value); +void stats_event_dec(const char *source, const char *name); void *stats_connection(void *arg); void *stats_callback(void *arg);