1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-23 06:25:24 +00:00

kh18 update. The important/visible stuff

. memory corruption fix in stats/threading.
. fix for the excessive page flush bug leading to higher bandwidth than expected
with vorbis streams from certain source clients.
. avoid the ::ffff: part from IPv6 within an IP.

The other main change is the YP thread which is started from a dummy client
handle instead of the slave thread. it should now only startup from the worker
if there is work scheduled. Whether the slave thread can reduce wakeups would
need to be investigated.

command auth was updated but the performance results are similar to URL auth.
Other some internal code cleanups are also done. stats seems to be working much
better now.

karl.


svn path=/icecast/branches/kh/icecast/; revision=16691
This commit is contained in:
Karl Heyes 2009-11-05 05:32:02 +00:00
parent ad21bb0b04
commit 9fea71ff77
22 changed files with 636 additions and 281 deletions

17
NEWS
View File

@ -16,6 +16,23 @@ Feature differences from SVN trunk
any extra tags are show in the conf/icecast.xml.dist file
2.3.2-kh18
. for people who use ICE_MUTEX_ABORT (environment var) to trap for long held
locks, change the name to ICE_LOCK_ABORT
. fix possible memory corruption case in stats and threading
. fix vorbis stream bandwidth increase when certain source clients connect
. avoid any ::ffff: in IPs
. YP thread now started from worker as required instead of once per second
. shutdown order and other internal code cleanups
. use signalfd if available for processing signals in connection thread, saves
waking up the thread.
. make sure the auth mountpoint returned is used instead of the listener
supplied one. Allows for redirecting listeners to alternative mounts
. update command auth, works in a similar way to url auth now.
. stats clients work better under much heavier load. stats notifications
are grouped together if possible before sending, this reduces writes in the
many streams case
2.3.2-kh17
. fix possible memory corruption when using multiple workers
. a few minor timing changes, nothing major.

View File

@ -127,7 +127,7 @@
<max-listeners>1</max-listeners>
<max-bandwidth>1000k</max-bandwidth>
<file-seekable>0</file-seekable>
<dump-file>/tmp/dump-example1.ogg</dump-file>
<dump-file>/backup/live-%d-%b.ogg</dump-file>
<burst-size>65536</burst-size>
<fallback-mount>/example2.ogg</fallback-mount>
<fallback-override>1</fallback-override>
@ -150,15 +150,13 @@
<mount>
....
<authentication type="command">
<option name="filename" value="auth_verify"/>
<option name="listener_add" value="auth_verify"/>
</authentication>
or
for url auth, the add url needs to return a "icecast-auth-user: 1" http
header for a user to authenicate. Both urls are sent params via POST,
add is sent id, mount, user, pass, ip, useragent
remove is passed id, mount, user, pass, duration
for url auth, the add url needs to return a "icecast-auth-user:" http
header for a user to authenicate. URLs are sent params via POST.
<authentication type="url">
<option name="username" value="admin"/>

View File

@ -95,7 +95,7 @@
#define PACKAGE_NAME "Icecast"
/* Version number of package */
#define VERSION "2.3.2-kh17"
#define VERSION "2.3.2-kh18"
/* Define to the version of this package. */
#define PACKAGE_VERSION VERSION

View File

@ -1,4 +1,4 @@
AC_INIT([Icecast], [2.3.2-kh17], [karl@xiph.org])
AC_INIT([Icecast], [2.3.2-kh18], [karl@xiph.org])
AC_PREREQ(2.59)
AC_CONFIG_SRCDIR(src/main.c)
@ -33,7 +33,7 @@ dnl Checks for header files.
AC_HEADER_STDC
AC_HEADER_TIME
AC_CHECK_HEADERS([alloca.h fnmatch.h limits.h sys/timeb.h])
AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h])
AC_CHECK_HEADERS(pwd.h, AC_DEFINE(CHUID, 1, [Define if you have pwd.h]),,)
AC_CHECK_HEADERS(unistd.h, AC_DEFINE(CHROOT, 1, [Define if you have unistd.h]),,)
@ -46,6 +46,9 @@ AC_CHECK_TYPES([struct timespec])
dnl Checks for library functions.
AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync])
AC_CHECK_TYPES([struct signalfd_siginfo],
[AC_DEFINE(HAVE_SIGNALFD, 1 ,[Define if signalfd exists])], [],
[#include <sys/signalfd.h>])
AC_SEARCH_LIBS(nanosleep, rt posix4,
AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep]))
AC_SEARCH_LIBS(clock_gettime, rt posix4,

View File

@ -628,7 +628,6 @@ static void add_listener_node (xmlNodePtr srcnode, client_t *listener)
{
const char *useragent;
char buf[30];
source_t *source = listener->shared_data;
xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
@ -645,8 +644,11 @@ static void add_listener_node (xmlNodePtr srcnode, client_t *listener)
xmlFree (str);
}
if (listener->flags & CLIENT_ACTIVE)
if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE)
{
source_t *source = listener->shared_data;
snprintf (buf, sizeof (buf), "%ld", (long)(source->client->queue_pos - listener->queue_pos));
}
else
snprintf (buf, sizeof (buf), "0");
xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));

View File

