diff --git a/src/stats.c b/src/stats.c index 60453ff7..a7a2118f 100644 --- a/src/stats.c +++ b/src/stats.c @@ -51,10 +51,18 @@ #define STATS_EVENT_REMOVE 4 #define STATS_EVENT_HIDDEN 5 +typedef struct _event_queue_tag +{ + volatile stats_event_t *head; + volatile stats_event_t **tail; +} event_queue_t; + +#define event_queue_init(qp) { (qp)->head = NULL; (qp)->tail = &(qp)->head; } + typedef struct _event_listener_tag { - stats_event_t **queue; - mutex_t *mutex; + event_queue_t queue; + mutex_t mutex; struct _event_listener_tag *next; } event_listener_t; @@ -66,7 +74,7 @@ static volatile int _stats_threads = 0; static stats_t _stats; static mutex_t _stats_mutex; -static volatile stats_event_t *_global_event_queue; +static event_queue_t _global_event_queue; mutex_t _global_event_mutex; static volatile event_listener_t *_event_listeners; @@ -77,10 +85,11 @@ static int _compare_stats(void *a, void *b, void *arg); static int _compare_source_stats(void *a, void *b, void *arg); static int _free_stats(void *key); static int _free_source_stats(void *key); -static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue); +static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue); static stats_node_t *_find_node(avl_tree *tree, char *name); static stats_source_t *_find_source(avl_tree *tree, char *source); static void _free_event(stats_event_t *event); +static stats_event_t *_get_event_from_queue (event_queue_t *queue); /* simple helper function for creating an event */ @@ -106,19 +115,7 @@ static stats_event_t *build_event (const char *source, const char *name, const c static void queue_global_event (stats_event_t *event) { thread_mutex_lock(&_global_event_mutex); - if (_global_event_queue == NULL) - { - _global_event_queue = event; - } - else - { - stats_event_t *node = (stats_event_t *)_global_event_queue; - while (node->next) - node = node->next; - node->next = event; - } - /* DEBUG3("event added (%s, %s, %s)", event->source, - event->name, event->value); */ + _add_event_to_queue (event, &_global_event_queue); thread_mutex_unlock(&_global_event_mutex); } @@ -134,7 +131,7 @@ void stats_initialize() thread_mutex_create(&_stats_mutex); /* set up stats queues */ - _global_event_queue = NULL; + event_queue_init (&_global_event_queue); thread_mutex_create(&_global_event_mutex); /* fire off the stats thread */ @@ -145,7 +142,6 @@ void stats_initialize() void stats_shutdown() { int n; - stats_event_t *event, *next; if(!_stats_running) /* We can't shutdown if we're not running. */ return; @@ -172,17 +168,17 @@ void stats_shutdown() avl_tree_free(_stats.source_tree, _free_source_stats); avl_tree_free(_stats.global_tree, _free_stats); - event = (stats_event_t *)_global_event_queue; - while(event) { + while (1) + { + stats_event_t *event = _get_event_from_queue (&_global_event_queue); + if (event == NULL) break; if(event->source) free(event->source); if(event->value) free(event->value); if(event->name) free(event->name); - next = event->next; free(event); - event = next; } } @@ -537,7 +533,7 @@ static void process_source_event (stats_event_t *event) void stats_event_time (const char *mount, const char *name) { time_t now = time(NULL); - struct tm local; + struct tm local; char buffer[100]; localtime_r (&now, &local); @@ -571,11 +567,10 @@ static void *_stats_thread(void *arg) INFO0 ("stats thread started"); while (_stats_running) { - if (_global_event_queue != NULL) { + if (_global_event_queue.head != NULL) { /* grab the next event from the queue */ thread_mutex_lock(&_global_event_mutex); - event = (stats_event_t *)_global_event_queue; - _global_event_queue = event->next; + event = _get_event_from_queue (&_global_event_queue); thread_mutex_unlock(&_global_event_mutex); event->next = NULL; @@ -593,9 +588,9 @@ static void *_stats_thread(void *arg) listener = (event_listener_t *)_event_listeners; while (listener) { copy = _copy_event(event); - thread_mutex_lock(listener->mutex); - _add_event_to_queue(copy, listener->queue); - thread_mutex_unlock(listener->mutex); + thread_mutex_lock (&listener->mutex); + _add_event_to_queue (copy, &listener->queue); + thread_mutex_unlock (&listener->mutex); listener = listener->next; } @@ -614,16 +609,15 @@ static void *_stats_thread(void *arg) } /* you must have the _stats_mutex locked here */ -static void _unregister_listener(stats_event_t **queue) +static void _unregister_listener(event_listener_t *listener) { event_listener_t **prev = (event_listener_t **)&_event_listeners, *current = *prev; while (current) { - if (current->queue == queue) + if (current == listener) { *prev = current->next; - free (current); break; } prev = ¤t->next; @@ -632,25 +626,6 @@ static void _unregister_listener(stats_event_t **queue) } -/* you must have the _stats_mutex locked here */ -static void _register_listener(stats_event_t **queue, mutex_t *mutex) -{ - event_listener_t *node; - event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t)); - - evli->queue = queue; - evli->mutex = mutex; - evli->next = NULL; - - if (_event_listeners == NULL) { - _event_listeners = evli; - } else { - node = (event_listener_t *)_event_listeners; - while (node->next) node = node->next; - node->next = evli; - } -} - static stats_event_t *_make_event_from_node(stats_node_t *node, char *source) { stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t)); @@ -668,35 +643,32 @@ static stats_event_t *_make_event_from_node(stats_node_t *node, char *source) return event; } -static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue) -{ - stats_event_t *node; - if (*queue == NULL) { - *queue = event; - } else { - node = *queue; - while (node->next) node = node->next; - node->next = event; - } +static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue) +{ + *queue->tail = event; + queue->tail = (volatile stats_event_t **)&event->next; } -static stats_event_t *_get_event_from_queue(stats_event_t **queue) + +static stats_event_t *_get_event_from_queue (event_queue_t *queue) { - stats_event_t *event; + stats_event_t *event = NULL; - if (*queue == NULL) return NULL; - - event = *queue; - *queue = (*queue)->next; - event->next = NULL; + if (queue && queue->head) + { + event = (stats_event_t *)queue->head; + queue->head = event->next; + if (queue->head == NULL) + queue->tail = &queue->head; + } return event; } static int _send_event_to_client(stats_event_t *event, client_t *client) { - int ret = -1, len; + int len; char buf [200]; /* send data to the client!!!! */ @@ -705,12 +677,15 @@ static int _send_event_to_client(stats_event_t *event, client_t *client) event->name ? event->name : "null", event->value ? event->value : "null"); if (len > 0 && len < (int)sizeof (buf)) - ret = client_send_bytes (client, buf, len); - - return (ret == -1) ? 0 : 1; + { + client_send_bytes (client, buf, len); + if (client->con->error) + return -1; + } + return 0; } -void _dump_stats_to_queue(stats_event_t **queue) +void _dump_stats_to_queue (event_queue_t *queue) { avl_node *node; avl_node *node2; @@ -750,7 +725,7 @@ void _dump_stats_to_queue(stats_event_t **queue) ** the queue for all new events atomically. ** note: mutex must already be created! */ -static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) +static void _register_listener (event_listener_t *listener) { avl_node *node; avl_node *node2; @@ -765,7 +740,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) node = avl_get_first(_stats.global_tree); while (node) { event = _make_event_from_node((stats_node_t *)node->key, NULL); - _add_event_to_queue(event, queue); + _add_event_to_queue (event, &listener->queue); node = avl_get_next(node); } @@ -777,7 +752,7 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) node2 = avl_get_first(source->stats_tree); while (node2) { event = _make_event_from_node((stats_node_t *)node2->key, source->source); - _add_event_to_queue(event, queue); + _add_event_to_queue (event, &listener->queue); node2 = avl_get_next(node2); } @@ -786,7 +761,8 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) } /* now we register to receive future event notices */ - _register_listener(queue, mutex); + listener->next = (event_listener_t *)_event_listeners; + _event_listeners = listener; thread_mutex_unlock(&_stats_mutex); } @@ -794,48 +770,44 @@ static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex) void *stats_connection(void *arg) { client_t *client = (client_t *)arg; - stats_event_t *local_event_queue = NULL; - mutex_t local_event_mutex; stats_event_t *event; + event_listener_t listener; INFO0 ("stats client starting"); + event_queue_init (&listener.queue); /* increment the thread count */ thread_mutex_lock(&_stats_mutex); _stats_threads++; stats_event_args (NULL, "stats", "%d", _stats_threads); thread_mutex_unlock(&_stats_mutex); - thread_mutex_create(&local_event_mutex); + thread_mutex_create (&(listener.mutex)); - _atomic_get_and_register(&local_event_queue, &local_event_mutex); + _register_listener (&listener); while (_stats_running) { - thread_mutex_lock(&local_event_mutex); - event = _get_event_from_queue(&local_event_queue); + thread_mutex_lock (&listener.mutex); + event = _get_event_from_queue (&listener.queue); + thread_mutex_unlock (&listener.mutex); if (event != NULL) { - if (!_send_event_to_client(event, client)) { + if (_send_event_to_client(event, client) < 0) { _free_event(event); - thread_mutex_unlock(&local_event_mutex); break; } _free_event(event); - } else { - thread_mutex_unlock(&local_event_mutex); - thread_sleep (500000); continue; } - - thread_mutex_unlock(&local_event_mutex); + thread_sleep (500000); } thread_mutex_lock(&_stats_mutex); - _unregister_listener (&local_event_queue); + _unregister_listener (&listener); _stats_threads--; stats_event_args (NULL, "stats", "%d", _stats_threads); thread_mutex_unlock(&_stats_mutex); - thread_mutex_destroy(&local_event_mutex); + thread_mutex_destroy (&listener.mutex); client_destroy (client); INFO0 ("stats client finished"); @@ -916,19 +888,18 @@ void stats_transform_xslt(client_t *client, const char *uri) void stats_get_xml(xmlDocPtr *doc, int show_hidden) { stats_event_t *event; - stats_event_t *queue; + event_queue_t queue; xmlNodePtr node, srcnode; source_xml_t *src_nodes = NULL; source_xml_t *next; - queue = NULL; - _dump_stats_to_queue(&queue); + event_queue_init (&queue); + _dump_stats_to_queue (&queue); *doc = xmlNewDoc("1.0"); node = xmlNewDocNode(*doc, NULL, "icestats", NULL); xmlDocSetRootElement(*doc, node); - event = _get_event_from_queue(&queue); while (event) { @@ -961,7 +932,7 @@ void stats_sendxml(client_t *client) { int bytes; stats_event_t *event; - stats_event_t *queue; + event_queue_t queue; xmlDocPtr doc; xmlNodePtr node, srcnode; int len; @@ -969,8 +940,8 @@ void stats_sendxml(client_t *client) source_xml_t *snd; source_xml_t *src_nodes = NULL; - queue = NULL; - _dump_stats_to_queue(&queue); + event_queue_init (&queue); + _dump_stats_to_queue (&queue); doc = xmlNewDoc("1.0"); node = xmlNewDocNode(doc, NULL, "icestats", NULL);