mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2024-12-04 14:46:30 -05:00
The _inc/_dec routines can race causing incorrect values as they don't account
for unprocessed stat events. Here I push the actual calculations to the stats thread. The API is maintained however all stats for a specific source can be dropped with one call now. svn path=/icecast/trunk/icecast/; revision=8090
This commit is contained in:
parent
80561957f0
commit
7b9b8e70a3
@ -745,7 +745,7 @@ static void source_shutdown (source_t *source)
|
|||||||
|
|
||||||
/* delete this sources stats */
|
/* delete this sources stats */
|
||||||
stats_event_dec(NULL, "sources");
|
stats_event_dec(NULL, "sources");
|
||||||
stats_event(source->mount, "listeners", NULL);
|
stats_event(source->mount, NULL, NULL);
|
||||||
|
|
||||||
/* we don't remove the source from the tree here, it may be a relay and
|
/* we don't remove the source from the tree here, it may be a relay and
|
||||||
therefore reserved */
|
therefore reserved */
|
||||||
|
423
src/stats.c
423
src/stats.c
@ -35,11 +35,20 @@
|
|||||||
#include "client.h"
|
#include "client.h"
|
||||||
#include "stats.h"
|
#include "stats.h"
|
||||||
#include "xslt.h"
|
#include "xslt.h"
|
||||||
|
#define CATMODULE "stats"
|
||||||
|
#include "logging.h"
|
||||||
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
#define vsnprintf _vsnprintf
|
#define vsnprintf _vsnprintf
|
||||||
|
#define snprintf _snprintf
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define STATS_EVENT_SET 0
|
||||||
|
#define STATS_EVENT_INC 1
|
||||||
|
#define STATS_EVENT_DEC 2
|
||||||
|
#define STATS_EVENT_ADD 3
|
||||||
|
#define STATS_EVENT_REMOVE 4
|
||||||
|
|
||||||
typedef struct _event_listener_tag
|
typedef struct _event_listener_tag
|
||||||
{
|
{
|
||||||
stats_event_t **queue;
|
stats_event_t **queue;
|
||||||
@ -48,20 +57,17 @@ typedef struct _event_listener_tag
|
|||||||
struct _event_listener_tag *next;
|
struct _event_listener_tag *next;
|
||||||
} event_listener_t;
|
} event_listener_t;
|
||||||
|
|
||||||
int _stats_running = 0;
|
static volatile int _stats_running = 0;
|
||||||
thread_type *_stats_thread_id;
|
static thread_type *_stats_thread_id;
|
||||||
int _stats_threads = 0;
|
static volatile int _stats_threads = 0;
|
||||||
|
|
||||||
stats_t _stats;
|
static stats_t _stats;
|
||||||
mutex_t _stats_mutex;
|
static mutex_t _stats_mutex;
|
||||||
|
|
||||||
stats_event_t *_global_event_queue;
|
static volatile stats_event_t *_global_event_queue;
|
||||||
mutex_t _global_event_mutex;
|
mutex_t _global_event_mutex;
|
||||||
|
|
||||||
cond_t _event_signal_cond;
|
static volatile event_listener_t *_event_listeners;
|
||||||
|
|
||||||
event_listener_t *_event_listeners;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void *_stats_thread(void *arg);
|
static void *_stats_thread(void *arg);
|
||||||
@ -74,6 +80,46 @@ static stats_node_t *_find_node(avl_tree *tree, char *name);
|
|||||||
static stats_source_t *_find_source(avl_tree *tree, char *source);
|
static stats_source_t *_find_source(avl_tree *tree, char *source);
|
||||||
static void _free_event(stats_event_t *event);
|
static void _free_event(stats_event_t *event);
|
||||||
|
|
||||||
|
|
||||||
|
/* simple helper function for creating an event */
|
||||||
|
static stats_event_t *build_event (const char *source, const char *name, const char *value)
|
||||||
|
{
|
||||||
|
stats_event_t *event;
|
||||||
|
|
||||||
|
event = (stats_event_t *)calloc(1, sizeof(stats_event_t));
|
||||||
|
if (event)
|
||||||
|
{
|
||||||
|
if (source)
|
||||||
|
event->source = (char *)strdup(source);
|
||||||
|
if (name)
|
||||||
|
event->name = (char *)strdup(name);
|
||||||
|
if (value)
|
||||||
|
event->value = (char *)strdup(value);
|
||||||
|
else
|
||||||
|
event->action = STATS_EVENT_REMOVE;
|
||||||
|
}
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void queue_global_event (stats_event_t *event)
|
||||||
|
{
|
||||||
|
thread_mutex_lock(&_global_event_mutex);
|
||||||
|
if (_global_event_queue == NULL)
|
||||||
|
{
|
||||||
|
_global_event_queue = event;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stats_event_t *node = (stats_event_t *)_global_event_queue;
|
||||||
|
while (node->next)
|
||||||
|
node = node->next;
|
||||||
|
node->next = event;
|
||||||
|
}
|
||||||
|
/* DEBUG3("event added (%s, %s, %s)", event->source,
|
||||||
|
event->name, event->value); */
|
||||||
|
thread_mutex_unlock(&_global_event_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
void stats_initialize()
|
void stats_initialize()
|
||||||
{
|
{
|
||||||
_event_listeners = NULL;
|
_event_listeners = NULL;
|
||||||
@ -85,9 +131,6 @@ void stats_initialize()
|
|||||||
/* set up global mutex */
|
/* set up global mutex */
|
||||||
thread_mutex_create(&_stats_mutex);
|
thread_mutex_create(&_stats_mutex);
|
||||||
|
|
||||||
/* set up event signaler */
|
|
||||||
thread_cond_create(&_event_signal_cond);
|
|
||||||
|
|
||||||
/* set up stats queues */
|
/* set up stats queues */
|
||||||
_global_event_queue = NULL;
|
_global_event_queue = NULL;
|
||||||
thread_mutex_create(&_global_event_mutex);
|
thread_mutex_create(&_global_event_mutex);
|
||||||
@ -116,19 +159,18 @@ void stats_shutdown()
|
|||||||
n = _stats_threads;
|
n = _stats_threads;
|
||||||
thread_mutex_unlock(&_stats_mutex);
|
thread_mutex_unlock(&_stats_mutex);
|
||||||
} while (n > 0);
|
} while (n > 0);
|
||||||
|
INFO0("stats thread finished");
|
||||||
|
|
||||||
/* free the queues */
|
/* free the queues */
|
||||||
|
|
||||||
/* destroy the queue mutexes */
|
/* destroy the queue mutexes */
|
||||||
thread_mutex_destroy(&_global_event_mutex);
|
thread_mutex_destroy(&_global_event_mutex);
|
||||||
|
|
||||||
/* tear it all down */
|
|
||||||
thread_cond_destroy(&_event_signal_cond);
|
|
||||||
thread_mutex_destroy(&_stats_mutex);
|
thread_mutex_destroy(&_stats_mutex);
|
||||||
avl_tree_free(_stats.source_tree, _free_source_stats);
|
avl_tree_free(_stats.source_tree, _free_source_stats);
|
||||||
avl_tree_free(_stats.global_tree, _free_stats);
|
avl_tree_free(_stats.global_tree, _free_stats);
|
||||||
|
|
||||||
event = _global_event_queue;
|
event = (stats_event_t *)_global_event_queue;
|
||||||
while(event) {
|
while(event) {
|
||||||
if(event->source)
|
if(event->source)
|
||||||
free(event->source);
|
free(event->source);
|
||||||
@ -155,43 +197,36 @@ stats_t *stats_get_stats()
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void stats_event(char *source, char *name, char *value)
|
/* simple name=tag stat create/update */
|
||||||
|
void stats_event(const char *source, const char *name, const char *value)
|
||||||
{
|
{
|
||||||
stats_event_t *node;
|
|
||||||
stats_event_t *event;
|
stats_event_t *event;
|
||||||
|
|
||||||
if (name == NULL || strcmp(name, "") == 0) return;
|
event = build_event (source, name, value);
|
||||||
|
if (event)
|
||||||
/* build event */
|
queue_global_event (event);
|
||||||
event = (stats_event_t *)malloc(sizeof(stats_event_t));
|
|
||||||
event->source = NULL;
|
|
||||||
if (source != NULL) event->source = (char *)strdup(source);
|
|
||||||
event->name = (char *)strdup(name);
|
|
||||||
event->value = NULL;
|
|
||||||
event->next = NULL;
|
|
||||||
if (value != NULL) event->value = (char *)strdup(value);
|
|
||||||
|
|
||||||
/* queue event */
|
|
||||||
thread_mutex_lock(&_global_event_mutex);
|
|
||||||
if (_global_event_queue == NULL) {
|
|
||||||
_global_event_queue = event;
|
|
||||||
} else {
|
|
||||||
node = _global_event_queue;
|
|
||||||
while (node->next) node = node->next;
|
|
||||||
node->next = event;
|
|
||||||
}
|
|
||||||
thread_mutex_unlock(&_global_event_mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void stats_event_args(char *source, char *name, char *format, ...)
|
|
||||||
|
/* printf style formatting for stat create/update */
|
||||||
|
void stats_event_args(const char *source, char *name, char *format, ...)
|
||||||
{
|
{
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
va_list val;
|
va_list val;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
if (name == NULL)
|
||||||
|
return;
|
||||||
va_start(val, format);
|
va_start(val, format);
|
||||||
vsnprintf(buf, 1024, format, val);
|
ret = vsnprintf(buf, 1024, format, val);
|
||||||
va_end(val);
|
va_end(val);
|
||||||
|
|
||||||
|
if (ret < 0 || (unsigned int)ret >= sizeof (buf))
|
||||||
|
{
|
||||||
|
WARN2 ("problem with formatting %s stat %s",
|
||||||
|
source==NULL ? "global" : source, name);
|
||||||
|
return;
|
||||||
|
}
|
||||||
stats_event(source, name, buf);
|
stats_event(source, name, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,56 +258,42 @@ char *stats_get_value(char *source, char *name)
|
|||||||
{
|
{
|
||||||
return(_get_stats(source, name));
|
return(_get_stats(source, name));
|
||||||
}
|
}
|
||||||
void stats_event_inc(char *source, char *name)
|
|
||||||
|
/* increase the value in the provided stat by 1 */
|
||||||
|
void stats_event_inc(const char *source, const char *name)
|
||||||
{
|
{
|
||||||
char *old_value;
|
stats_event_t *event = build_event (source, name, NULL);
|
||||||
int new_value;
|
/* DEBUG2("%s on %s", name, source==NULL?"global":source); */
|
||||||
|
if (event)
|
||||||
old_value = _get_stats(source, name);
|
{
|
||||||
if (old_value != NULL) {
|
event->action = STATS_EVENT_INC;
|
||||||
new_value = atoi(old_value);
|
queue_global_event (event);
|
||||||
free(old_value);
|
|
||||||
new_value++;
|
|
||||||
} else {
|
|
||||||
new_value = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_event_args(source, name, "%d", new_value);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void stats_event_add(char *source, char *name, unsigned long value)
|
void stats_event_add(const char *source, const char *name, unsigned long value)
|
||||||
{
|
{
|
||||||
char *old_value;
|
stats_event_t *event = build_event (source, name, NULL);
|
||||||
unsigned long new_value;
|
/* DEBUG2("%s on %s", name, source==NULL?"global":source); */
|
||||||
|
if (event)
|
||||||
old_value = _get_stats(source, name);
|
{
|
||||||
if (old_value != NULL) {
|
event->value = malloc (16);
|
||||||
new_value = atol(old_value);
|
snprintf (event->value, 16, "%ld", value);
|
||||||
free(old_value);
|
event->action = STATS_EVENT_ADD;
|
||||||
new_value += value;
|
queue_global_event (event);
|
||||||
} else {
|
|
||||||
new_value = value;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_event_args(source, name, "%ld", new_value);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void stats_event_dec(char *source, char *name)
|
/* decrease the value in the provided stat by 1 */
|
||||||
|
void stats_event_dec(const char *source, const char *name)
|
||||||
{
|
{
|
||||||
char *old_value;
|
/* DEBUG2("%s on %s", name, source==NULL?"global":source); */
|
||||||
int new_value;
|
stats_event_t *event = build_event (source, name, NULL);
|
||||||
|
if (event)
|
||||||
old_value = _get_stats(source, name);
|
{
|
||||||
if (old_value != NULL) {
|
event->action = STATS_EVENT_DEC;
|
||||||
new_value = atoi(old_value);
|
queue_global_event (event);
|
||||||
free(old_value);
|
|
||||||
new_value--;
|
|
||||||
if (new_value < 0) new_value = 0;
|
|
||||||
} else {
|
|
||||||
new_value = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_event_args(source, name, "%d", new_value);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* note: you must call this function only when you have exclusive access
|
/* note: you must call this function only when you have exclusive access
|
||||||
@ -330,11 +351,12 @@ static stats_source_t *_find_source(avl_tree *source_tree, char *source)
|
|||||||
|
|
||||||
static stats_event_t *_copy_event(stats_event_t *event)
|
static stats_event_t *_copy_event(stats_event_t *event)
|
||||||
{
|
{
|
||||||
stats_event_t *copy = (stats_event_t *)malloc(sizeof(stats_event_t));
|
stats_event_t *copy = (stats_event_t *)calloc(1, sizeof(stats_event_t));
|
||||||
if (event->source)
|
if (event->source)
|
||||||
copy->source = (char *)strdup(event->source);
|
copy->source = (char *)strdup(event->source);
|
||||||
else
|
else
|
||||||
copy->source = NULL;
|
copy->source = NULL;
|
||||||
|
if (event->name)
|
||||||
copy->name = (char *)strdup(event->name);
|
copy->name = (char *)strdup(event->name);
|
||||||
if (event->value)
|
if (event->value)
|
||||||
copy->value = (char *)strdup(event->value);
|
copy->value = (char *)strdup(event->value);
|
||||||
@ -345,103 +367,167 @@ static stats_event_t *_copy_event(stats_event_t *event)
|
|||||||
return copy;
|
return copy;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *_stats_thread(void *arg)
|
|
||||||
|
/* helper to apply specialised changes to a stats node */
|
||||||
|
static void modify_node_event (stats_node_t *node, stats_event_t *event)
|
||||||
{
|
{
|
||||||
stats_event_t *event;
|
char *str;
|
||||||
stats_event_t *copy;
|
|
||||||
stats_node_t *node;
|
|
||||||
stats_node_t *anode;
|
|
||||||
stats_source_t *snode;
|
|
||||||
stats_source_t *asnode;
|
|
||||||
event_listener_t *listener;
|
|
||||||
avl_node *avlnode;
|
|
||||||
|
|
||||||
while (_stats_running) {
|
if (event->action != STATS_EVENT_SET)
|
||||||
thread_mutex_lock(&_global_event_mutex);
|
{
|
||||||
if (_global_event_queue != NULL) {
|
int value = 0;
|
||||||
/* grab the next event from the queue */
|
|
||||||
event = _global_event_queue;
|
|
||||||
_global_event_queue = event->next;
|
|
||||||
event->next = NULL;
|
|
||||||
thread_mutex_unlock(&_global_event_mutex);
|
|
||||||
|
|
||||||
thread_mutex_lock(&_stats_mutex);
|
switch (event->action)
|
||||||
if (event->source == NULL) {
|
{
|
||||||
/* we have a global event */
|
case STATS_EVENT_INC:
|
||||||
if (event->value != NULL) {
|
value = atoi (node->value)+1;
|
||||||
/* adding/updating */
|
break;
|
||||||
node = _find_node(_stats.global_tree, event->name);
|
case STATS_EVENT_DEC:
|
||||||
if (node == NULL) {
|
value = atoi (node->value)-1;
|
||||||
/* add node */
|
break;
|
||||||
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
|
case STATS_EVENT_ADD:
|
||||||
anode->name = (char *)strdup(event->name);
|
value = atoi (node->value)+atoi (event->value);
|
||||||
anode->value = (char *)strdup(event->value);
|
break;
|
||||||
|
default:
|
||||||
avl_insert(_stats.global_tree, (void *)anode);
|
break;
|
||||||
} else {
|
|
||||||
/* update node */
|
|
||||||
free(node->value);
|
|
||||||
node->value = (char *)strdup(event->value);
|
|
||||||
}
|
}
|
||||||
|
str = malloc (16);
|
||||||
|
snprintf (str, 16, "%d", value);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
str = (char *)strdup (event->value);
|
||||||
|
free (node->value);
|
||||||
|
node->value = str;
|
||||||
|
/* DEBUG3 ("update node %s \"%s\" (%d)", node->name, node->value, event->action); */
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
|
||||||
|
static void process_global_event (stats_event_t *event)
|
||||||
|
{
|
||||||
|
stats_node_t *node;
|
||||||
|
|
||||||
|
/* DEBUG3("global event %s %s %d", event->name, event->value, event->action); */
|
||||||
|
if (event->action == STATS_EVENT_REMOVE)
|
||||||
|
{
|
||||||
/* we're deleting */
|
/* we're deleting */
|
||||||
node = _find_node(_stats.global_tree, event->name);
|
node = _find_node(_stats.global_tree, event->name);
|
||||||
if (node != NULL)
|
if (node != NULL)
|
||||||
avl_delete(_stats.global_tree, (void *)node, _free_stats);
|
avl_delete(_stats.global_tree, (void *)node, _free_stats);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
node = _find_node(_stats.global_tree, event->name);
|
||||||
/* we have a source event */
|
if (node)
|
||||||
|
{
|
||||||
snode = _find_source(_stats.source_tree, event->source);
|
modify_node_event (node, event);
|
||||||
if (snode != NULL) {
|
}
|
||||||
/* this is a source we already have a tree for */
|
else
|
||||||
if (event->value != NULL) {
|
{
|
||||||
/* we're add/updating */
|
/* add node */
|
||||||
node = _find_node(snode->stats_tree, event->name);
|
node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
|
||||||
if (node == NULL) {
|
node->name = (char *)strdup(event->name);
|
||||||
/* adding node */
|
|
||||||
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
|
|
||||||
anode->name = (char *)strdup(event->name);
|
|
||||||
anode->value = (char *)strdup(event->value);
|
|
||||||
|
|
||||||
avl_insert(snode->stats_tree, (void *)anode);
|
|
||||||
} else {
|
|
||||||
/* updating node */
|
|
||||||
free(node->value);
|
|
||||||
node->value = (char *)strdup(event->value);
|
node->value = (char *)strdup(event->value);
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* we're deleting */
|
|
||||||
node = _find_node(snode->stats_tree, event->name);
|
|
||||||
if (node != NULL) {
|
|
||||||
avl_delete(snode->stats_tree, (void *)node, _free_stats);
|
|
||||||
|
|
||||||
avlnode = avl_get_first(snode->stats_tree);
|
avl_insert(_stats.global_tree, (void *)node);
|
||||||
if (avlnode == NULL) {
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void process_source_event (stats_event_t *event)
|
||||||
|
{
|
||||||
|
stats_source_t *snode = _find_source(_stats.source_tree, event->source);
|
||||||
|
if (snode == NULL)
|
||||||
|
{
|
||||||
|
if (event->action == STATS_EVENT_REMOVE)
|
||||||
|
return;
|
||||||
|
snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
|
||||||
|
if (snode == NULL)
|
||||||
|
return;
|
||||||
|
DEBUG1 ("new source stat %s", event->source);
|
||||||
|
snode->source = (char *)strdup(event->source);
|
||||||
|
snode->stats_tree = avl_tree_new(_compare_stats, NULL);
|
||||||
|
|
||||||
|
avl_insert(_stats.source_tree, (void *)snode);
|
||||||
|
}
|
||||||
|
if (event->name)
|
||||||
|
{
|
||||||
|
stats_node_t *node = _find_node(snode->stats_tree, event->name);
|
||||||
|
if (node == NULL)
|
||||||
|
{
|
||||||
|
if (event->action == STATS_EVENT_REMOVE)
|
||||||
|
return;
|
||||||
|
/* adding node */
|
||||||
|
if (event->value)
|
||||||
|
{
|
||||||
|
DEBUG2 ("new node %s (%s)", event->name, event->value);
|
||||||
|
node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
|
||||||
|
node->name = (char *)strdup(event->name);
|
||||||
|
node->value = (char *)strdup(event->value);
|
||||||
|
|
||||||
|
avl_insert(snode->stats_tree, (void *)node);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (event->action == STATS_EVENT_REMOVE)
|
||||||
|
{
|
||||||
|
DEBUG1 ("delete node %s", event->name);
|
||||||
|
avl_delete(snode->stats_tree, (void *)node, _free_stats);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
modify_node_event (node, event);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (event->action == STATS_EVENT_REMOVE)
|
||||||
|
{
|
||||||
|
DEBUG1 ("delete source node %s", event->source);
|
||||||
avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
|
avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* this is a new source */
|
|
||||||
asnode = (stats_source_t *)malloc(sizeof(stats_source_t));
|
|
||||||
asnode->source = (char *)strdup(event->source);
|
|
||||||
asnode->stats_tree = avl_tree_new(_compare_stats, NULL);
|
|
||||||
|
|
||||||
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
|
|
||||||
anode->name = (char *)strdup(event->name);
|
|
||||||
anode->value = (char *)strdup(event->value);
|
|
||||||
|
|
||||||
avl_insert(asnode->stats_tree, (void *)anode);
|
static void *_stats_thread(void *arg)
|
||||||
|
{
|
||||||
|
stats_event_t *event;
|
||||||
|
stats_event_t *copy;
|
||||||
|
event_listener_t *listener;
|
||||||
|
|
||||||
avl_insert(_stats.source_tree, (void *)asnode);
|
stats_event (NULL, "server", ICECAST_VERSION_STRING);
|
||||||
}
|
|
||||||
}
|
/* global currently active stats */
|
||||||
|
stats_event (NULL, "clients", "0");
|
||||||
|
stats_event (NULL, "connections", "0");
|
||||||
|
stats_event (NULL, "sources", "0");
|
||||||
|
stats_event (NULL, "stats", "0");
|
||||||
|
|
||||||
|
/* global accumulating stats */
|
||||||
|
stats_event (NULL, "client_connections", "0");
|
||||||
|
stats_event (NULL, "source_client_connections", "0");
|
||||||
|
stats_event (NULL, "source_relay_connections", "0");
|
||||||
|
stats_event (NULL, "source_total_connections", "0");
|
||||||
|
stats_event (NULL, "stats_connections", "0");
|
||||||
|
|
||||||
|
INFO0 ("stats thread started");
|
||||||
|
while (_stats_running) {
|
||||||
|
if (_global_event_queue != NULL) {
|
||||||
|
/* grab the next event from the queue */
|
||||||
|
thread_mutex_lock(&_global_event_mutex);
|
||||||
|
event = (stats_event_t *)_global_event_queue;
|
||||||
|
_global_event_queue = event->next;
|
||||||
|
thread_mutex_unlock(&_global_event_mutex);
|
||||||
|
|
||||||
|
event->next = NULL;
|
||||||
|
thread_mutex_unlock(&_global_event_mutex);
|
||||||
|
|
||||||
|
thread_mutex_lock(&_stats_mutex);
|
||||||
|
|
||||||
|
/* check if we are dealing with a global or source event */
|
||||||
|
if (event->source == NULL)
|
||||||
|
process_global_event (event);
|
||||||
|
else
|
||||||
|
process_source_event (event);
|
||||||
|
|
||||||
/* now we have an event that's been processed into the running stats */
|
/* now we have an event that's been processed into the running stats */
|
||||||
/* this event should get copied to event listeners' queues */
|
/* this event should get copied to event listeners' queues */
|
||||||
listener = _event_listeners;
|
listener = (event_listener_t *)_event_listeners;
|
||||||
while (listener) {
|
while (listener) {
|
||||||
copy = _copy_event(event);
|
copy = _copy_event(event);
|
||||||
thread_mutex_lock(listener->mutex);
|
thread_mutex_lock(listener->mutex);
|
||||||
@ -450,23 +536,17 @@ static void *_stats_thread(void *arg)
|
|||||||
|
|
||||||
listener = listener->next;
|
listener = listener->next;
|
||||||
}
|
}
|
||||||
thread_cond_broadcast(&_event_signal_cond);
|
|
||||||
|
|
||||||
/* now we need to destroy the event */
|
/* now we need to destroy the event */
|
||||||
_free_event(event);
|
_free_event(event);
|
||||||
|
|
||||||
thread_mutex_unlock(&_stats_mutex);
|
thread_mutex_unlock(&_stats_mutex);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
|
||||||
thread_mutex_unlock(&_global_event_mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_sleep(300000);
|
thread_sleep(300000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* wake the other threads so they can shut down cleanly */
|
|
||||||
thread_cond_broadcast(&_event_signal_cond);
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -483,7 +563,7 @@ static void _register_listener(stats_event_t **queue, mutex_t *mutex)
|
|||||||
if (_event_listeners == NULL) {
|
if (_event_listeners == NULL) {
|
||||||
_event_listeners = evli;
|
_event_listeners = evli;
|
||||||
} else {
|
} else {
|
||||||
node = _event_listeners;
|
node = (event_listener_t *)_event_listeners;
|
||||||
while (node->next) node = node->next;
|
while (node->next) node = node->next;
|
||||||
node->next = evli;
|
node->next = evli;
|
||||||
}
|
}
|
||||||
@ -499,6 +579,7 @@ static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
|
|||||||
event->source = NULL;
|
event->source = NULL;
|
||||||
event->name = (char *)strdup(node->name);
|
event->name = (char *)strdup(node->name);
|
||||||
event->value = (char *)strdup(node->value);
|
event->value = (char *)strdup(node->value);
|
||||||
|
event->action = STATS_EVENT_SET;
|
||||||
event->next = NULL;
|
event->next = NULL;
|
||||||
|
|
||||||
return event;
|
return event;
|
||||||
@ -535,7 +616,10 @@ static int _send_event_to_client(stats_event_t *event, connection_t *con)
|
|||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
/* send data to the client!!!! */
|
/* send data to the client!!!! */
|
||||||
ret = sock_write(con->sock, "EVENT %s %s %s\n", (event->source != NULL) ? event->source : "global", event->name, event->value ? event->value : "null");
|
ret = sock_write(con->sock, "EVENT %s %s %s\n",
|
||||||
|
(event->source != NULL) ? event->source : "global",
|
||||||
|
event->name ? event->name : "null",
|
||||||
|
event->value ? event->value : "null");
|
||||||
|
|
||||||
return (ret == -1) ? 0 : 1;
|
return (ret == -1) ? 0 : 1;
|
||||||
}
|
}
|
||||||
@ -649,7 +733,7 @@ void *stats_connection(void *arg)
|
|||||||
_free_event(event);
|
_free_event(event);
|
||||||
} else {
|
} else {
|
||||||
thread_mutex_unlock(&local_event_mutex);
|
thread_mutex_unlock(&local_event_mutex);
|
||||||
thread_cond_wait(&_event_signal_cond);
|
thread_sleep (500000);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -806,8 +890,7 @@ void stats_sendxml(client_t *client)
|
|||||||
if (bytes > 0) client->con->sent_bytes += bytes;
|
if (bytes > 0) client->con->sent_bytes += bytes;
|
||||||
else goto send_error;
|
else goto send_error;
|
||||||
|
|
||||||
bytes = sock_write_bytes(client->con->sock, buff, len);
|
bytes = client_send_bytes (client, buff, (unsigned)len);
|
||||||
if (bytes > 0) client->con->sent_bytes += bytes;
|
|
||||||
|
|
||||||
send_error:
|
send_error:
|
||||||
while (src_nodes) {
|
while (src_nodes) {
|
||||||
|
11
src/stats.h
11
src/stats.h
@ -38,6 +38,7 @@ typedef struct _stats_event_tag
|
|||||||
char *source;
|
char *source;
|
||||||
char *name;
|
char *name;
|
||||||
char *value;
|
char *value;
|
||||||
|
int action;
|
||||||
|
|
||||||
struct _stats_event_tag *next;
|
struct _stats_event_tag *next;
|
||||||
} stats_event_t;
|
} stats_event_t;
|
||||||
@ -77,11 +78,11 @@ void stats_shutdown();
|
|||||||
|
|
||||||
stats_t *stats_get_stats();
|
stats_t *stats_get_stats();
|
||||||
|
|
||||||
void stats_event(char *source, char *name, char *value);
|
void stats_event(const char *source, const char *name, const char *value);
|
||||||
void stats_event_args(char *source, char *name, char *format, ...);
|
void stats_event_args(const char *source, char *name, char *format, ...);
|
||||||
void stats_event_inc(char *source, char *name);
|
void stats_event_inc(const char *source, const char *name);
|
||||||
void stats_event_add(char *source, char *name, unsigned long value);
|
void stats_event_add(const char *source, const char *name, unsigned long value);
|
||||||
void stats_event_dec(char *source, char *name);
|
void stats_event_dec(const char *source, const char *name);
|
||||||
|
|
||||||
void *stats_connection(void *arg);
|
void *stats_connection(void *arg);
|
||||||
void *stats_callback(void *arg);
|
void *stats_callback(void *arg);
|
||||||
|
Loading…
Reference in New Issue
Block a user