@ -48,6 +48,8 @@ struct _auth_thread_t
};
static volatile int thread_id;
static rwlock_t auth_lock;
int allow_auth;
static void *auth_run_thread (void *arg);
static int auth_postprocess_listener (auth_client *auth_user);
@ -308,9 +310,10 @@ static void *auth_run_thread (void *arg)
auth_thread_t *handler = arg;
auth_t *auth = handler->auth;
INFO2 ("Authentication thread %d started for %s", handler->id, auth->mount);
DEBUG2 ("Authentication thread %d started for %s", handler->id, auth->mount);
thread_rwlock_rlock (&auth_lock);
while (auth->running)
while (1)
{
/* usually no clients are waiting, so don't bother taking locks */
if (auth->head)
@ -337,7 +340,7 @@ static void *auth_run_thread (void *arg)
auth_user->thread_data = handler->data;
auth_user->handler = handler->id;
if (auth_user->process)
if (allow_auth && auth_user->process)
{
worker_t *worker = NULL;
if (auth_user->client)
@ -351,8 +354,6 @@ static void *auth_run_thread (void *arg)
thread_mutex_unlock (&worker->lock);
}
}
else
ERROR0 ("client auth process not set");
auth_client_free (auth_user);
@ -360,8 +361,9 @@ static void *auth_run_thread (void *arg)
}
break;
}
INFO1 ("Authenication thread %d shutting down", handler->id);
DEBUG1 ("Authenication thread %d shutting down", handler->id);
handler->thread = NULL;
thread_rwlock_unlock (&auth_lock);
return NULL;
}
@ -471,13 +473,12 @@ static int auth_postprocess_listener (auth_client *auth_user)
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
/* auth failed so do we place the listener elsewhere */
if (auth_user->rejected_mount)
mount = auth_user->rejected_mount;
else if (auth->rejected_mount)
if (auth->rejected_mount)
mount = auth->rejected_mount;
else
{
client->flags |= CLIENT_ACTIVE;
client_send_401 (client, auth_user->auth->realm);
auth_user->client = NULL;
return -1;
}
}
@ -540,7 +541,7 @@ void auth_add_listener (const char *mount, client_t *client)
{
auth_client *auth_user;
if (mountinfo->auth->pending_count > 1000)
if (mountinfo->auth->running == 0 || mountinfo->auth->pending_count > 1000)
{
config_release_config ();
WARN0 ("too many clients awaiting authentication");
@ -550,7 +551,7 @@ void auth_add_listener (const char *mount, client_t *client)
auth_user = auth_client_setup (mount, client);
auth_user->process = auth_new_listener;
client->flags &= ~CLIENT_ACTIVE;
INFO0 ("adding client for authentication");
DEBUG0 ("adding client for authentication");
queue_auth_client (auth_user, mountinfo);
}
else
@ -816,10 +817,14 @@ int auth_check_source (client_t *client, const char *mount)
void auth_initialise (void)
{
thread_id = 0;
allow_auth = 1;
}
void auth_shutdown (void)
{
INFO0 ("Auth shutdown");
allow_auth = 0;
thread_rwlock_wlock (&auth_lock);
thread_rwlock_unlock (&auth_lock);
INFO0 ("Auth shutdown complete");
}

View File

@ -46,7 +46,6 @@ typedef struct auth_client_tag
int handler;
client_t *client;
struct auth_tag *auth;
char *rejected_mount;
void *thread_data;
void (*process)(struct auth_client_tag *auth_user);
struct auth_client_tag *next;

View File

@ -32,6 +32,12 @@
#ifndef WIN32
#include <sys/wait.h>
#endif
#ifdef HAVE_POLL
#include <poll.h>
#endif
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif
#include "auth.h"
#include "source.h"
@ -43,62 +49,220 @@
#define CATMODULE "auth_cmd"
typedef struct {
char *filename;
char *listener_add;
char *listener_remove;
} auth_cmd;
static void cmd_clear(auth_t *self)
{
auth_cmd *cmd = self->state;
free(cmd->filename);
free (cmd->listener_add);
free (cmd->listener_remove);
free(cmd);
}
static void process_header (const char *p, auth_client *auth_user)
{
client_t *client = auth_user->client;
if (strncasecmp (p, "Mountpoint: ",12) == 0)
{
char *new_mount = strdup (p+12);
if (new_mount)
{
free (auth_user->mount);
auth_user->mount = new_mount;
}
return;
}
if (strncasecmp (p, "icecast-auth-user: ", 19) == 0)
{
if (strcmp (p+19, "withintro") == 0)
client->flags |= CLIENT_AUTHENTICATED|CLIENT_HAS_INTRO_CONTENT;
else if (strcmp (p+19, "1") == 0)
client->flags |= CLIENT_AUTHENTICATED;
return;
}
if (strncasecmp (p, "icecast-auth-timelimit: ", 24) == 0)
{
unsigned limit;
sscanf (p+24, "%u", &limit);
client->connection.discon_time = time(NULL) + limit;
}
}
static void process_body (int fd, pid_t pid, auth_client *auth_user)
{
client_t *client = auth_user->client;
if (client->flags & CLIENT_HAS_INTRO_CONTENT)
{
refbuf_t *head = client->refbuf, *r = head->next;
client_t *client = auth_user->client;
head->next = NULL;
DEBUG0 ("Have intro content from command");
while (1)
{
int ret;
unsigned remaining = 4096 - r->len;
char *buf = r->data + r->len;
#if HAVE_POLL
struct pollfd response;
response.fd = fd;
response.events = POLLIN;
response.revents = 0;
ret = poll (&response, 1, 1000);
if (ret == 0)
{
kill (pid, SIGTERM);
WARN1 ("command timeout triggered for %s", auth_user->mount);
return;
}
if (ret < 0)
continue;
#endif
ret = read (fd, buf, remaining);
if (ret > 0)
{
r->len += ret;
if (r->len == 4096)
{
head->next = r;
head = r;
r = refbuf_new (4096);
r->len = 0;
}
continue;
}
break;
}
if (r->len)
head->next = r;
else
refbuf_release (r);
if (client->refbuf->next == NULL)
client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
}
}
static void get_response (int fd, auth_client *auth_user, pid_t pid)
{
client_t *client = auth_user->client;
refbuf_t *r = client->refbuf;
char *buf = r->data, *blankline;
unsigned remaining = 4095; /* leave a nul char at least */
int ret;
memset (r->data, 0, remaining+1);
while (remaining)
{
#if HAVE_POLL
struct pollfd response;
response.fd = fd;
response.events = POLLIN;
response.revents = 0;
ret = poll (&response, 1, 1000);
if (ret == 0)
{
kill (pid, SIGTERM);
WARN1 ("command timeout triggered for %s", auth_user->mount);
return;
}
if (ret < 0)
continue;
#endif
ret = read (fd, buf, remaining);
if (ret <= 0)
break;
blankline = strstr (r->data, "\n\n");
if (blankline)
{
char *p = r->data;
do {
char *nl = strchr (p, '\n');
*nl = '\0';
process_header (p, auth_user);
p = nl+1;
} while (*p != '\n');
if (client->flags & CLIENT_HAS_INTRO_CONTENT)
{
r->len = (buf+ret) - (blankline + 2);
if (r->len)
memmove (r->data, blankline+2, r->len);
client->refbuf = refbuf_new (4096);
client->refbuf->next = r;
}
process_body (fd, pid, auth_user);
return;
}
buf += ret;
remaining -= ret;
}
return;
}
static auth_result auth_cmd_client (auth_client *auth_user)
{
int fd[2];
int infd[2], outfd[2];
pid_t pid;
client_t *client = auth_user->client;
auth_t *auth = auth_user->auth;
auth_cmd *cmd = auth->state;
int status, len;
const char *qargs;
char str[512];
if (client->username == NULL || client->password == NULL)
if (auth->running == 0)
return AUTH_FAILED;
if (pipe (fd) == 0)
if (pipe (infd) < 0 || pipe (outfd) < 0)
{
pid = fork();
switch (pid)
{
case 0: /* child */
dup2 (fd[0], fileno(stdin));
close (fd[0]);
close (fd[1]);
execl (cmd->filename, cmd->filename, NULL);
exit (EXIT_FAILURE);
case -1:
ERROR1 ("pipe failed code %d", errno);
return AUTH_FAILED;
}
pid = fork();
switch (pid)
{
case 0: /* child */
dup2 (outfd[0], 0);
dup2 (infd[1], 1);
close (outfd[0]);
close (infd[1]);
if (execl (cmd->listener_add, cmd->listener_add, NULL) < 0)
ERROR1 ("unable to exec command \"%s\"", cmd->listener_add);
exit (-1);
case -1:
break;
default: /* parent */
close (outfd[0]);
close (infd[1]);
qargs = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
len = snprintf (str, sizeof(str),
"Mountpoint: %s%s\n"
"User: %s\n"
"Pass: %s\n"
"IP: %s\n"
"Agent: %s\n\n"
, auth_user->mount, qargs ? qargs : "",
client->username ? client->username : "",
client->password ? client->password : "",
client->connection.ip,
httpp_getvar (client->parser, "user-agent"));
write (outfd[1], str, len);
close (outfd[1]);
get_response (infd[0], auth_user, pid);
close (infd[0]);
DEBUG1 ("Waiting on pid %ld", (long)pid);
if (waitpid (pid, &status, 0) < 0)
{
DEBUG1("waitpid error %s", strerror(errno));
break;
default: /* parent */
close (fd[0]);
len = snprintf (str, sizeof(str), "%s\n%s\n%s\n",
auth_user->mount, client->username, client->password);
write (fd[1], str, len);
close (fd[1]);
DEBUG1 ("Waiting on pid %ld", (long)pid);
if (waitpid (pid, &status, 0) < 0)
{
DEBUG1("waitpid error %s", strerror(errno));
break;
}
if (WIFEXITED (status))
{
DEBUG1("command exited normally with %d", WEXITSTATUS (status));
if (WEXITSTATUS(status) == 0)
return AUTH_OK;
}
break;
}
}
if (client->flags & CLIENT_AUTHENTICATED)
return AUTH_OK;
}
return AUTH_FAILED;
}
@ -131,11 +295,13 @@ int auth_get_cmd_auth (auth_t *authenticator, config_options_t *options)
state = calloc(1, sizeof(auth_cmd));
while(options) {
if(!strcmp(options->name, "filename"))
state->filename = strdup(options->value);
if (strcmp (options->name, "listener_add") == 0)
state->listener_add = strdup (options->value);
if (strcmp (options->name, "listener_remove") == 0)
state->listener_remove = strdup (options->value);
options = options->next;
}
if (state->filename == NULL)
if (state->listener_add == NULL)
{
ERROR0 ("No command specified for authentication");
return -1;

View File

@ -222,8 +222,13 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s
if (strncasecmp (ptr, "Mountpoint: ", 12) == 0)
{
int len = strcspn ((char*)ptr+12, "\r\n");
auth_user->rejected_mount = malloc (len+1);
snprintf (auth_user->rejected_mount, len+1, "%s", (char *)ptr+12);
char *mount = malloc (len+1);
if (mount)
{
snprintf (mount, len+1, "%s", (char *)ptr+12);
free (auth_user->mount);
auth_user->mount = mount;
}
}
}

View File

@ -141,7 +141,7 @@ void workers_adjust (int new_count);
#define CLIENT_ACTIVE (001)
#define CLIENT_AUTHENTICATED (002)
#define CLIENT_IS_SLAVE (004)
#define CLIENT_IN_FSERVE (010)
#define CLIENT_NO_CONTENT_LENGTH (020)
#define CLIENT_HAS_INTRO_CONTENT (040)
#define CLIENT_FORMAT_BIT (01000)

View File

@ -33,6 +33,10 @@
#include <netinet/in.h>
#include <netdb.h>
#endif
#ifdef HAVE_SIGNALFD
#include <sys/signalfd.h>
#include <signal.h>
#endif
#include "compat.h"
@ -98,6 +102,7 @@ static time_t now;
static spin_t _connection_lock;
static volatile unsigned long _current_id = 0;
thread_type *conn_tid;
int sigfd;
static int ssl_ok;
#ifdef HAVE_OPENSSL
@ -436,7 +441,10 @@ int connection_init (connection_t *con, sock_t sock)
#ifdef HAVE_GETNAMEINFO
char buffer [200] = "unknown";
getnameinfo ((struct sockaddr *)&sa, slen, buffer, 200, NULL, 0, NI_NUMERICHOST);
ip = strdup (buffer);
if (strncmp (buffer, "::ffff:", 7) == 0)
ip = strdup (buffer+7);
else
ip = strdup (buffer);
#else
int len = 30;
ip = malloc (len);
@ -466,10 +474,10 @@ void connection_uses_ssl (connection_t *con)
#endif
}
static sock_t wait_for_serversock(int timeout)
static sock_t wait_for_serversock (void)
{
#ifdef HAVE_POLL
struct pollfd ufds [global.server_sockets];
struct pollfd ufds [global.server_sockets + 1];
int i, ret;
for(i=0; i < global.server_sockets; i++) {
@ -477,16 +485,45 @@ static sock_t wait_for_serversock(int timeout)
ufds[i].events = POLLIN;
ufds[i].revents = 0;
}
#ifdef HAVE_SIGNALFD
ufds[i].fd = sigfd;
ufds[i].events = POLLIN;
ufds[i].revents = 0;
ret = poll(ufds, i+1, -1);
#else
ret = poll(ufds, global.server_sockets, 333);
#endif
ret = poll(ufds, global.server_sockets, timeout);
if(ret < 0) {
if (ret <= 0)
return SOCK_ERROR;
}
else if(ret == 0) {
return SOCK_ERROR;
}
else {
int dst;
#ifdef HAVE_SIGNALFD
if (ufds[i].revents & POLLIN)
{
struct signalfd_siginfo fdsi;
int ret = read (sigfd, &fdsi, sizeof(struct signalfd_siginfo));
if (ret == sizeof(struct signalfd_siginfo))
{
switch (fdsi.ssi_signo)
{
case SIGINT:
case SIGTERM:
global.running = ICE_HALTING;
connection_running = 0;
DEBUG0 ("signalfd received a termination");
break;
case SIGHUP:
global.schedule_config_reread = 1;
connection_running = 0;
INFO0 ("HUP received, reread scheduled");
break;
default:
WARN1 ("unexpected signal (%d)", fdsi.ssi_signo);
}
}
}
#endif
for(i=0; i < global.server_sockets; i++) {
if(ufds[i].revents & POLLIN)
return ufds[i].fd;
@ -526,13 +563,10 @@ static sock_t wait_for_serversock(int timeout)
max = global.serversock[i];
}
if(timeout >= 0) {
tv.tv_sec = timeout/1000;
tv.tv_usec = (timeout % 1000) * 1000;
p = &tv;
}
tv.tv_sec = 0;
tv.tv_usec = 333000;
ret = select(max+1, &rfds, NULL, NULL, p);
ret = select(max+1, &rfds, NULL, NULL, &tv);
if(ret < 0) {
return SOCK_ERROR;
}
@ -550,10 +584,10 @@ static sock_t wait_for_serversock(int timeout)
}
static client_t *accept_client (int duration)
static client_t *accept_client (void)
{
client_t *client;
sock_t sock, serversock = wait_for_serversock (duration);
sock_t sock, serversock = wait_for_serversock ();
if (serversock == SOCK_ERROR)
return NULL;
@ -701,7 +735,7 @@ static int http_client_request (client_t *client)
{
fbinfo fb;
fb.mount = "/flashpolicy";
fb.flags = FS_NORMAL|FS_USE_ADMIN;
fb.flags = FS_USE_ADMIN;
fb.fallback = NULL;
fb.limit = 0;
client->respcode = 200;
@ -804,6 +838,14 @@ static void *connection_thread (void *arg)
{
ice_config_t *config;
#ifdef HAVE_SIGNALFD
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGTERM);
sigfd = signalfd(-1, &mask, 0);
#endif
connection_running = 1;
INFO0 ("connection thread started");
@ -816,7 +858,7 @@ static void *connection_thread (void *arg)
while (connection_running)
{
client_t *client = accept_client (333);
client_t *client = accept_client ();
if (client)
{
/* do a small delay here so the client has chance to send the request after
@ -837,6 +879,11 @@ static void *connection_thread (void *arg)
void connection_thread_startup ()
{
#ifdef HAVE_SIGNALFD
sigset_t mask;
sigfillset(&mask);
pthread_sigmask (SIG_SETMASK, &mask, NULL);
#endif
connection_running = 0;
while (conn_tid)
thread_sleep (100001);

View File

@ -261,11 +261,9 @@ static int process_vorbis_audio (ogg_state_t *ogg_info, ogg_codec_t *codec)
/* check for short values on first initial page */
if (packet . packetno == 4)
{
source_vorbis->initial_page_granulepos = codec->os.granulepos;
if (source_vorbis->initial_page_granulepos < source_vorbis->granulepos)
{
source_vorbis->granulepos -= source_vorbis->initial_page_granulepos;
source_vorbis->samples_in_page = source_vorbis->page_samples_trigger;
}
}
/* check for long values on first page */
if (packet.granulepos == source_vorbis->initial_page_granulepos)

View File

@ -292,7 +292,7 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
{
char *contenttype = fserve_content_type (fullpath);
stats_event_hidden (finfo->mount, "file", fullpath, STATS_HIDDEN);
stats_event_flags (finfo->mount, "file", fullpath, STATS_HIDDEN);
fh->format = calloc (1, sizeof (format_plugin_t));
fh->format->type = format_get_type (contenttype);
free (contenttype);
@ -318,7 +318,7 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
if (client)
{
if (finfo->mount && (finfo->flags & FS_FALLBACK))
stats_event_hidden (fh->finfo.mount, "listeners", "1", STATS_HIDDEN);
stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_HIDDEN);
fh->clients = client;
client->next = NULL;
}
@ -546,7 +546,7 @@ int fserve_client_create (client_t *httpclient, const char *path)
return -1;
}
finfo.flags = FS_NORMAL;
finfo.flags = 0;
finfo.mount = (char *)path;
finfo.fallback = NULL;
finfo.limit = 0;
@ -609,7 +609,7 @@ static void file_release (client_t *client)
if (fh)
{
thread_mutex_lock (&fh->lock);
if (fh->finfo.flags & (FS_FALLBACK|FS_JINGLE))
if (fh->finfo.flags & FS_FALLBACK)
stats_event_dec (NULL, "listeners");
remove_from_fh (fh, client);
fh_release (fh);
@ -909,7 +909,7 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
void fserve_setup_client (client_t *client, const char *mount)
{
fbinfo finfo;
finfo.flags = FS_NORMAL;
finfo.flags = 0;
finfo.mount = (char *)mount;
finfo.fallback = NULL;
finfo.limit = 0;
@ -1033,7 +1033,7 @@ void fserve_kill_client (client_t *client, const char *mount, int response)
const char *idtext, *v = "0";
char buf[50];
finfo.flags = FS_NORMAL;
finfo.flags = 0;
finfo.mount = (char*)mount;
finfo.limit = 0;
finfo.fallback = NULL;
@ -1092,7 +1092,7 @@ void fserve_list_clients (client_t *client, const char *mount, int response, int
xmlNodePtr node, srcnode;
char buf[100];
finfo.flags = FS_NORMAL;
finfo.flags = 0;
if (type && strcmp (type, "fallback") == 0)
{
finfo.flags = FS_FALLBACK;

View File

@ -26,10 +26,9 @@ typedef struct _fbinfo
char *fallback;
} fbinfo;
#define FS_NORMAL 01
#define FS_USE_ADMIN 01
#define FS_FALLBACK 02
#define FS_USE_ADMIN 04
#define FS_JINGLE 010
#define FS_FALLBACK_EOF 04
void fserve_initialize(void);
void fserve_shutdown(void);

View File

@ -58,7 +58,6 @@
#include "logging.h"
#include "xslt.h"
#include "fserve.h"
#include "yp.h"
#include "auth.h"
#include <libxml/xmlmemory.h>
@ -118,13 +117,12 @@ void _initialize_subsystems(void)
void _shutdown_subsystems(void)
{
fserve_shutdown();
slave_shutdown();
connection_shutdown();
auth_shutdown();
yp_shutdown();
slave_shutdown();
fserve_shutdown();
stats_shutdown();
connection_shutdown();
config_shutdown();
refbuf_shutdown();
resolver_shutdown();
@ -224,6 +222,7 @@ static int _start_logging(void)
_fatal_error(buf);
}
log_set_level(errorlog, config->error_log.level);
thread_use_log_id (errorlog);
if(strcmp(config->access_log.name, "-")) {
snprintf(fn_access, FILENAME_MAX, "%s%s%s", config->log_dir, PATH_SEPARATOR, config->access_log.name);
@ -484,9 +483,6 @@ int main(int argc, char **argv)
/* let her rip */
global.running = ICE_RUNNING;
/* Startup yp thread */
yp_initialize();
/* Do this after logging init */
auth_initialise ();

View File

@ -59,6 +59,7 @@
#include "source.h"
#include "format.h"
#include "event.h"
#include "yp.h"
#define CATMODULE "slave"
@ -226,10 +227,12 @@ void slave_initialize(void)
void slave_shutdown(void)
{
yp_stop();
workers_adjust (0);
thread_rwlock_destroy (&slaves_lock);
thread_rwlock_destroy (&workers_lock);
thread_spin_destroy (&relay_start_lock);
yp_shutdown();
}
@ -595,7 +598,7 @@ static void check_relay_stream (relay_server *relay)
}
relay->source = source;
INFO1("Adding new relay at mountpoint \"%s\"", relay->localmount);
stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
}
if (source->client == NULL)
{
@ -1049,6 +1052,7 @@ static void slave_startup (void)
update_master_as_slave (config);
stats_global (config);
workers_adjust (config->workers_count);
yp_initialize (config);
config_release_config();
source_recheck_mounts (1);
@ -1115,8 +1119,6 @@ static void _slave_thread(void)
restart_connection_thread = 0;
}
}
/* trigger any YP processing */
yp_thread_startup();
stats_global_calc();
thread_sleep (1000000);
}
@ -1127,8 +1129,6 @@ static void _slave_thread(void)
global.relays = NULL;
global.master_relays = NULL;
redirector_clearall();
/* send any removals to the YP servers */
yp_thread_startup();
thread_rwlock_wlock (&global.shutdown_lock);
thread_rwlock_unlock (&global.shutdown_lock);

View File

@ -915,14 +915,14 @@ void source_init (source_t *source)
/* start off the statistics */
stats_event_inc (NULL, "source_total_connections");
stats_event_hidden (source->mount, "slow_listeners", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "slow_listeners", "0", STATS_COUNTERS);
stats_event (source->mount, "server_type", source->format->contenttype);
stats_event_hidden (source->mount, "listener_peak", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "listener_peak", "0", STATS_COUNTERS);
stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
stats_event_time (source->mount, "stream_start");
stats_event_hidden (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
stats_event_hidden (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
stats_event_hidden (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
stats_event (source->mount, "source_ip", source->client->connection.ip);
source->last_read = time(NULL);
@ -1333,21 +1333,21 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy
DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full);
DEBUG1 ("max listeners to %d", mountinfo->max_listeners);
stats_event_args (source->mount, "max_listeners", "%d", mountinfo->max_listeners);
stats_event_hidden (source->mount, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
stats_event_flags (source->mount, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
if (mountinfo->hidden)
{
stats_event_hidden (source->mount, NULL, NULL, STATS_HIDDEN);
stats_event_flags (source->mount, NULL, NULL, STATS_HIDDEN);
DEBUG0 ("hidden from public");
}
else
stats_event_hidden (source->mount, NULL, NULL, 0);
stats_event_flags (source->mount, NULL, NULL, 0);
}
else
{
DEBUG0 ("max listeners is not specified");
stats_event (source->mount, "max_listeners", "unlimited");
stats_event_hidden (source->mount, "cluster_password", NULL, STATS_SLAVE);
stats_event_hidden (source->mount, NULL, NULL, STATS_PUBLIC);
stats_event_flags (source->mount, "cluster_password", NULL, STATS_SLAVE);
stats_event_flags (source->mount, NULL, NULL, STATS_PUBLIC);
}
DEBUG1 ("public set to %d", source->yp_public);
DEBUG1 ("queue size to %u", source->queue_size_limit);
@ -1375,7 +1375,7 @@ static int source_client_callback (client_t *client)
if (agent)
stats_event (source->mount, "user_agent", agent);
stats_event_inc(NULL, "source_client_connections");
stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
source_init (source);
client->ops = &source_client_ops;
@ -1462,7 +1462,7 @@ void source_recheck_mounts (int update_all)
}
else if (update_all)
{
stats_event_hidden (mount->mountname, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
stats_event_flags (mount->mountname, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s",
config->hostname, config->port, mount->mountname);
stats_event (mount->mountname, "listeners", "0");

View File

@ -47,9 +47,7 @@
#endif
#define VAL_BUFSIZE 20
#define STATS_BLOCK_NORMAL 01
#define STATS_LARGE CLIENT_FORMAT_BIT
#define STATS_BLOCK_CONNECTION 01
#define STATS_EVENT_SET 0
#define STATS_EVENT_INC 1
@ -63,7 +61,7 @@ typedef struct _stats_node_tag
{
char *name;
char *value;
int hidden;
int flags;
} stats_node_t;
typedef struct _stats_event_tag
@ -71,7 +69,7 @@ typedef struct _stats_event_tag
char *source;
char *name;
char *value;
int hidden;
int flags;
int action;
struct _stats_event_tag *next;
@ -80,18 +78,19 @@ typedef struct _stats_event_tag
typedef struct _stats_source_tag
{
char *source;
int hidden;
int flags;
avl_tree *stats_tree;
} stats_source_t;
typedef struct _event_listener_tag
{
int hidden_level;
int mask;
unsigned int content_len;
char *source;
/* queue for unwritten stats to stats clients */
refbuf_t **queue_recent_p;
unsigned int content_len;
refbuf_t *recent_block;
client_t *client;
struct _event_listener_tag *next;
} event_listener_t;
@ -120,7 +119,7 @@ static int _free_source_stats(void *key);
static stats_node_t *_find_node(avl_tree *tree, const char *name);
static stats_source_t *_find_source(avl_tree *tree, const char *source);
static void process_event (stats_event_t *event);
static void _add_stats_to_stats_client (event_listener_t *listener, const char *fmt, va_list ap);
static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap);
static void stats_listener_send (int flags, const char *fmt, ...);
static void process_event_unlocked (stats_event_t *event);
@ -131,8 +130,8 @@ static void build_event (stats_event_t *event, const char *source, const char *n
event->source = (char *)source;
event->name = (char *)name;
event->value = (char *)value;
event->hidden = STATS_PUBLIC;
if (source) event->hidden |= STATS_SLAVE;
event->flags = STATS_PUBLIC;
if (source) event->flags |= STATS_SLAVE;
if (value)
event->action = STATS_EVENT_SET;
else
@ -160,22 +159,22 @@ void stats_initialize(void)
stats_event_time (NULL, "server_start");
/* global currently active stats */
stats_event_hidden (NULL, "clients", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "sources", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "stats", "0", STATS_COUNTERS);
stats_event_flags (NULL, "clients", "0", STATS_COUNTERS);
stats_event_flags (NULL, "connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "sources", "0", STATS_COUNTERS);
stats_event_flags (NULL, "stats", "0", STATS_COUNTERS);
stats_event (NULL, "listeners", "0");
/* global accumulating stats */
stats_event_hidden (NULL, "client_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "source_client_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "source_relay_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "source_total_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "stats_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "listener_connections", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "outgoing_kbitrate", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "stream_kbytes_sent", "0", STATS_COUNTERS);
stats_event_hidden (NULL, "stream_kbytes_read", "0", STATS_COUNTERS);
stats_event_flags (NULL, "client_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "source_client_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "source_relay_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "source_total_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "stats_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "listener_connections", "0", STATS_COUNTERS);
stats_event_flags (NULL, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_REGULAR);
stats_event_flags (NULL, "stream_kbytes_sent", "0", STATS_COUNTERS|STATS_REGULAR);
stats_event_flags (NULL, "stream_kbytes_read", "0", STATS_COUNTERS|STATS_REGULAR);
}
void stats_shutdown(void)
@ -213,7 +212,7 @@ void stats_event_conv(const char *mount, const char *name, const char *value, co
const char *metadata = value;
xmlBufferPtr conv = xmlBufferCreate ();
if (charset)
if (charset && value)
{
xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
@ -234,14 +233,14 @@ void stats_event_conv(const char *mount, const char *name, const char *value, co
xmlBufferFree (conv);
}
/* make stat hidden (non-zero). name can be NULL if it applies to a whole
/* set stat with flags, name can be NULL if it applies to a whole
* source stats tree. */
void stats_event_hidden (const char *source, const char *name, const char *value, int hidden)
void stats_event_flags (const char *source, const char *name, const char *value, int flags)
{
stats_event_t event;
build_event (&event, source, name, value);
event.hidden = hidden;
event.flags = flags;
if (value)
event.action |= STATS_EVENT_HIDDEN;
else
@ -316,6 +315,8 @@ void stats_event_add(const char *source, const char *name, unsigned long value)
stats_event_t event;
char buffer [VAL_BUFSIZE];
if (value == 0)
return;
build_event (&event, source, name, buffer);
snprintf (buffer, VAL_BUFSIZE, "%ld", value);
event.action = STATS_EVENT_ADD;
@ -327,6 +328,9 @@ void stats_event_sub(const char *source, const char *name, unsigned long value)
{
stats_event_t event;
char buffer[VAL_BUFSIZE];
if (value == 0)
return;
build_event (&event, source, name, buffer);
/* DEBUG2("%s on %s", name, source==NULL?"global":source); */
snprintf (buffer, VAL_BUFSIZE, "%ld", value);
@ -406,7 +410,7 @@ static void modify_node_event (stats_node_t *node, stats_event_t *event)
return;
if (event->action & STATS_EVENT_HIDDEN)
{
node->hidden = event->hidden;
node->flags = event->flags;
event->action &= ~STATS_EVENT_HIDDEN;
}
if (event->action != STATS_EVENT_SET)
@ -452,7 +456,7 @@ static void process_global_event (stats_event_t *event)
node = _find_node(_stats.global_tree, event->name);
if (node != NULL)
{
stats_listener_send (node->hidden, "DELETE global %s\n", event->name);
stats_listener_send (node->flags, "DELETE global %s\n", event->name);
avl_delete(_stats.global_tree, (void *)node, _free_stats);
}
return;
@ -461,7 +465,8 @@ static void process_global_event (stats_event_t *event)
if (node)
{
modify_node_event (node, event);
stats_listener_send (node->hidden, "EVENT global %s %s\n", node->name, node->value);
if ((node->flags & STATS_REGULAR) == 0)
stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
}
else
{
@ -469,10 +474,10 @@ static void process_global_event (stats_event_t *event)
node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
node->name = (char *)strdup(event->name);
node->value = (char *)strdup(event->value);
node->hidden = event->hidden;
node->flags = event->flags;
avl_insert(_stats.global_tree, (void *)node);
stats_listener_send (node->hidden, "EVENT global %s %s\n", event->name, event->value);
stats_listener_send (node->flags, "EVENT global %s %s\n", event->name, event->value);
}
}
@ -492,7 +497,7 @@ static void process_source_event (stats_event_t *event)
DEBUG1 ("new source stat %s", event->source);
snode->source = (char *)strdup(event->source);
snode->stats_tree = avl_tree_new(_compare_stats, NULL);
snode->hidden = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
snode->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
avl_insert(_stats.source_tree, (void *)snode);
}
@ -510,10 +515,10 @@ static void process_source_event (stats_event_t *event)
node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
node->name = (char *)strdup(event->name);
node->value = (char *)strdup(event->value);
node->hidden = event->hidden;
if (snode->hidden & STATS_HIDDEN)
node->hidden |= STATS_HIDDEN;
stats_listener_send (node->hidden, "EVENT %s %s %s\n", event->source, event->name, event->value);
node->flags = event->flags;
if (snode->flags & STATS_HIDDEN)
node->flags |= STATS_HIDDEN;
stats_listener_send (node->flags, "EVENT %s %s %s\n", event->source, event->name, event->value);
avl_insert(snode->stats_tree, (void *)node);
}
return;
@ -521,43 +526,43 @@ static void process_source_event (stats_event_t *event)
if (event->action == STATS_EVENT_REMOVE)
{
DEBUG1 ("delete node %s", event->name);
stats_listener_send (node->hidden, "DELETE %s %s\n", event->source, event->name);
stats_listener_send (node->flags, "DELETE %s %s\n", event->source, event->name);
avl_delete(snode->stats_tree, (void *)node, _free_stats);
return;
}
modify_node_event (node, event);
stats_listener_send (node->hidden, "EVENT %s %s %s\n", event->source, node->name, node->value);
stats_listener_send (node->flags, "EVENT %s %s %s\n", event->source, node->name, node->value);
return;
}
/* change source hidden status */
/* change source flags status */
if (event->action & STATS_EVENT_HIDDEN)
{
avl_node *node = avl_get_first (snode->stats_tree);
int visible = 0;
if ((event->hidden&STATS_HIDDEN) == (snode->hidden&STATS_HIDDEN))
if ((event->flags&STATS_HIDDEN) == (snode->flags&STATS_HIDDEN))
return;
if (snode->hidden & STATS_HIDDEN)
if (snode->flags & STATS_HIDDEN)
{
snode->hidden &= ~STATS_HIDDEN;
stats_listener_send (snode->hidden, "NEW %s\n", snode->source);
snode->flags &= ~STATS_HIDDEN;
stats_listener_send (snode->flags, "NEW %s\n", snode->source);
visible = 1;
}
else
{
stats_listener_send (snode->hidden, "DELETE %s\n", snode->source);
snode->hidden |= STATS_HIDDEN;
stats_listener_send (snode->flags, "DELETE %s\n", snode->source);
snode->flags |= STATS_HIDDEN;
}
while (node)
{
stats_node_t *stats = (stats_node_t*)node->key;
if (visible)
{
stats->hidden &= ~STATS_HIDDEN;
stats_listener_send (stats->hidden, "EVENT %s %s %s\n", snode->source, stats->name, stats->value);
stats->flags &= ~STATS_HIDDEN;
stats_listener_send (stats->flags, "EVENT %s %s %s\n", snode->source, stats->name, stats->value);
}
else
stats->hidden |= STATS_HIDDEN;
stats->flags |= STATS_HIDDEN;
node = avl_get_next (node);
}
return;
@ -575,7 +580,7 @@ void stats_event_time (const char *mount, const char *name)
localtime_r (&now, &local);
strftime (buffer, sizeof (buffer), ICECAST_TIME_FMT, &local);
stats_event_hidden (mount, name, buffer, STATS_GENERAL);
stats_event_flags (mount, name, buffer, STATS_GENERAL);
}
@ -587,28 +592,34 @@ static int stats_listeners_send (client_t *client)
if (client->connection.error || global.running != ICE_RUNNING)
return -1;
if (client->flags & STATS_LARGE)
if (client->refbuf && client->refbuf->flags & STATS_BLOCK_CONNECTION)
loop = 4;
else
if (listener->content_len > 50000)
/* allow for 200k lag but only after 2Meg has been sent, give connection time
* to cacth up after the large dump at the beginning */
if (client->connection.sent_bytes > 2000000 && listener->content_len > 200000)
{
WARN1 ("dropping stats client, %ld in queue", listener->content_len);
return -1;
}
client->schedule_ms = client->worker->time_ms + 100;
client->schedule_ms = client->worker->time_ms;
thread_mutex_lock(&_stats_mutex);
while (loop)
{
refbuf_t *refbuf = client->refbuf;
if (refbuf == NULL)
{
client->schedule_ms = client->worker->time_ms + 300;
break;
if ((client->flags & STATS_LARGE) && (refbuf->flags & STATS_BLOCK_NORMAL))
client->flags &= ~STATS_LARGE;
}
ret = format_generic_write_to_client (client);
if (ret > 0)
listener->content_len -= ret;
if (ret < 0)
{
client->schedule_ms = client->worker->time_ms + 200;
break;
}
listener->content_len -= ret;
if (client->pos == refbuf->len)
{
client->refbuf = refbuf->next;
@ -617,17 +628,21 @@ static int stats_listeners_send (client_t *client)
client->pos = 0;
if (client->refbuf == NULL)
{
listener->queue_recent_p = &client->refbuf;
break;
if (listener->content_len)
WARN1 ("content length is %u", listener->content_len);
listener->recent_block = NULL;
}
}
else if (ret < 4096)
else
{
client->schedule_ms = client->worker->time_ms + 200;
break; /* short write, so stop for now */
}
loop--;
}
thread_mutex_unlock(&_stats_mutex);
if (loop == 0)
client->schedule_ms -= 100;
client->schedule_ms = client->worker->time_ms;
return 0;
}
@ -647,7 +662,7 @@ static void clear_stats_queue (client_t *client)
}
static void stats_listener_send (int hidden_level, const char *fmt, ...)
static void stats_listener_send (int mask, const char *fmt, ...)
{
va_list ap;
event_listener_t *listener = _stats.event_listeners,
@ -657,8 +672,8 @@ static void stats_listener_send (int hidden_level, const char *fmt, ...)
while (listener)
{
if (listener->hidden_level & hidden_level)
_add_stats_to_stats_client (listener, fmt, ap);
if (listener->mask & mask)
_add_stats_to_stats_client (listener->client, fmt, ap);
trail = &listener->next;
listener = listener->next;
}
@ -667,8 +682,8 @@ static void stats_listener_send (int hidden_level, const char *fmt, ...)
void stats_global (ice_config_t *config)
{
stats_event_hidden (NULL, "server_id", config->server_id, STATS_GENERAL);
stats_event_hidden (NULL, "host", config->hostname, STATS_GENERAL);
stats_event_flags (NULL, "server_id", config->server_id, STATS_GENERAL);
stats_event_flags (NULL, "host", config->hostname, STATS_GENERAL);
stats_event (NULL, "location", config->location);
stats_event (NULL, "admin", config->admin);
#if 0
@ -715,7 +730,7 @@ static int _append_to_bufferv (refbuf_t *refbuf, int max_len, const char *fmt, v
if (ret < 0 || ret >= len)
return -1;
refbuf->len += ret;
return 0;
return ret;
}
static int _append_to_buffer (refbuf_t *refbuf, int max_len, const char *fmt, ...)
@ -730,38 +745,58 @@ static int _append_to_buffer (refbuf_t *refbuf, int max_len, const char *fmt, ..
}
static void _add_node_to_stats_client (event_listener_t *listener, refbuf_t *refbuf)
static void _add_node_to_stats_client (client_t *client, refbuf_t *refbuf)
{
if (refbuf->len)
{
*listener->queue_recent_p = refbuf;
listener->queue_recent_p = &refbuf->next;
event_listener_t *listener = client->shared_data;
if (listener->recent_block)
{
listener->recent_block->next = refbuf;
listener->recent_block = refbuf;
}
else
{
listener->recent_block = refbuf;
client->refbuf = refbuf;
}
listener->content_len += refbuf->len;
}
}
static void _add_stats_to_stats_client (event_listener_t *listener,const char *fmt, va_list ap)
static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap)
{
unsigned int size = 50;
while (size < 300)
{
refbuf_t *refbuf = refbuf_new (size);
refbuf->len = 0;
event_listener_t *listener = client->shared_data;
refbuf_t *r = listener->recent_block;
if (_append_to_bufferv (refbuf, size, fmt, ap) == 0)
if (r && (r->flags & STATS_BLOCK_CONNECTION) == 0)
{
/* lets see if we can append to an existing block */
if (r->len < 1390)
{
refbuf->flags |= STATS_BLOCK_NORMAL;
_add_node_to_stats_client (listener, refbuf);
return;
int written = _append_to_bufferv (r, 1400, fmt, ap);
if (written > 0)
{
listener->content_len += written;
return;
}
}
refbuf_release (refbuf);
size += 100;
}
r = refbuf_new (1400);
r->len = 0;
if (_append_to_bufferv (r, 1400, fmt, ap) < 0)
{
WARN1 ("stat details are too large \"%s\"", fmt);
refbuf_release (r);
return;
}
_add_node_to_stats_client (client, r);
}
static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int hidden)
static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int flags)
{
avl_node *avlnode;
xmlNodePtr ret = NULL;
@ -772,7 +807,7 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
while (avlnode)
{
stats_node_t *stat = avlnode->key;
if (stat->hidden & hidden)
if (stat->flags & flags)
xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
avlnode = avl_get_next (avlnode);
}
@ -781,7 +816,7 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
while (avlnode)
{
stats_source_t *source = (stats_source_t *)avlnode->key;
if (((hidden&STATS_HIDDEN) || (source->hidden&STATS_HIDDEN) == (hidden&STATS_HIDDEN)) &&
if (((flags&STATS_HIDDEN) || (source->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN)) &&
(show_mount == NULL || strcmp (show_mount, source->source) == 0))
{
avl_node *avlnode2 = avl_get_first (source->stats_tree);
@ -793,7 +828,7 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
while (avlnode2)
{
stats_node_t *stat = avlnode2->key;
if ((hidden&STATS_HIDDEN) || (stat->hidden&STATS_HIDDEN) == (hidden&STATS_HIDDEN))
if ((flags&STATS_HIDDEN) || (stat->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN))
xmlNewTextChild (xmlnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
avlnode2 = avl_get_next (avlnode2);
}
@ -808,8 +843,9 @@ static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, i
/* factoring out code for stats loops
** this function copies all stats to queue, and registers
*/
static void _register_listener (event_listener_t *listener)
static void _register_listener (client_t *client)
{
event_listener_t *listener = client->shared_data;
avl_node *node;
stats_event_t stats_count;
refbuf_t *refbuf;
@ -830,11 +866,11 @@ static void _register_listener (event_listener_t *listener)
{
stats_node_t *stat = node->key;
if (stat->hidden & listener->hidden_level)
if (stat->flags & listener->mask)
{
if (_append_to_buffer (refbuf, size, "EVENT global %s %s\n", stat->name, stat->value) < 0)
{
_add_node_to_stats_client (listener, refbuf);
_add_node_to_stats_client (client, refbuf);
refbuf = refbuf_new (size);
refbuf->len = 0;
continue;
@ -848,11 +884,11 @@ static void _register_listener (event_listener_t *listener)
{
avl_node *node2;
stats_source_t *snode = (stats_source_t *)node->key;
if (snode->hidden & listener->hidden_level)
if (snode->flags & listener->mask)
{
if (_append_to_buffer (refbuf, size, "NEW %s\n", snode->source) < 0)
{
_add_node_to_stats_client (listener, refbuf);
_add_node_to_stats_client (client, refbuf);
refbuf = refbuf_new (size);
refbuf->len = 0;
continue;
@ -863,11 +899,11 @@ static void _register_listener (event_listener_t *listener)
while (node2)
{
stats_node_t *stat = node2->key;
if (stat->hidden & listener->hidden_level)
if (stat->flags & listener->mask)
{
if (_append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, stat->name, stat->value) < 0)
{
_add_node_to_stats_client (listener, refbuf);
_add_node_to_stats_client (client, refbuf);
refbuf = refbuf_new (size);
refbuf->len = 0;
continue;
@ -876,7 +912,7 @@ static void _register_listener (event_listener_t *listener)
node2 = avl_get_next (node2);
}
}
_add_node_to_stats_client (listener, refbuf);
_add_node_to_stats_client (client, refbuf);
/* now we register to receive future event notices */
listener->next = _stats.event_listeners;
@ -917,27 +953,27 @@ struct _client_functions stats_client_send_ops =
stats_client_release
};
void stats_add_listener (client_t *client, int hidden_level)
void stats_add_listener (client_t *client, int mask)
{
event_listener_t *listener = calloc (1, sizeof (event_listener_t));
listener->hidden_level = hidden_level;
listener->mask = mask;
client->respcode = 200;
client->ops = &stats_client_send_ops;
client->shared_data = listener;
client_set_queue (client, NULL);
client->flags |= CLIENT_ACTIVE;
client->refbuf = refbuf_new (100);
snprintf (client->refbuf->data, 100,
"HTTP/1.0 200 OK\r\ncapability: streamlist\r\n\r\n");
client->refbuf->len = strlen (client->refbuf->data);
listener->content_len = client->refbuf->len;
listener->queue_recent_p = &client->refbuf->next;
listener->recent_block = client->refbuf;
listener->client = client;
client->flags |= STATS_LARGE;
thread_mutex_lock(&_stats_mutex);
_register_listener (listener);
_register_listener (client);
thread_mutex_unlock(&_stats_mutex);
client->flags |= CLIENT_ACTIVE;
}
void stats_transform_xslt(client_t *client, const char *uri)
@ -954,7 +990,7 @@ void stats_transform_xslt(client_t *client, const char *uri)
free (xslpath);
}
xmlDocPtr stats_get_xml (int hidden, const char *show_mount)
xmlDocPtr stats_get_xml (int flags, const char *show_mount)
{
xmlDocPtr doc;
xmlNodePtr node;
@ -963,7 +999,7 @@ xmlDocPtr stats_get_xml (int hidden, const char *show_mount)
node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
xmlDocSetRootElement(doc, node);
node = _dump_stats_to_doc (node, show_mount, hidden);
node = _dump_stats_to_doc (node, show_mount, flags);
if (show_mount && node)
{
@ -1009,7 +1045,7 @@ static int _free_stats(void *key)
static int _free_source_stats(void *key)
{
stats_source_t *node = (stats_source_t *)key;
stats_listener_send (node->hidden, "DELETE %s\n", node->source);
stats_listener_send (node->flags, "DELETE %s\n", node->source);
DEBUG1 ("delete source node %s", node->source);
avl_tree_free(node->stats_tree, _free_stats);
free(node->source);
@ -1044,7 +1080,7 @@ refbuf_t *stats_get_streams (int prepend)
int ret;
stats_source_t *source = (stats_source_t *)node->key;
if (source->hidden & STATS_SLAVE)
if (source->flags & STATS_SLAVE)
{
if (remaining <= strlen (source->source) + prelen + 3)
{
@ -1107,15 +1143,26 @@ void stats_global_calc (void)
{
event_listener_t *listener;
stats_event_t event;
avl_node *anode;
char buffer [VAL_BUFSIZE];
anode = avl_get_first(_stats.global_tree);
while (anode)
{
stats_node_t *node = (stats_node_t *)anode->key;
if (node->flags & STATS_REGULAR)
stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
anode = avl_get_next (anode);
}
build_event (&event, NULL, "outgoing_kbitrate", buffer);
event.hidden = STATS_COUNTERS|STATS_HIDDEN;
event.flags = STATS_COUNTERS|STATS_HIDDEN;
thread_mutex_lock (&_stats_mutex);
snprintf (buffer, sizeof(buffer), "%" PRIu64,
(int64_t)global_getrate_avg (global.out_bitrate) * 8 / 1024);
process_event_unlocked (&event);
/* retrieve the list of closing down clients */
listener = _stats.listeners_removed;
_stats.listeners_removed = NULL;

View File

@ -26,6 +26,7 @@
#define STATS_GENERAL 4
#define STATS_COUNTERS 8
#define STATS_PUBLIC (STATS_GENERAL|STATS_COUNTERS)
#define STATS_REGULAR 01000
#define STATS_ALL ~0
void stats_initialize(void);
@ -44,7 +45,7 @@ void stats_event_inc(const char *source, const char *name);
void stats_event_add(const char *source, const char *name, unsigned long value);
void stats_event_sub(const char *source, const char *name, unsigned long value);
void stats_event_dec(const char *source, const char *name);
void stats_event_hidden (const char *source, const char *name, const char *value, int hidden);
void stats_event_flags (const char *source, const char *name, const char *value, int flags);
void stats_event_time (const char *mount, const char *name);
void *stats_connection(void *arg);
@ -53,7 +54,7 @@ void stats_global_calc(void);
void stats_transform_xslt(client_t *client, const char *uri);
void stats_sendxml(client_t *client);
xmlDocPtr stats_get_xml(int show_hidden, const char *show_mount);
xmlDocPtr stats_get_xml(int flags, const char *show_mount);
char *stats_get_value(const char *source, const char *name);
#endif /* __STATS_H__ */

154
src/yp.c
View File

@ -96,6 +96,16 @@ static int do_yp_remove (ypdata_t *yp, char *s, unsigned len);
static int do_yp_add (ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len);
static void yp_destroy_ypdata(ypdata_t *ypdata);
static int directory_recheck (client_t *client);
struct _client_functions directory_client_ops =
{
directory_recheck,
NULL
};
static client_t ypclient;
/* curl callback used to parse headers coming back from the YP server */
@ -186,6 +196,57 @@ static void destroy_yp_server (struct yp_server *server)
}
static void yp_schedule (ypdata_t *yp, unsigned offset)
{
time_t when = ypclient.worker->current_time.tv_sec + offset;
yp->next_update = when;
if ((uint64_t)when < ypclient.counter)
ypclient.counter = (uint64_t)when;
}
static int directory_recheck (client_t *client)
{
int ret = -1;
thread_rwlock_rlock (&yp_lock);
do {
if (ypclient.connection.error)
break;
if (active_yps || yp_update)
{
ret = 0;
if (yp_update || active_yps->mounts)
{
if (yp_update || client->counter <= client->worker->current_time.tv_sec)
{
client->counter = (uint64_t)-1;
client->flags &= ~CLIENT_ACTIVE;
thread_create ("YP Thread", yp_update_thread, NULL, THREAD_DETACHED);
break;
}
}
}
client->schedule_ms = client->worker->time_ms + 1000;
} while (0);
thread_rwlock_unlock (&yp_lock);
return ret;
}
static void yp_client_add (ice_config_t *config)
{
if (config->num_yp_directories == 0 || active_yps || global.running != ICE_RUNNING)
return;
INFO0 ("Starting Directory client for YP processing");
ypclient.ops = &directory_client_ops;
ypclient.counter = 0;
ypclient.schedule_ms = 0;
ypclient.connection.error = 0;
ypclient.flags = CLIENT_ACTIVE;
client_add_worker (&ypclient);
}
/* search for a ypdata entry corresponding to a specific mountpoint */
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
@ -228,10 +289,7 @@ void yp_recheck_config (ice_config_t *config)
server = calloc (1, sizeof (struct yp_server));
if (server == NULL)
{
destroy_yp_server (server);
break;
}
server->server_id = strdup ((char *)server_version);
server->url = strdup (config->yp_url[i]);
server->url_timeout = config->yp_url_timeout[i];
@ -264,18 +322,17 @@ void yp_recheck_config (ice_config_t *config)
server->remove = 0;
}
}
thread_rwlock_unlock (&yp_lock);
yp_update = 1;
yp_client_add (config);
thread_rwlock_unlock (&yp_lock);
}
void yp_initialize(void)
void yp_initialize (ice_config_t *config)
{
ice_config_t *config = config_get_config();
thread_rwlock_create (&yp_lock);
thread_mutex_create (&yp_pending_lock);
yp_recheck_config (config);
config_release_config ();
}
@ -297,7 +354,7 @@ static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
if (curlcode)
{
yp->process = do_yp_add;
yp->next_update = now + 1200;
yp_schedule (yp, 1200);
ERROR2 ("connection to %s failed with \"%s\"", server->url, server->curl_error);
return -2;
}
@ -308,7 +365,7 @@ static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
if (yp->process == do_yp_add)
{
ERROR3 ("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
yp->next_update = now + 7200;
yp_schedule (yp, 7200);
}
if (yp->process == do_yp_touch)
{
@ -319,9 +376,9 @@ static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
* cases as a firewall block or incorrect listenurl.
*/
if (yp->touch_interval < 1200)
yp->next_update = now + 1200;
yp_schedule (yp, 1200);
else
yp->next_update = now + yp->touch_interval;
yp_schedule (yp, yp->touch_interval);
INFO3 ("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
}
yp->process = do_yp_add;
@ -409,7 +466,7 @@ static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
{
yp->process = do_yp_touch;
/* force first touch in 5 secs */
yp->next_update = now + 5;
yp_schedule (yp, 5);
}
return ret;
}
@ -474,7 +531,7 @@ static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
if (send_to_yp ("touch", yp, s) == 0)
{
yp->next_update = now + yp->touch_interval;
yp_schedule (yp, yp->touch_interval);
return 0;
}
return -1;
@ -501,14 +558,14 @@ static int process_ypdata (struct yp_server *server, ypdata_t *yp)
if (yp->release)
{
yp->process = do_yp_remove;
yp->next_update = 0;
yp_schedule (yp, 0);
}
ret = yp->process (yp, s, len);
if (ret <= 0)
{
free (s);
return ret;
free (s);
return ret;
}
len = ret;
}
@ -533,10 +590,13 @@ static void yp_process_server (struct yp_server *server)
{
DEBUG2 ("skiping %s on %s", yp->mount, server->url);
yp->process = do_yp_add;
yp->next_update += 900;
yp_schedule (yp, 900);
}
else
state = process_ypdata (server, yp);
if (yp->remove == 0 && (uint64_t)yp->next_update < ypclient.counter)
ypclient.counter = (uint64_t)yp->next_update;
yp = yp->next;
}
}
@ -594,6 +654,7 @@ static ypdata_t *create_yp_entry (const char *mount)
if (yp->listen_url == NULL)
break;
yp_schedule (yp, 0);
return yp;
} while (0);
@ -677,6 +738,7 @@ static void add_pending_yp (struct yp_server *server)
break;
yp = yp->next;
}
ypclient.counter = 0;
yp->next = current;
DEBUG2 ("%u YP entries added to %s", count, server->url);
}
@ -712,6 +774,7 @@ static void *yp_update_thread(void *arg)
/* do the YP communication */
thread_rwlock_rlock (&yp_lock);
ypclient.counter = -1;
server = (struct yp_server *)active_yps;
while (server)
{
@ -740,6 +803,12 @@ static void *yp_update_thread(void *arg)
yp_thread = NULL;
/* DEBUG0("YP thread shutdown"); */
thread_mutex_lock (&ypclient.worker->lock);
ypclient.flags |= CLIENT_ACTIVE;
DEBUG1 ("wakeup again in %lu secs", ypclient.counter - time(NULL));
thread_cond_signal (&ypclient.worker->cond);
thread_mutex_unlock (&ypclient.worker->lock);
return NULL;
}
@ -914,6 +983,7 @@ void yp_remove (const char *mount)
DEBUG2 ("release %s on YP %s", mount, server->url);
yp->release = 1;
yp->next_update = 0;
yp_update = 1;
}
server = server->next;
}
@ -946,7 +1016,7 @@ void yp_touch (const char *mount)
}
/* don't update the directory if there is a touch scheduled soon */
if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
yp->next_update = now;
yp_schedule (yp, 0);
}
server = server->next;
if (server)
@ -958,35 +1028,35 @@ void yp_touch (const char *mount)
void yp_shutdown (void)
{
int loop=25;
DEBUG0 ("releasing directory details");
thread_rwlock_destroy (&yp_lock);
thread_mutex_destroy (&yp_pending_lock);
yp_update = 1;
while (yp_thread && loop)
/* free server and ypdata left */
while (active_yps)
{
thread_sleep (200000);
loop--;
struct yp_server *server = (struct yp_server *)active_yps;
active_yps = server->next;
destroy_yp_server (server);
}
if (yp_thread == NULL)
{
thread_rwlock_destroy (&yp_lock);
thread_mutex_destroy (&yp_pending_lock);
/* free server and ypdata left */
while (active_yps)
{
struct yp_server *server = (struct yp_server *)active_yps;
active_yps = server->next;
destroy_yp_server (server);
}
free ((char*)server_version);
server_version = NULL;
}
INFO0 ("YP thread down");
free ((char*)server_version);
server_version = NULL;
active_yps = NULL;
INFO0 ("YP cleanup complete");
}
void yp_thread_startup (void)
void yp_stop (void)
{
if (yp_thread == NULL)
thread_create ("YP Thread", yp_update_thread, NULL, THREAD_DETACHED);
worker_t *w = ypclient.worker;
if (w)
{
thread_mutex_lock (&w->lock);
ypclient.connection.error = 1;
ypclient.schedule_ms = 0;
thread_cond_signal (&w->cond);
thread_mutex_unlock (&w->lock);
DEBUG0 ("YP client is now stopped");
}
}

View File

@ -32,9 +32,10 @@ void yp_add (const char *mount);
void yp_remove (const char *mount);
void yp_touch (const char *mount);
void yp_recheck_config (ice_config_t *config);
void yp_initialize(void);
void yp_initialize (ice_config_t *config);
void yp_shutdown(void);
void yp_thread_startup (void);
void yp_stop (void);
#else
@ -42,9 +43,10 @@ void yp_thread_startup (void);
#define yp_remove(x) do{}while(0)
#define yp_touch(x) do{}while(0)
#define yp_recheck_config(x) do{}while(0)
#define yp_initialize() WARN0("YP server handling has been disabled")
#define yp_initialize(x) WARN0("YP server handling has been disabled")
#define yp_shutdown() do{}while(0)
#define yp_thread_startup() do{}while(0)
#define yp_stop() do{}while(0)
#endif /* USE_YP */

View File

@ -3,7 +3,7 @@
[Setup]
AppName=Icecast2-KH
AppVerName=Icecast v2.3.2-kh17
AppVerName=Icecast v2.3.2-kh18
AppPublisherURL=http://www.icecast.org
AppSupportURL=http://www.icecast.org
AppUpdatesURL=http://www.icecast.org
@ -13,10 +13,10 @@ AllowNoIcons=yes
LicenseFile=..\COPYING
InfoAfterFile=..\README
OutputDir=.
OutputBaseFilename=icecast2_win32_v2.3.2-kh17_setup
OutputBaseFilename=icecast2_win32_v2.3.2-kh18_setup
WizardImageFile=icecast2logo2.bmp
WizardImageStretch=no
VersionInfoProductVersion=kh17
VersionInfoProductVersion=kh18
VersionInfoVersion=2.3.2
; uncomment the following line if you want your installation to run on NT 3.51 too.
; MinVersion=4,3.51