From 1f3c6c2d87103834cb19b4058d185059c1b11bf3 Mon Sep 17 00:00:00 2001 From: Karl Heyes Date: Sat, 24 Dec 2011 02:35:54 +0000 Subject: [PATCH] bump to kh31 The main changes are listed below but there is a lot of noise from behind the scenes work like quicker stats updates, code re-arranging and infrequent used code like the allocation code which is not usually compiled in. Implement scatter-gather IO, initially for flv wrapping as that will make a lot of use of it and should save a lot of memory copies. The icy metadata should also be a candidate for this. Add better handling to relays with multiple servers and fallbacks. Problem entries can be skipped when restarting. Fixup build system for mingw32 cross compile. VC was getting too annoying but should still be an option if needed later. This now allows us to build with newer dependent libs without much extra work. The GUI component is not built now (VC specific) but it was of limited use and most win32 users configure the service. svn path=/icecast/branches/kh/icecast/; revision=18147 --- Makefile.am | 9 +- NEWS | 34 ++++++ admin/viewxml.xsl | 75 +++++++++++++ configure.in | 24 +++- src/Makefile.am | 10 +- src/admin.c | 84 +++++++------- src/auth.c | 47 ++++++-- src/auth.h | 3 - src/auth_cmd.c | 1 + src/auth_cmd.h | 4 - src/auth_htpasswd.c | 1 + src/auth_htpasswd.h | 3 - src/auth_url.c | 12 ++ src/auth_url.h | 3 - src/cfgfile.c | 73 +++++++++++- src/cfgfile.h | 36 ++++-- src/compat.h | 14 ++- src/connection.c | 221 +++++++++++++++++++++++++++++++----- src/connection.h | 22 ++++ src/event.c | 5 +- src/flv.c | 207 +++++++++++++++++++--------------- src/flv.h | 1 + src/format_flac.c | 1 + src/format_kate.c | 1 + src/format_midi.c | 1 + src/format_mp3.c | 41 ++++--- src/format_ogg.c | 5 +- src/format_skeleton.c | 1 + src/format_speex.c | 1 + src/format_theora.c | 1 + src/format_vorbis.c | 1 + src/fserve.c | 218 ++++++++++++++++++++++-------------- src/fserve.h | 5 +- src/global.c | 4 +- src/global.h | 7 +- src/logging.c | 64 +++++++---- src/logging.h | 2 +- src/main.c | 145 +++++++++++------------- src/md5.c | 1 + src/mpeg.c | 50 ++++----- src/sighandler.c | 2 +- src/slave.c | 177 +++++++++++++++++++---------- src/slave.h | 2 + src/source.c | 236 +++++++++++++++++++++++++-------------- src/source.h | 4 +- src/stats.c | 133 +++++++++++++++++----- src/stats.h | 4 +- src/util.c | 52 ++++----- src/util.h | 2 +- src/xslt.c | 8 +- win32/Makefile.am | 9 ++ win32/icecastService.cpp | 60 ++++++---- 52 files changed, 1448 insertions(+), 679 deletions(-) create mode 100644 admin/viewxml.xsl diff --git a/Makefile.am b/Makefile.am index d05d81cd..648b7768 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,7 +3,10 @@ AUTOMAKE_OPTIONS = foreign ACLOCAL_AMFLAGS = -I m4 -SUBDIRS = src conf doc web admin win32 +SUBDIRS = conf doc web admin src +if WIN32 +SUBDIRS += win32 +endif EXTRA_DIST = HACKING config.h.vc6 m4/acx_pthread.m4 m4/ogg.m4 \ m4/theora.m4 m4/vorbis.m4 m4/speex.m4\ @@ -19,5 +22,7 @@ debug: profile: $(MAKE) all CFLAGS="@PROFILE@" +staticdebug: + $(MAKE) all CFLAGS="-g -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static" static: - $(MAKE) all LDFLAGS="${LDFLAGS} -all-static" + $(MAKE) all CFLAGS="-O2 -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static" diff --git a/NEWS b/NEWS index 1b50924b..d016e6bf 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,40 @@ Feature differences from SVN trunk any extra tags are show in the conf/icecast.xml.dist file +2.3.2-kh31 +. Add generic scattered IO routines, listeners wanting FLV wrapping now use this which + saves a lot of memory copying and therefore load. +. build setup for mingw32. This should help with library updates for us, drop GUI build +. relay restarting fixes included for certain error cases. relays are treaated as + failed if they stop within 60 seconds and skip on to next server if specified. +. a better trigger for on-demand relay if fallback has listeners. +. on reload when changeowner used, allow for listening sockets to be reopend but + prevent closing priviledged ports unless missing from xml +. prevent unescape routine from creating non-printable chars +. ignore per-mount username when source is using shoutcast protocol. +. URL auth can now accept a "ice-username: ..." in the response headers for setting the + username. For setups that use query args for auth +. expand args applied to xsl requests, url encoded now. +. modify k and m in bitrates to be on 1000 not 1024 +. accesslog modification to parsing, optionally set in to CLF-ESC + to escape-encode strings instead of using "". +. if falling back to file, the bitrate used takes, in order of priority, incoming + bitrate, limit-rate and then [value] in filename eg /fallback-[128] +. fix a large ogg page bug, flac is probably the only one showing this up currently +. fix a couple of rare crash cases with file serving clients. +. prevent intro file sending when override triggered. +. make access log report query args on the request, but truncate the strings if too long. +. minor change to send limiter +. move incoming rate limiter to source client specific code. +. Add server-wide redirect tags. +. use per-source stats handles for updates, reduces lookups and avoids taking a + shared lock. +. minor log message changes. +. fix fallback trigger if source timeout occurs +. fix possible sources count inconsistency with failed relays +. entity expansion in stats conversion routines added +. fixup debug allocation code. not normally used. + 2.3.2-kh30 . stats updates. Split stats lock into global and source stats locks, reduces contention. diff --git a/admin/viewxml.xsl b/admin/viewxml.xsl new file mode 100644 index 00000000..8955283b --- /dev/null +++ b/admin/viewxml.xsl @@ -0,0 +1,75 @@ + + + + + + + + + + + NA + NA + + + + - + + NA + NA + NA + NA + NA + NA + + + + + + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + + + + + + () + + NA + + NA + NA + + + + + + + + 1259797160 + Little Texas - She's Got Her Daddy's Money + + + + + + + \ No newline at end of file diff --git a/configure.in b/configure.in index df277a71..1b1d37f6 100644 --- a/configure.in +++ b/configure.in @@ -1,4 +1,4 @@ -AC_INIT([Icecast], [2.3.2-kh30], [karl@xiph.org]) +AC_INIT([Icecast], [2.3.2-kh31], [karl@xiph.org]) LT_INIT AC_PREREQ(2.59) @@ -9,6 +9,7 @@ AM_INIT_AUTOMAKE AM_CONFIG_HEADER(config.h) AM_MAINTAINER_MODE +AC_PROG_CXX AC_PROG_CC AC_CANONICAL_HOST AC_PROG_LIBTOOL @@ -25,6 +26,7 @@ else PROFILE="-pg -g" AC_DEFINE([_GNU_SOURCE], 1, [Define to include GNU extensions to POSIX]) fi +AM_CONDITIONAL([WIN32], [test x$host_os = xmingw32]) dnl Checks for programs. @@ -34,9 +36,8 @@ dnl Checks for header files. AC_HEADER_STDC AC_HEADER_TIME -AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h]) +AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h malloc.h]) AC_CHECK_HEADERS(pwd.h, AC_DEFINE(CHUID, 1, [Define if you have pwd.h]),,) -AC_CHECK_HEADERS(unistd.h, AC_DEFINE(CHROOT, 1, [Define if you have unistd.h]),,) dnl Checks for typedefs, structures, and compiler characteristics. XIPH_C__FUNC__ @@ -46,10 +47,19 @@ AC_TYPE_OFF_T AC_CHECK_TYPES([struct timespec]) dnl Checks for library functions. -AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync]) +AC_CHECK_DECLS([localtime_r],,, + [#include +#include ]) +AC_CHECK_FUNCS([fnmatch chroot fork poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync]) AC_CHECK_TYPES([struct signalfd_siginfo], [AC_DEFINE(HAVE_SIGNALFD, 1 ,[Define if signalfd exists])], [], [#include ]) +if test "x$ac_cv_func_fnmatch" != "xyes" +then +AC_CHECK_LIB(fnmatch, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-lfnmatch"])], + [ AC_CHECK_LIB(iberty, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-liberty"])]) + ]) +fi AC_SEARCH_LIBS(nanosleep, rt posix4, AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep])) AC_SEARCH_LIBS(clock_gettime, rt posix4, @@ -140,8 +150,10 @@ XIPH_PATH_OPENSSL([ [ AC_MSG_NOTICE([SSL disabled!]) ]) -ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o" -AC_DEFINE([MIMETYPESFILE],"/etc/mime.types", [Default location of mime types file]) +if test "x$ac_cv_func_fork" = "xyes" +then + ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o" +fi AC_DEFINE([ICECAST_TIME_FMT],["%a, %d %b %Y %H:%M:%S %z"], [time format for strftime]) dnl Make substitutions diff --git a/src/Makefile.am b/src/Makefile.am index 5c235125..5f3979c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -4,7 +4,11 @@ AUTOMAKE_OPTIONS = foreign SUBDIRS = avl thread httpp net log timing -bin_PROGRAMS = icecast +if WIN32 +noinst_LIBRARIES = libicecast.a +else +bin_PROGRAMS = icecast +endif noinst_HEADERS = admin.h cfgfile.h logging.h sighandler.h connection.h \ global.h util.h slave.h source.h stats.h refbuf.h client.h \ @@ -27,6 +31,10 @@ icecast_DEPENDENCIES = @ICECAST_OPTIONAL@ net/libicenet.la thread/libicethread.l httpp/libicehttpp.la log/libicelog.la avl/libiceavl.la timing/libicetiming.la icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LIBS@ @KATE_LIBS@ +libicecast_a_SOURCES = $(icecast_SOURCES) +libicecast_a_DEPENDENCIES = $(icecast_DEPENDENCIES) +libicecast_a_LIBADD = $(icecast_DEPENDENCIES) + AM_CFLAGS = @XIPH_CFLAGS@ AM_CPPFLAGS = @XIPH_CPPFLAGS@ AM_LDFLAGS = @XIPH_LDFLAGS@ @KATE_LIBS@ diff --git a/src/admin.c b/src/admin.c index 97190e90..ec5835d4 100644 --- a/src/admin.c +++ b/src/admin.c @@ -61,6 +61,9 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon static int command_admin_function (client_t *client, int response); static int command_list_log (client_t *client, int response); static int command_manage_relay (client_t *client, int response); +#ifdef MY_ALLOC +static int command_alloc(client_t *client); +#endif static int admin_handle_general_request(client_t *client, const char *command); @@ -84,6 +87,9 @@ static struct admin_command admin_general[] = { "manageauth", RAW, { command_manageauth } }, { "listmounts", RAW, { command_list_mounts } }, { "function", RAW, { command_admin_function } }, +#ifdef MY_ALLOC + { "alloc", RAW, { command_alloc } }, +#endif { "streamlist.txt", TEXT, { command_list_mounts } }, { "streams", TEXT, { command_list_mounts } }, { "showlog.txt", TEXT, { command_list_log } }, @@ -592,50 +598,6 @@ static int command_manage_relay (client_t *client, int response) } -static void add_listener_node (xmlNodePtr srcnode, client_t *listener) -{ - const char *useragent; - char buf[30]; - - xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL); - - snprintf (buf, sizeof (buf), "%lu", listener->connection.id); - xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); - - xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip)); - - useragent = httpp_getvar (listener->parser, "user-agent"); - if (useragent && xmlCheckUTF8((unsigned char *)useragent)) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent)); - xmlNewChild (node, NULL, XMLSTR("UserAgent"), str); - xmlFree (str); - } - - if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE) - { - source_t *source = listener->shared_data; - snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos); - } - else - snprintf (buf, sizeof (buf), "0"); - xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf)); - - if (listener->worker) - { - snprintf (buf, sizeof (buf), "%lu", - (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); - xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf)); - } - if (listener->username) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username)); - xmlNewChild (node, NULL, XMLSTR("username"), str); - xmlFree (str); - } -} - - /* populate within srcnode, groups of 0 or more listener tags detailing * information about each listener connected on the provide source. */ @@ -649,7 +611,7 @@ void admin_source_listeners (source_t *source, xmlNodePtr srcnode) while (node) { client_t *listener = (client_t *)node->key; - add_listener_node (srcnode, listener); + stats_listener_to_xml (listener, srcnode); node = avl_get_next (node); } } @@ -724,7 +686,7 @@ static int command_show_listeners (client_t *client, source_t *source, int respo client_t *listener = source_find_client (source, id); if (listener) - add_listener_node (srcnode, listener); + stats_listener_to_xml (listener, srcnode); } thread_mutex_unlock (&source->lock); @@ -1242,3 +1204,33 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon return admin_send_response (doc, client, response, "updatemetadata.xsl"); } + +#ifdef MY_ALLOC +static int command_alloc(client_t *client) +{ + xmlDocPtr doc = xmlNewDoc (XMLSTR("1.0")); + xmlNodePtr rootnode = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL); + avl_node *node; + + xmlDocSetRootElement(doc, rootnode); + avl_tree_rlock (global.alloc_tree); + node = avl_get_first (global.alloc_tree); + while (node) + { + alloc_node *an = node->key; + char value[25]; + xmlNodePtr bnode = xmlNewChild (rootnode, NULL, XMLSTR("block"), NULL); + xmlSetProp (bnode, XMLSTR("name"), XMLSTR(an->name)); + snprintf (value, sizeof value, "%d", an->count); + xmlNewChild (bnode, NULL, XMLSTR("count"), XMLSTR(value)); + snprintf (value, sizeof value, "%d", an->allocated); + xmlNewChild (bnode, NULL, XMLSTR("allocated"), XMLSTR(value)); + + node = avl_get_next (node); + } + avl_tree_unlock (global.alloc_tree); + + return admin_send_response (doc, client, RAW, "stats.xsl"); +} +#endif + diff --git a/src/auth.c b/src/auth.c index cc2ab262..7573ca66 100644 --- a/src/auth.c +++ b/src/auth.c @@ -22,6 +22,9 @@ #include #include #include +#ifdef HAVE_MALLOC_H +#include +#endif #include "auth.h" #include "auth_htpasswd.h" @@ -382,34 +385,52 @@ static void *auth_run_thread (void *arg) int move_listener (client_t *client, struct _fbinfo *finfo) { - source_t *source, *prev; + source_t *source; + const char *mount = finfo->mount; + int rate = finfo->limit; + mount_proxy *minfo; + ice_config_t *config = config_get_config(); - DEBUG1 ("moving listener to %s", finfo->mount); avl_tree_rlock (global.source_tree); - source = source_find_mount (finfo->mount); - while (source) + source = source_find_mount_raw (mount); + while (1) { + minfo = config_find_mount (config, mount); + if (rate == 0 && minfo && minfo->limit_rate) + rate = minfo->limit_rate; + if (source == NULL) + break; thread_mutex_lock (&source->lock); if (source_available (source)) { + config_release_config(); avl_tree_unlock (global.source_tree); source_setup_listener (source, client); client->flags |= CLIENT_HAS_MOVED; thread_mutex_unlock (&source->lock); return 0; } - prev = source; - if (prev->fallback.mount) - source = source_find_mount (prev->fallback.mount); - else - source = NULL; - thread_mutex_unlock (&prev->lock); + mount = minfo ? minfo->fallback_mount : NULL; + thread_mutex_unlock (&source->lock); } + config_release_config(); avl_tree_unlock (global.source_tree); if (client->flags & CLIENT_IS_SLAVE) return -1; - DEBUG1 ("no source, trying %s as a file", finfo->mount); + if (finfo->flags & FS_OVERRIDE) + { + finfo->mount = finfo->fallback; + finfo->fallback = NULL; + finfo->flags &= ~FS_OVERRIDE; + } + if (finfo->limit == 0) + { + if (rate == 0) + if (sscanf (finfo->mount, "%*[^[][%d]", &rate) == 1) + rate = rate * 1000/8; + finfo->limit = rate; + } return fserve_setup_client_fb (client, finfo); } @@ -836,7 +857,7 @@ int auth_check_source (client_t *client, const char *mount) ret = -1; if (mountinfo->password) pass = mountinfo->password; - if (mountinfo->username) + if (mountinfo->username && client->server_conn->shoutcast_compat == 0) user = mountinfo->username; } if (connection_check_pass (client->parser, user, pass) > 0) @@ -858,6 +879,8 @@ void auth_initialise (void) void auth_shutdown (void) { + if (allow_auth == 0) + return; allow_auth = 0; thread_rwlock_wlock (&auth_lock); thread_rwlock_unlock (&auth_lock); diff --git a/src/auth.h b/src/auth.h index 781768f1..a5706f54 100644 --- a/src/auth.h +++ b/src/auth.h @@ -13,9 +13,6 @@ #ifndef __AUTH_H__ #define __AUTH_H__ -#ifdef HAVE_CONFIG_H -#include -#endif struct source_tag; struct auth_tag; diff --git a/src/auth_cmd.c b/src/auth_cmd.c index 4e4c74ad..4e0c301e 100644 --- a/src/auth_cmd.c +++ b/src/auth_cmd.c @@ -44,6 +44,7 @@ #include "client.h" #include "cfgfile.h" #include "httpp/httpp.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_cmd" diff --git a/src/auth_cmd.h b/src/auth_cmd.h index 5a1f9e88..40eefd4d 100644 --- a/src/auth_cmd.h +++ b/src/auth_cmd.h @@ -13,10 +13,6 @@ #ifndef __AUTH_CMD_H__ #define __AUTH_CMD_H__ -#ifdef HAVE_CONFIG_H -#include -#endif - int auth_get_cmd_auth (auth_t *, config_options_t *options); #endif diff --git a/src/auth_htpasswd.c b/src/auth_htpasswd.c index 287bb6d7..ab30e484 100644 --- a/src/auth_htpasswd.c +++ b/src/auth_htpasswd.c @@ -32,6 +32,7 @@ #include "cfgfile.h" #include "httpp/httpp.h" #include "md5.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_htpasswd" diff --git a/src/auth_htpasswd.h b/src/auth_htpasswd.h index 791251b4..fc40025d 100644 --- a/src/auth_htpasswd.h +++ b/src/auth_htpasswd.h @@ -13,9 +13,6 @@ #ifndef __AUTH_HTPASSWD_H__ #define __AUTH_HTPASSWD_H__ -#ifdef HAVE_CONFIG_H -#include -#endif int auth_get_htpasswd_auth (auth_t *auth, config_options_t *options); diff --git a/src/auth_url.c b/src/auth_url.c index 8a848cd8..52b62787 100644 --- a/src/auth_url.c +++ b/src/auth_url.c @@ -86,6 +86,7 @@ #include "cfgfile.h" #include "httpp/httpp.h" #include "mpeg.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_url" @@ -218,6 +219,17 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s if (eol) *eol = '\0'; } + if (strncasecmp (ptr, "ice-username: ", 14) == 0) + { + int len = strcspn ((char*)ptr+14, "\r\n"); + char *name = malloc (len+1); + if (name) + { + snprintf (name, len+1, "%s", (char *)ptr+14); + free (client->username); + client->username = name; + } + } if (strncasecmp (ptr, "Location: ", 10) == 0) { int len = strcspn ((char*)ptr+10, "\r\n"); diff --git a/src/auth_url.h b/src/auth_url.h index 4ddec631..d9a6264e 100644 --- a/src/auth_url.h +++ b/src/auth_url.h @@ -13,9 +13,6 @@ #ifndef __AUTH_URL_H__ #define __AUTH_URL_H__ -#ifdef HAVE_CONFIG_H -#include -#endif int auth_get_url_auth (auth_t *authenticator, config_options_t *options); diff --git a/src/cfgfile.c b/src/cfgfile.c index dd8c23a0..0ca0de9e 100644 --- a/src/cfgfile.c +++ b/src/cfgfile.c @@ -62,11 +62,13 @@ #define CONFIG_DEFAULT_LOG_DIR "/usr/local/icecast/logs" #define CONFIG_DEFAULT_WEBROOT_DIR "/usr/local/icecast/webroot" #define CONFIG_DEFAULT_ADMINROOT_DIR "/usr/local/icecast/admin" +#define MIMETYPESFILE "/etc/mime.types" #else #define CONFIG_DEFAULT_BASE_DIR ".\\" #define CONFIG_DEFAULT_LOG_DIR ".\\logs" #define CONFIG_DEFAULT_WEBROOT_DIR ".\\webroot" #define CONFIG_DEFAULT_ADMINROOT_DIR ".\\admin" +#define MIMETYPESFILE ".\\mime.types" #endif static ice_config_t _current_configuration; @@ -157,9 +159,9 @@ int config_get_bitrate (xmlNodePtr node, void *x) return 1; sscanf ((char*)str, "%"SCNd64 "%c", p, &metric); if (metric == 'k' || metric == 'K') - (*p) *= 1024; + (*p) *= 1000; if (metric == 'm' || metric == 'M') - (*p) *= 1024*1024; + (*p) *= 10000000; xmlFree (str); } return 0; @@ -228,6 +230,19 @@ void config_init_configuration(ice_config_t *configuration) } +redirect_host *config_clear_redirect (redirect_host *redir) +{ + redirect_host *next = NULL; + if (redir) + { + next = redir->next; + xmlFree (redir->server); + free (redir); + } + return next; +} + + relay_server *config_clear_relay (relay_server *relay) { relay_server *next = relay->next; @@ -353,6 +368,9 @@ void config_clear(ice_config_t *c) while (c->relay) c->relay = config_clear_relay (c->relay); + while (c->redirect_hosts) + c->redirect_hosts = config_clear_redirect (c->redirect_hosts); + while (c->mounts) { mount_proxy *to_go = c->mounts; @@ -602,11 +620,13 @@ static int _parse_security (xmlNodePtr node, void *arg) static int _parse_accesslog (xmlNodePtr node, void *arg) { - access_log *log = arg; + struct access_log *log = arg; + char *type = NULL; struct cfg_tag icecast_tags[] = { { "name", config_get_str, &log->name }, { "ip", config_get_bool, &log->log_ip }, + { "type", config_get_str, &type }, { "archive", config_get_bool, &log->archive }, { "exclude_ext", config_get_str, &log->exclude_ext }, { "display", config_get_int, &log->display }, @@ -615,9 +635,16 @@ static int _parse_accesslog (xmlNodePtr node, void *arg) }; log->logid = -1; - return parse_xml_tags (node, icecast_tags); + log->type = LOG_ACCESS_CLF; + if (parse_xml_tags (node, icecast_tags)) + return 2; + if (type && strcmp (type, "CLF-ESC") == 0) + log->type = LOG_ACCESS_CLF_ESC; + xmlFree (type); + return 0; } + static int _parse_errorlog (xmlNodePtr node, void *arg) { error_log *log = arg; @@ -659,7 +686,6 @@ static int _parse_logging (xmlNodePtr node, void *arg) struct cfg_tag icecast_tags[] = { { "accesslog", _parse_accesslog, &config->access_log }, - { "errorlog", _parse_errorlog, &config->error_log }, { "playlistlog", _parse_playlistlog, &config->playlist_log }, { "accesslog", config_get_str, &config->access_log.name }, { "accesslog_ip", config_get_bool, &config->access_log.log_ip }, @@ -667,7 +693,8 @@ static int _parse_logging (xmlNodePtr node, void *arg) config_get_str, &config->access_log.exclude_ext }, { "accesslog_lines", config_get_int, &config->access_log.display }, - { "errorlog", config_get_str, &config->error_log }, + { "errorlog", _parse_errorlog, &config->error_log }, + { "errorlog", config_get_str, &config->error_log.name }, { "errorlog_lines", config_get_int, &config->error_log.display }, { "loglevel", config_get_int, &config->error_log.level }, { "playlistlog", config_get_str, &config->playlist_log }, @@ -678,6 +705,7 @@ static int _parse_logging (xmlNodePtr node, void *arg) { NULL, NULL, NULL } }; + config->access_log.type = LOG_ACCESS_CLF; config->access_log.logid = -1; config->access_log.display = 100; config->access_log.archive = -1; @@ -830,8 +858,10 @@ static int _parse_mount (xmlNodePtr node, void *arg) mount->url_ogg_meta = 1; mount->source_timeout = config->source_timeout; mount->file_seekable = 1; + mount->access_log.type = LOG_ACCESS_CLF; mount->access_log.logid = -1; mount->access_log.log_ip = 1; + mount->fallback_override = 1; if (parse_xml_tags (node, icecast_tags)) return -1; @@ -963,6 +993,36 @@ static int _parse_relay (xmlNodePtr node, void *arg) return 0; } + +static int _parse_redirect (xmlNodePtr node, void *arg) +{ + ice_config_t *config = arg; + redirect_host *redir = calloc (1, sizeof (*redir)); + + struct cfg_tag icecast_tags[] = + { + { "host", config_get_str, &redir->server }, + { "port", config_get_int, &redir->port }, + { NULL, NULL, NULL }, + }; + + do + { + redir->port = 8000; + if (parse_xml_tags (node, icecast_tags)) + break; + + if (redir->server == NULL || redir->port < 1 || redir->port > 65536) + break; + redir->next = config->redirect_hosts; + config->redirect_hosts = redir; + return 0; + } while (0); + free (redir); + return -1; +} + + static int _parse_limits (xmlNodePtr node, void *arg) { ice_config_t *config = arg; @@ -1095,6 +1155,7 @@ static int _parse_root (xmlNodePtr node, ice_config_t *config) { "master-ssl-port", config_get_int, &config->master_ssl_port }, { "master-redirect", config_get_bool, &config->master_redirect }, { "max-redirect-slaves",config_get_int, &config->max_redirects }, + { "redirect", _parse_redirect, config }, { "shoutcast-mount", config_get_str, &config->shoutcast_mount }, { "listen-socket", _parse_listen_sock, config }, { "limits", _parse_limits, config }, diff --git a/src/cfgfile.h b/src/cfgfile.h index 859c32ac..7ab687cd 100644 --- a/src/cfgfile.h +++ b/src/cfgfile.h @@ -30,18 +30,32 @@ typedef struct _listener_t listener_t; #include "auth.h" #include "compat.h" -typedef struct + +typedef struct _redirect_host +{ + struct _redirect_host *next; + time_t next_update; + char *server; + int port; +} redirect_host; + + +typedef struct access_log { char *name; int logid; int log_ip; + int type; int archive; int display; int size; char *exclude_ext; } access_log; -typedef struct +#define LOG_ACCESS_CLF 0 +#define LOG_ACCESS_CLF_ESC 1 + +typedef struct error_log { char *name; int logid; @@ -51,7 +65,7 @@ typedef struct int level; } error_log; -typedef struct +typedef struct playlist_log { char *name; int logid; @@ -131,7 +145,7 @@ typedef struct _mount_proxy { unsigned int max_stream_duration; unsigned int max_listener_duration; - access_log access_log; + struct access_log access_log; char *redirect; char *stream_name; @@ -175,13 +189,14 @@ typedef struct _relay_server_master char *mount; int port; int timeout; + int skip; } relay_server_master; typedef struct _relay_server { struct _relay_server *next, *new_details; struct source_tag *source; - relay_server_master *masters; + relay_server_master *masters, *in_use; char *username; char *password; char *localmount; @@ -240,8 +255,6 @@ typedef struct ice_config_tag listener_t *listen_sock; unsigned int listen_sock_count; - ice_master_details *master; - char *master_server; int master_server_port; int master_update_interval; @@ -252,6 +265,7 @@ typedef struct ice_config_tag int master_ssl_port; int master_redirect; int max_redirects; + struct _redirect_host *redirect_hosts; relay_server *relay; @@ -267,12 +281,12 @@ typedef struct ice_config_tag char *cert_file; char *webroot_dir; char *adminroot_dir; - aliases *aliases; + struct _aliases *aliases; unsigned slaves_count; - access_log access_log; - error_log error_log; - playlist_log playlist_log; + struct access_log access_log; + struct error_log error_log; + struct playlist_log playlist_log; int chroot; int chuid; diff --git a/src/compat.h b/src/compat.h index 15b2ee98..a34b0a49 100644 --- a/src/compat.h +++ b/src/compat.h @@ -36,18 +36,24 @@ #endif /* Make sure we define 64 bit types */ -#ifdef _WIN32 +#if defined(_WIN32) || defined(_MINGW32_) # define PATH_SEPARATOR "\\" # define size_t unsigned int # define ssize_t int # define uint32_t unsigned int +# define fseeko fseek +# include // for alloca +//# define alloca _alloca +# define SCN_OFF_T "ld" +# define PRI_OFF_T "ld" #else # define PATH_SEPARATOR "/" -# if defined(HAVE_INTTYPES_H) +#endif + +#if defined(HAVE_INTTYPES_H) # include -# elif defined(HAVE_STDINT_H) +#elif defined(HAVE_STDINT_H) # include -# endif #endif #endif /* __COMPAT_H__ */ diff --git a/src/connection.c b/src/connection.c index 00e511dd..d72ec740 100644 --- a/src/connection.c +++ b/src/connection.c @@ -31,11 +31,17 @@ #ifdef _MSC_VER #include #include -#else +#endif +#ifdef HAVE_SYS_SOCKET_H #include +#endif +#ifdef HAVE_NETINET_IN_H #include +#endif +#ifdef HAVE_NETDB_H #include #endif + #ifdef HAVE_SIGNALFD #include #include @@ -218,6 +224,7 @@ void connection_initialize(void) void connection_shutdown(void) { + connection_listen_sockets_close (NULL, 1); thread_spin_destroy (&_connection_lock); } @@ -342,6 +349,126 @@ int connection_send (connection_t *con, const void *buf, size_t len) return bytes; } + +#ifdef WIN32 +#define IO_VECTOR_LEN(x) ((x)->len) +#define IO_VECTOR_BASE(x) ((x)->buf) +#else +#define IO_VECTOR_LEN(x) ((x)->iov_len) +#define IO_VECTOR_BASE(x) ((x)->iov_base) +#endif + +void connection_bufs_init (struct connection_bufs *v, short start) +{ + memset (v, 0, sizeof (struct connection_bufs)); + if (start && start < 500) + { + v->block = calloc (start, sizeof (IOVEC)); + v->max = start; + } +} + + +void connection_bufs_release (struct connection_bufs *v) +{ + free (v->block); + memset (v, 0, sizeof (struct connection_bufs)); +} + + +void connection_bufs_flush (struct connection_bufs *v) +{ + v->count = 0; + v->total = 0; +} + + +int connection_bufs_append (struct connection_bufs *v, void *buf, unsigned int len) +{ + if (v->count >= v->max) + { + int len = v->max + 16; + IOVEC *arr = realloc (v->block, (len*sizeof(IOVEC))); + v->max = len; + v->block = arr; + } + IO_VECTOR_BASE (v->block + v->count) = buf; + IO_VECTOR_LEN (v->block + v->count) = len; + v->count++; + v->total += len; + return v->total; + +} + + +static int connbufs_locate_start (struct connection_bufs *vects, int skip, IOVEC *old_value, int *offp) +{ + int sum = 0, i = vects->count; + IOVEC *p = vects->block; + + if (skip < vects->total) + { + for (; i; i--) + { + if (sum + IO_VECTOR_LEN(p) > skip) + { + int offset = skip - sum; + if (offset) + { + *old_value = *p; + IO_VECTOR_BASE(p) += offset; + IO_VECTOR_LEN(p) -= offset; + } + *offp = offset; + return p - vects->block; + } + sum += IO_VECTOR_LEN(p); + p++; + } + } + return -1; +} + + +int connection_bufs_send (connection_t *con, struct connection_bufs *vectors, int skip) +{ + IOVEC *p = vectors->block, old_vals; + int i = vectors->count, offset = 0, ret = -1; + + i = connbufs_locate_start (vectors, skip, &old_vals, &offset); + p = vectors->block + i; + + if (i >= 0) + { + if (not_ssl_connection (con)) + { + ret = sock_writev (con->sock, p, vectors->count - i); + if (ret < 0 && !sock_recoverable (sock_error())) + con->error = 1; + } +#ifdef HAVE_OPENSSL + else + { + IOVEC *io = p; + int bytes = 0; + for (; i < vectors->count; i++, io++) + { + int v = connection_send_ssl (con, IO_VECTOR_BASE(io), IO_VECTOR_LEN(io)); + if (v > 0) bytes += v; + if (v < IO_VECTOR_LEN(io)) break; + } + if (bytes > 0) ret = bytes; + } +#endif + if (offset) + *p = old_vals; + if (ret > 0) + con->sent_bytes += ret; + } + return ret; +} + + static void add_generic_text (avl_tree *t, const char *ip, time_t now) { char *str = strdup (ip); @@ -641,7 +768,7 @@ static sock_t wait_for_serversock (void) } #else fd_set rfds; - struct timeval tv, *p=NULL; + struct timeval tv; int i, ret; sock_t max = SOCK_ERROR; @@ -697,7 +824,7 @@ static client_t *accept_client (void) int i, num; refbuf_t *r; - if (sock_set_blocking (sock, 0) || sock_set_nodelay (sock)) + if (sock_set_blocking (sock, 0)) // || sock_set_nodelay (sock)) { WARN0 ("failed to set tcp options on client connection, dropping"); break; @@ -885,7 +1012,8 @@ static int http_client_request (client_t *client) if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0) { - INFO1 ("dropping client because useragent is %s", agent); + INFO2 ("dropping client at %s because useragent is %s", + client->connection.ip, agent); return -1; } } @@ -977,8 +1105,7 @@ static void *connection_thread (void *arg) useragents.filename = strdup (config->agentfile); get_ssl_certificate (config); - if (config->chuid == 0) - connection_setup_sockets (config); + connection_setup_sockets (config); header_timeout = config->header_timeout; config_release_config (); @@ -1020,6 +1147,7 @@ static void *connection_thread (void *arg) return NULL; } + void connection_thread_startup () { #ifdef HAVE_SIGNALFD @@ -1034,6 +1162,7 @@ void connection_thread_startup () conn_tid = thread_create ("connection", connection_thread, NULL, THREAD_ATTACHED); } + void connection_thread_shutdown () { if (conn_tid) @@ -1375,41 +1504,71 @@ static int _handle_get_request (client_t *client) } -/* close any open listening sockets and reopen new listener sockets based on the settings - * in the configuration. +/* close any open listening sockets */ +void connection_listen_sockets_close (ice_config_t *config, int all_sockets) +{ + if (global.serversock) + { + int old = 0, new = 0, cur = global.server_sockets; + for (; old < cur; old++) + { + // close all listening sockets unless privileged ones are to stay open + // and it is still present in the config. + if (config && all_sockets == 0 && global.server_conn [old]->port < 1024) + { + listener_t *listener = config->listen_sock; + while (listener && listener->port != global.server_conn [old]->port) + listener = listener->next; + if (listener) + { + INFO2 ("Leaving port %d (%s) open", listener->port, + listener->bind_address ? listener->bind_address : ""); + if (new < old) + { + global.server_conn [new] = global.server_conn [old]; + global.serversock [new] = global.serversock [old]; + new++; + } + continue; + } + } + INFO1 ("Closing port %d", global.server_conn [old]->port); + sock_close (global.serversock [old]); + global.serversock [old] = SOCK_ERROR; + config_clear_listener (global.server_conn [old]); + global.server_sockets--; + } + if (global.server_sockets == 0) + { + free (global.serversock); + global.serversock = NULL; + free (global.server_conn); + global.server_conn = NULL; + } + } +} + + int connection_setup_sockets (ice_config_t *config) { int count = 0; listener_t *listener, **prev; + void *tmp; - global_lock(); - /* place sockets away from config, so we don't need to take config lock - * in the accept loop. */ - if (global.serversock) - { - for (; count < global.server_sockets; count++) - { - sock_close (global.serversock [count]); - config_clear_listener (global.server_conn [count]); - } - free (global.serversock); - global.serversock = NULL; - free (global.server_conn); - global.server_conn = NULL; - } - if (config == NULL) - { - global_unlock(); + if (global.server_sockets >= config->listen_sock_count) return 0; - } + global_lock(); - count = 0; - global.serversock = calloc (config->listen_sock_count, sizeof (sock_t)); - global.server_conn = calloc (config->listen_sock_count, sizeof (listener_t*)); + tmp = realloc (global.serversock, (config->listen_sock_count*sizeof (sock_t))); + if (tmp) global.serversock = tmp; + + tmp = realloc (global.server_conn, (config->listen_sock_count*sizeof (listener_t*))); + if (tmp) global.server_conn = tmp; listener = config->listen_sock; prev = &config->listen_sock; + count = global.server_sockets; while (listener) { int successful = 0; @@ -1429,6 +1588,8 @@ int connection_setup_sockets (ice_config_t *config) sock_set_send_buffer (sock, listener->so_sndbuf); sock_set_blocking (sock, 0); successful = 1; + if (count >= config->listen_sock_count) + abort(); global.serversock [count] = sock; global.server_conn [count] = listener; listener->refcount++; diff --git a/src/connection.h b/src/connection.h index 2039deb2..1b442982 100644 --- a/src/connection.h +++ b/src/connection.h @@ -13,6 +13,10 @@ #ifndef __CONNECTION_H__ #define __CONNECTION_H__ +#ifdef HAVE_WINSOCK2_H +#include +#endif + #include #include #ifdef HAVE_OPENSSL @@ -46,6 +50,15 @@ struct connection_tag char *ip; }; + +struct connection_bufs +{ + short count, max; + int total; + IOVEC *block; +}; + + #ifdef HAVE_OPENSSL #define not_ssl_connection(x) ((x)->ssl==NULL) #else @@ -63,6 +76,14 @@ void connection_uses_ssl (connection_t *con); void connection_add_banned_ip (const char *ip, int duration); void connection_release_banned_ip (const char *ip); void connection_stats (void); + +void connection_bufs_init (struct connection_bufs *vectors, short start); +void connection_bufs_release (struct connection_bufs *v); +void connection_bufs_flush (struct connection_bufs *v); +int connection_bufs_append (struct connection_bufs *vectors, void *buf, unsigned int len); +int connection_bufs_read (connection_t *con, struct connection_bufs *vecs, int skip); +int connection_bufs_send (connection_t *con, struct connection_bufs *vecs, int skip); + #ifdef HAVE_OPENSSL int connection_read_ssl (connection_t *con, void *buf, size_t len); int connection_send_ssl (connection_t *con, const void *buf, size_t len); @@ -76,6 +97,7 @@ int connection_check_relay_pass(http_parser_t *parser); int connection_check_admin_pass(http_parser_t *parser); void connection_close_sigfd (void); +void connection_listen_sockets_close (struct ice_config_tag *config, int all_sockets); extern int connection_running; diff --git a/src/event.c b/src/event.c index ceaece9a..3f25c6c3 100644 --- a/src/event.c +++ b/src/event.c @@ -64,14 +64,17 @@ void event_config_read (void) config_set_config (&new_config, &old_config); config_release_config(); + connection_thread_shutdown(); + redirector_clearall(); config = config_get_config(); yp_recheck_config (config); fserve_recheck_mime_types (config); stats_global (config); workers_adjust (config->workers_count); + connection_listen_sockets_close (config, 0); + redirector_setup (config); config_release_config(); - connection_thread_shutdown(); slave_restart(); config_clear (&old_config); } diff --git a/src/flv.c b/src/flv.c index ded916a5..6637f772 100644 --- a/src/flv.c +++ b/src/flv.c @@ -32,6 +32,7 @@ #include "logging.h" #include "mpeg.h" #include "format_mp3.h" +#include "global.h" #define CATMODULE "flv" @@ -105,11 +106,13 @@ static int flv_mpX_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int flv->tag[15] |= 0x1; } memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 16); + connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 16); flv->samples += mp->sample_count; flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0)); // The extra byte is for the flv audio id, usually 0x2F flv->prev_tagsize = (len + FLVHEADER + 1); mp->raw_offset += 16; + connection_bufs_append (&flv->bufs, frame, len); return 0; } @@ -132,11 +135,13 @@ static int flv_aac_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int flv_hdr (flv, len + 2); // a single frame (headerless) follows this memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 17); + connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 17); flv->samples += mp->sample_count; flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0)); // frame length + FLVHEADER + AVHEADER flv->prev_tagsize = (len + 11 + 2); mp->raw_offset += 17; + connection_bufs_append (&flv->bufs, frame, len); return 0; } @@ -171,6 +176,7 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne flv->tag[15] = 0xAF; // AAC audio, need these codes first flv->tag[16] = 0x0; memcpy (mp->raw->data, &flv->tag[0], 11+4+2+c); + connection_bufs_append (&flv->bufs, mp->raw->data, 11+4+2+c); mp->raw_offset = 11+4+2+c; flv->prev_tagsize = 11 + 2 + c; flv->tag[16] = 0x01; // as per spec. headerless frame follows this @@ -184,31 +190,125 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne static int send_flv_buffer (client_t *client, struct flv *flv) { int ret = 0; - char *buf = flv->mpeg_sync.raw->data + flv->block_pos; - unsigned int len = flv->mpeg_sync.raw_offset - flv->block_pos; + unsigned int len = flv->bufs.total - flv->block_pos; if (len > 0) { - if (len > 1400) - len = 1400; - ret = client_send_bytes (client, buf, len); + ret = connection_bufs_send (&client->connection, &flv->bufs, flv->block_pos); if (ret < (int)len) - client->schedule_ms += (ret ? 50 : 250); + client->schedule_ms += (ret > 0 ? 50 : 200); if (ret > 0) flv->block_pos += ret; } - if (flv->block_pos == flv->mpeg_sync.raw_offset) + if (flv->block_pos == flv->bufs.total) + { flv->block_pos = flv->mpeg_sync.raw_offset = 0; + connection_bufs_flush (&flv->bufs); + } return ret; } +void flv_write_metadata (struct flv *flv, refbuf_t *scmeta, const char *mount) +{ + /* the first assoc block is shoutcast meta, second is flv meta */ + int len; + struct flvmeta *flvm; + unsigned char prev_type = flv->tag[4]; + refbuf_t *flvmeta = NULL; + int meta_copied = 0; + refbuf_t *raw = flv->mpeg_sync.raw; + char *src, *dst = raw->data + flv->mpeg_sync.raw_offset; + + if (scmeta) + flvmeta = scmeta->associated; + if (flvmeta == NULL) + { + char *value = stats_get_value (mount, "server_name"); + + flvmeta = flv_meta_allocate (200); + if (value) + flv_meta_append_string (flvmeta, "name", value); + free (value); + value = stats_get_value (flv->mpeg_sync.mount, "title"); + if (value) + flv_meta_append_string (flvmeta, "title", value); + else + flv_meta_append_string (flvmeta, "title", ""); + free (value); + value = stats_get_value (mount, "audio_codecid"); + if (value) + { + int id = atoi (value); + if (id == 2 || id == 10) + flv_meta_append_number (flvmeta, "audiocodecid", (double)id); + free (value); + } + value = stats_get_value (mount, "ice-bitrate"); + if (value) + { + double rate = (double)atoi (value); + flv_meta_append_number (flvmeta, "audiodatarate", rate); + free (value); + } + value = stats_get_value (mount, "ice-samplerate"); + if (value) + { + double rate = (double)atoi (value); + flv_meta_append_number (flvmeta, "audiosamplerate", rate); + free (value); + } + value = stats_get_value (mount, "ice-channels"); + if (value) + { + int chann = atoi (value); + flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0); + free (value); + } + flv_meta_append_string (flvmeta, NULL, NULL); + flvm = (struct flvmeta *)flvmeta->data; + meta_copied = flvm->meta_pos - sizeof (*flvm); + } + else + flvm = (struct flvmeta *)flvmeta->data; + len = flvm->meta_pos - sizeof (*flvm); + src = (char *)flvm + sizeof (*flvm); + + if (meta_copied + 15 > raw->len) + { + int newlen = raw->len + 1024; + void *p = realloc (raw->data, newlen); + if (p == NULL) + return; + raw->data = p; + raw->len = newlen; + } + flv->tag[4] = 18; // metadata + flv_hdr (flv, len); + memcpy (dst, &flv->tag[0], 15); + connection_bufs_append (&flv->bufs, dst, 15); + flv->mpeg_sync.raw_offset += 15; + dst += 15; + if (meta_copied) + { + memcpy (dst, src, len); + connection_bufs_append (&flv->bufs, dst, len); + flv->mpeg_sync.raw_offset += len; + refbuf_release (flvmeta); + } + else + connection_bufs_append (&flv->bufs, src, len); + flv->prev_tagsize = len + 11; + flv->tag[4] = prev_type; +} + + int write_flv_buf_to_client (client_t *client) { refbuf_t *ref = client->refbuf, *scmeta = ref->associated; mp3_client_data *client_mp3 = client->format_data; struct flv *flv = client_mp3->specific; - int ret, meta_to_free = 0; + int ret; if (client->pos >= ref->len) { @@ -216,99 +316,27 @@ int write_flv_buf_to_client (client_t *client) client->pos = ref->len; return -1; } + /* check for metadata updates and insert if needed */ - if (flv->seen_metadata != scmeta) - { - /* the first assoc block is shoutcast meta, second is flv meta */ - int len; - char *src, *dst = flv->mpeg_sync.raw->data; - refbuf_t *flvmeta = NULL; - struct flvmeta *flvm; - - if (scmeta) - flvmeta = scmeta->associated; - if (flvmeta == NULL) - { - char *value = stats_get_value (client->mount, "server_name"); - - flvmeta = flv_meta_allocate (200); - meta_to_free = 1; - if (value) - flv_meta_append_string (flvmeta, "name", value); - free (value); - value = stats_get_value (flv->mpeg_sync.mount, "title"); - if (value) - flv_meta_append_string (flvmeta, "title", value); - else - flv_meta_append_string (flvmeta, "title", ""); - free (value); - flv_meta_append_string (flvmeta, "title", value); - value = stats_get_value (client->mount, "audio_codecid"); - if (value) - { - int id = atoi (value); - if (id == 2 || id == 10) - flv_meta_append_number (flvmeta, "audiocodecid", (double)id); - free (value); - } - value = stats_get_value (client->mount, "ice-bitrate"); - if (value) - { - double rate = (double)atoi (value); - flv_meta_append_number (flvmeta, "audiodatarate", rate); - free (value); - } - value = stats_get_value (client->mount, "ice-samplerate"); - if (value) - { - double rate = (double)atoi (value); - flv_meta_append_number (flvmeta, "audiosamplerate", rate); - free (value); - } - value = stats_get_value (client->mount, "ice-channels"); - if (value) - { - int chann = atoi (value); - flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0); - free (value); - } - flv_meta_append_string (flvmeta, NULL, NULL); - } - flvm = (struct flvmeta *)flvmeta->data; - src = (char *)flvm + sizeof (*flvm); - len = flvm->meta_pos - sizeof (*flvm); - - if (len + 15 < flv->mpeg_sync.raw->len) - { - unsigned char prev_type = flv->tag[4]; - flv->tag[4] = 18; // metadata - flv_hdr (flv, len); - memcpy (dst, &flv->tag[0], 15); - memcpy (dst+15, src, len); - flv->mpeg_sync.raw_offset = len + 15; - flv->prev_tagsize = len + 11; - flv->tag[4] = prev_type; - flv->seen_metadata = ref->associated; - if (meta_to_free) refbuf_release (flvmeta); - return send_flv_buffer (client, flv); - } - if (meta_to_free) refbuf_release (flvmeta); - flv->seen_metadata = ref->associated; - } - if (flv->mpeg_sync.raw_offset == 0) { int unprocessed = mpeg_complete_frames (&flv->mpeg_sync, ref, client->pos); + if (unprocessed < 0) return -1; if (unprocessed > 0) ref->len += unprocessed; /* output was truncated, so revert changes */ + + if (flv->seen_metadata != scmeta) + flv_write_metadata (flv, scmeta, client->mount); } ret = send_flv_buffer (client, flv); if (flv->mpeg_sync.raw_offset == 0) { client->pos = ref->len; client->queue_pos += client->refbuf->len; + if (flv->seen_metadata != scmeta) + flv->seen_metadata = scmeta; } return ret; } @@ -458,7 +486,8 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client) "\r\n" "FLV\x1\x4%c%c%c\x9", 0,0,0); - flv->mpeg_sync.raw = refbuf_new (4096); + // only flv headers in here, allows for up to 64 frames per read block, expandable + flv->mpeg_sync.raw = refbuf_new (1024); flv->tag[4] = 8; // Audio details only if (plugin->type == FORMAT_TYPE_AAC) { @@ -474,6 +503,7 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client) client->respcode = 200; client->refbuf->len = bytes; + connection_bufs_init (&flv->bufs, 10); } @@ -481,5 +511,6 @@ void free_flv_client_data (struct flv *flv) { flv->mpeg_sync.mount = NULL; mpeg_cleanup (&flv->mpeg_sync); + connection_bufs_release (&flv->bufs); } diff --git a/src/flv.h b/src/flv.h index 3fdb6d1a..86e242a4 100644 --- a/src/flv.h +++ b/src/flv.h @@ -25,6 +25,7 @@ struct flv int64_t samples; refbuf_t *seen_metadata; mpeg_sync mpeg_sync; + struct connection_bufs bufs; unsigned char tag[30]; }; diff --git a/src/format_flac.c b/src/format_flac.c index 98f18363..55002481 100644 --- a/src/format_flac.c +++ b/src/format_flac.c @@ -27,6 +27,7 @@ typedef struct source_tag source_t; #include "format_ogg.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-flac" #include "logging.h" diff --git a/src/format_kate.c b/src/format_kate.c index 3587a784..1fc799c8 100644 --- a/src/format_kate.c +++ b/src/format_kate.c @@ -31,6 +31,7 @@ typedef struct source_tag source_t; #include "format_kate.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-kate" #include "logging.h" diff --git a/src/format_midi.c b/src/format_midi.c index 2afd833c..1181da21 100644 --- a/src/format_midi.c +++ b/src/format_midi.c @@ -27,6 +27,7 @@ typedef struct source_tag source_t; #include "format_ogg.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-midi" #include "logging.h" diff --git a/src/format_mp3.c b/src/format_mp3.c index 68778105..1e18d09c 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -41,6 +41,7 @@ #include "format_mp3.h" #include "flv.h" #include "mpeg.h" +#include "global.h" #define CATMODULE "format-mp3" @@ -64,8 +65,9 @@ static int mpeg_process_buffer (client_t *client, format_plugin_t *plugin); /* client format flags */ -#define CLIENT_IN_METADATA CLIENT_FORMAT_BIT -#define CLIENT_USING_BLANK_META (CLIENT_FORMAT_BIT<<1) +#define CLIENT_INTERNAL_FORMAT (CLIENT_FORMAT_BIT << 4) +#define CLIENT_IN_METADATA (CLIENT_INTERNAL_FORMAT) +#define CLIENT_USING_BLANK_META (CLIENT_INTERNAL_FORMAT<<1) static refbuf_t blank_meta = { 0, 1, NULL, NULL, "\001StreamTitle='';", 17 }; @@ -138,7 +140,10 @@ static void mp3_set_tag (format_plugin_t *plugin, const char *tag, const char *i if (in_value) { - value = util_conv_string (in_value, charset, plugin->charset); + if (charset == NULL && plugin->charset) + charset = plugin->charset; + if (charset && (strcasecmp (charset, "utf-8") && strcasecmp (charset, "utf8"))) + value = util_conv_string (in_value, charset, "UTF8"); if (value == NULL) value = strdup (in_value); } @@ -450,6 +455,8 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf) * to merge them into 1 write call */ block_len = refbuf->len - client->pos; + if (block_len > client_mp3->interval) + block_len = client_mp3->interval; // handle small intervals merge = alloca (block_len + meta_len); memcpy (merge, metadata, meta_len); @@ -502,7 +509,7 @@ static int format_mp3_write_buf_to_client (client_t *client) ret = client_send_bytes (client, buf, len); if (ret < len) - client->schedule_ms += 50; + client->schedule_ms += 80; if (ret > 0) { client_mp3->since_meta_block += ret; @@ -629,30 +636,33 @@ static int validate_mpeg (source_t *source, refbuf_t *refbuf) if (source_mp3->inline_metadata_interval > 0) { - /* inline metadata may need reading before the rest of the mpeg data */ - if (source_mp3->build_metadata_len == 0 && source_mp3->offset > unprocessed) - { - source_mp3->offset -= unprocessed; - } - else + if (source_mp3->inline_metadata_interval <= source_mp3->offset) { + // reached meta but we have a frame fragment, so keep it for later leftover = refbuf_new (unprocessed); memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed); - mpeg_data_insert (mpeg_sync, leftover); /* will need to merge this after metadata */ - return 0; + mpeg_data_insert (mpeg_sync, leftover); + client->pos = 0; + return refbuf->len ? 0 : -1; } + // not reached the metadata block so save and rewind for completing the read + source_mp3->offset -= unprocessed; } /* make sure the new block has a minimum of queue_block_size */ if (unprocessed < source_mp3->queue_block_size) len = source_mp3->queue_block_size; else - len = unprocessed + 2000; + len = unprocessed + 1000; leftover = refbuf_new (len); memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed); source_mp3->read_data = leftover; source_mp3->read_count = unprocessed; + client->pos = unprocessed; } + else + client->pos = 0; + if (source->format->read_bytes < 2500) stats_event_args (source->mount, "audio_codecid", "%d", (mpeg_sync->layer ? 2 : 10)); return refbuf->len ? 0 : -1; @@ -670,6 +680,8 @@ static refbuf_t *mp3_get_no_meta (source_t *source) return NULL; refbuf = source_mp3->read_data; + refbuf->len = source_mp3->read_count; + source_mp3->read_count = 0; source_mp3->read_data = NULL; if (client->format_data && validate_mpeg (source, refbuf) < 0) @@ -708,6 +720,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) /* fill the buffer with the read data */ bytes = source_mp3->read_count; refbuf->len = 0; + while (bytes > 0) { unsigned int metadata_remaining; @@ -771,7 +784,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) source_mp3->build_metadata_len = 0; } /* the data we have just read may of just been metadata */ - if (refbuf->len == 0) + if (refbuf->len <= 0) { refbuf_release (refbuf); return NULL; diff --git a/src/format_ogg.c b/src/format_ogg.c index ec0d88d6..f6219904 100644 --- a/src/format_ogg.c +++ b/src/format_ogg.c @@ -44,6 +44,7 @@ #include "format_flac.h" #include "format_kate.h" #include "format_skeleton.h" +#include "global.h" #define CATMODULE "format-ogg" #include "logging.h" @@ -524,7 +525,9 @@ static int send_ogg_headers (client_t *client, refbuf_t *headers) unsigned len = refbuf->len - client_data->pos; int ret = -1; - if (len < 8000 && client->connection.error == 0) + if (len > 8192) + len = 8192; + if (client->connection.error == 0) ret = client_send_bytes (client, data, len); if (ret > 0) { diff --git a/src/format_skeleton.c b/src/format_skeleton.c index b4df9c03..0e48be77 100644 --- a/src/format_skeleton.c +++ b/src/format_skeleton.c @@ -28,6 +28,7 @@ typedef struct source_tag source_t; #include "format_skeleton.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-skeleton" #include "logging.h" diff --git a/src/format_speex.c b/src/format_speex.c index fd513791..7c023a7b 100644 --- a/src/format_speex.c +++ b/src/format_speex.c @@ -26,6 +26,7 @@ typedef struct source_tag source_t; #include "format_speex.h" #include "refbuf.h" #include "client.h" +#include "global.h" #define CATMODULE "format-speex" #include "logging.h" diff --git a/src/format_theora.c b/src/format_theora.c index 6124b5a3..cc7cb784 100644 --- a/src/format_theora.c +++ b/src/format_theora.c @@ -27,6 +27,7 @@ #include "format_theora.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-theora" #include "logging.h" diff --git a/src/format_vorbis.c b/src/format_vorbis.c index 9a37de68..6294e1c9 100644 --- a/src/format_vorbis.c +++ b/src/format_vorbis.c @@ -30,6 +30,7 @@ #include "format_ogg.h" #include "stats.h" #include "format.h" +#include "global.h" #define CATMODULE "format-vorbis" #include "logging.h" diff --git a/src/fserve.c b/src/fserve.c index 7db6144d..55bb196d 100644 --- a/src/fserve.c +++ b/src/fserve.c @@ -14,6 +14,7 @@ #include #endif +#include "compat.h" #include #include #include @@ -27,15 +28,21 @@ #include #endif -#ifndef _WIN32 -#include -#include -#include -#define SCN_OFF_T SCNdMAX -#define PRI_OFF_T PRIdMAX -#else +#ifdef _MSC_VER #include #include +#else +#include +#include +# ifdef HAVE_SYS_SOCKET_H +# include +# endif +# ifndef SCN_OFF_T +# define SCN_OFF_T SCNdMAX +# endif +# ifndef PRI_OFF_T +# define PRI_OFF_T PRIdMAX +# endif #endif #include "thread/thread.h" @@ -53,7 +60,6 @@ #include "cfgfile.h" #include "util.h" #include "admin.h" -#include "compat.h" #include "slave.h" #include "fserve.h" @@ -178,6 +184,7 @@ char *fserve_content_type (const char *path) return type; } + static int _compare_fh(void *arg, void *a, void *b) { fh_node *x = a, *y = b; @@ -189,6 +196,7 @@ static int _compare_fh(void *arg, void *a, void *b) return 0; } + static int _delete_fh (void *mapping) { fh_node *fh = mapping; @@ -217,12 +225,33 @@ static int _delete_fh (void *mapping) } +static void remove_fh_from_cache (fh_node *fh) +{ + avl_tree_wlock (fh_cache); + avl_delete (fh_cache, fh, NULL); + avl_tree_unlock (fh_cache); + fh->peak = 0; +} + + static void remove_from_fh (fh_node *fh, client_t *client) { avl_delete (fh->clients, client, NULL); } +static fh_node *find_fh (fbinfo *finfo) +{ + fh_node fh, *result = NULL; + if (finfo->mount == NULL) + finfo->mount = ""; + memcpy (&fh.finfo, finfo, sizeof (fbinfo)); + if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0) + return result; + return NULL; +} + + /* find/create handle and return it with the structure in a locked state */ static fh_node *open_fh (fbinfo *finfo, client_t *client) { @@ -254,10 +283,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) if (result->format) { if (result->format->create_client_data && client->format_data == NULL) - { result->format->create_client_data (result->format, client); + if (result->format->write_buf_to_client) client->check_buffer = result->format->write_buf_to_client; - } } } DEBUG2 ("refcount now %d for %s", result->refcount, result->finfo.mount); @@ -271,9 +299,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) if (client) { if (finfo->flags & FS_FALLBACK) - DEBUG1 ("lookup of fallback file \"%s\"", finfo->mount); + INFO2 ("lookup of fallback file \"%s\" (%d)", finfo->mount, finfo->limit); else - DEBUG1 ("lookup of \"%s\"", finfo->mount); + INFO1 ("lookup of \"%s\"", finfo->mount); } fh->fp = fopen (fullpath, "rb"); if (fh->fp == NULL) @@ -292,7 +320,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) { char *contenttype = fserve_content_type (fullpath); - stats_event (finfo->mount, "fallback", "file"); + stats_event_flags (finfo->mount, "fallback", "file", STATS_COUNTERS|STATS_HIDDEN); + stats_event_flags (finfo->mount, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_HIDDEN); fh->format = calloc (1, sizeof (format_plugin_t)); fh->format->type = format_get_type (contenttype); free (contenttype); @@ -305,10 +334,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) return NULL; } if (fh->format->create_client_data && client->format_data == NULL) - { fh->format->create_client_data (fh->format, client); + if (fh->format->write_buf_to_client) client->check_buffer = fh->format->write_buf_to_client; - } } free (fullpath); } @@ -321,8 +349,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) { if (finfo->mount && (finfo->flags & FS_FALLBACK)) { - stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_HIDDEN); - stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_HIDDEN); + stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_GENERAL|STATS_HIDDEN); + stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_GENERAL|STATS_HIDDEN); } avl_insert (fh->clients, client); } @@ -595,20 +623,16 @@ static void fh_release (fh_node *fh) if (fh->refcount > 1) { fh->refcount--; + stats_event_args (fh->finfo.mount, "listeners", "%ld", fh->refcount); thread_mutex_unlock (&fh->lock); return; } - /* now we will probably remove the fh, but to prevent a deadlock with - * open_fh, we drop the node lock and acquire the tree and node locks - * in that order and only remove if the refcount is still 0 */ - thread_mutex_unlock (&fh->lock); - avl_tree_wlock (fh_cache); - thread_mutex_lock (&fh->lock); - if (fh->refcount > 1) - thread_mutex_unlock (&fh->lock); - else - avl_delete (fh_cache, fh, _delete_fh); - avl_tree_unlock (fh_cache); + stats_event (fh->finfo.mount, "fallback", NULL); + stats_event (fh->finfo.mount, NULL, NULL); + if (fh->peak) + remove_fh_from_cache (fh); + // leave as locked + _delete_fh (fh); } @@ -686,10 +710,10 @@ static void fserve_move_listener (client_t *client) remove_from_fh (fh, client); if (fh->refcount == 1) stats_event (fh->finfo.mount, NULL, NULL); - f.flags = fh->finfo.flags; + f.flags = fh->finfo.flags|FS_OVERRIDE; f.limit = fh->finfo.limit; f.mount = fh->finfo.fallback; - f.fallback = NULL; + f.fallback = fh->finfo.mount; client->intro_offset = -1; move_listener (client, &f); fh_release (fh); @@ -857,14 +881,17 @@ static int throttled_file_send (client_t *client) return 0; } if (client->flags & CLIENT_WANTS_FLV) /* increase limit for flv clients as wrapping takes more space */ - limit = (unsigned long)(limit * 1.05); + limit = (unsigned long)(limit * 1.01); if (secs) rate = (client->counter+1400)/secs; // DEBUG3 ("counter %lld, duration %ld, limit %u", client->counter, secs, rate); thread_mutex_lock (&fh->lock); if (rate > limit || secs < 3) { - client->schedule_ms += 1000/(limit/1400); + if (limit >= 1400) + client->schedule_ms += 1000/(limit/1400); + else + client->schedule_ms += 50; // should not happen but guard against it rate_add (fh->format->out_bitrate, 0, worker->time_ms); if (secs > 2) { @@ -907,7 +934,10 @@ static int throttled_file_send (client_t *client) rate_add (fh->format->out_bitrate, bytes, worker->time_ms); thread_mutex_unlock (&fh->lock); global_add_bitrates (global.out_bitrate, bytes, worker->time_ms); - client->schedule_ms += (1000/(limit/1400*2)); + if (limit > 2800) + client->schedule_ms += (1000/(limit/1400*2)); + else + client->schedule_ms += 50; /* progessive slowdown if max bandwidth is exceeded. */ if (throttle_sends > 1) @@ -928,12 +958,23 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo) { if (finfo) { + mount_proxy *minfo; fh_node *fh; if (finfo->flags & FS_FALLBACK && finfo->limit == 0) return -1; fh = open_fh (finfo, client); if (fh == NULL) return -1; + minfo = config_find_mount (config_get_config(), finfo->mount); + if (minfo && minfo->max_listeners >= 0 && fh->refcount > minfo->max_listeners) + { + config_release_config(); + remove_from_fh (fh, client); + fh_release (fh); + client->shared_data = NULL; + return client_send_403redirect (client, finfo->mount, "max listeners reached"); + } + config_release_config(); client->shared_data = fh; if (fh->finfo.limit) @@ -957,9 +998,12 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo) client->ops = &buffer_content_ops; client->flags &= ~CLIENT_HAS_INTRO_CONTENT; + client->flags |= CLIENT_IN_FSERVE; if (client->flags & CLIENT_ACTIVE) { client->schedule_ms = client->worker->time_ms; + if (finfo && finfo->flags & FS_FALLBACK) + return 0; // prevent a recursive loop return client->ops->process (client); } else @@ -979,7 +1023,7 @@ int fserve_setup_client (client_t *client) } -void fserve_set_override (const char *mount, const char *dest) +int fserve_set_override (const char *mount, const char *dest) { fh_node fh, *result; @@ -987,15 +1031,23 @@ void fserve_set_override (const char *mount, const char *dest) fh.finfo.mount = (char *)mount; fh.finfo.fallback = NULL; - avl_tree_rlock (fh_cache); + avl_tree_wlock (fh_cache); if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0) { char *tmp = result->finfo.fallback; + thread_mutex_lock (&result->lock); + avl_delete (fh_cache, result, NULL); + avl_tree_unlock (fh_cache); + + result->finfo.flags |= FS_OVERRIDE; result->finfo.fallback = strdup (dest); + thread_mutex_unlock (&result->lock); free (tmp); INFO2 ("move clients from %s to %s", mount, dest); + return 1; } avl_tree_unlock (fh_cache); + return 0; } static int _delete_mapping(void *mapping) { @@ -1110,13 +1162,16 @@ int fserve_kill_client (client_t *client, const char *mount, int response) xmlDocSetRootElement(doc, node); snprintf (buf, sizeof(buf), "Client %d not found", id); + avl_tree_rlock (fh_cache); while (1) { - fh_node *fh = open_fh (&finfo, NULL); + avl_node *node; + fh_node *fh = find_fh (&finfo); if (fh) { - avl_node *node = avl_get_first (fh->clients); - + thread_mutex_lock (&fh->lock); + avl_tree_unlock (fh_cache); + node = avl_get_first (fh->clients); while (node) { client_t *listener = (client_t *)node->key; @@ -1130,61 +1185,46 @@ int fserve_kill_client (client_t *client, const char *mount, int response) } node = avl_get_next (node); } - fh_release (fh); + thread_mutex_unlock (&fh->lock); + avl_tree_rlock (fh_cache); } if (loop == 0) break; loop--; if (loop == 1) finfo.flags = FS_FALLBACK; } + avl_tree_unlock (fh_cache); xmlNewChild (node, NULL, XMLSTR("message"), XMLSTR(buf)); xmlNewChild (node, NULL, XMLSTR("return"), XMLSTR(v)); return admin_send_response (doc, client, response, "response.xsl"); } -int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo) +int fserve_list_clients_xml (xmlNodePtr parent, fbinfo *finfo) { int ret = 0; - fh_node *fh = open_fh (finfo, NULL); + fh_node *fh; + avl_node *anode; - if (fh) + avl_tree_rlock (fh_cache); + fh = find_fh (finfo); + if (fh == NULL) { - avl_node *anode = avl_get_first (fh->clients); - - while (anode) - { - client_t *listener = (client_t *)anode->key; - char buf [100]; - - xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL); - const char *useragent; - snprintf (buf, sizeof (buf), "%lu", listener->connection.id); - xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); - - xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->connection.ip)); - useragent = httpp_getvar (listener->parser, "user-agent"); - if (useragent) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent)); - xmlNewChild (node, NULL, XMLSTR("useragent"), str); - xmlFree (str); - } - xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR( "0")); - snprintf (buf, sizeof (buf), "%lu", - (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); - xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf)); - if (listener->username) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username)); - xmlNewChild (node, NULL, XMLSTR("username"), str); - xmlFree (str); - } - - ret++; - anode = avl_get_next (anode); - } - fh_release (fh); + avl_tree_unlock (fh_cache); + return 0; } + thread_mutex_lock (&fh->lock); + avl_tree_unlock (fh_cache); + + anode = avl_get_first (fh->clients); + while (anode) + { + client_t *listener = (client_t *)anode->key; + + stats_listener_to_xml (listener, parent); + ret++; + anode = avl_get_next (anode); + } + thread_mutex_unlock (&fh->lock); return ret; } @@ -1208,17 +1248,33 @@ int fserve_list_clients (client_t *client, const char *mount, int response, int xmlSetProp(srcnode, XMLSTR("mount"), XMLSTR(mount)); ret = fserve_list_clients_xml (srcnode, &finfo); - if (ret == 0) + if (ret == 0 && finfo.flags&FS_FALLBACK) { - finfo.flags = 0; + finfo.flags = 0; // retry ret = fserve_list_clients_xml (srcnode, &finfo); } if (ret) { - char buf[20]; - snprintf (buf, sizeof(buf), "%u", ret); + char buf [20]; + snprintf (buf, sizeof(buf), "%d", ret); xmlNewChild (srcnode, NULL, XMLSTR("listeners"), XMLSTR(buf)); return admin_send_response (doc, client, response, "listclients.xsl"); } + xmlFree (doc); return client_send_400 (client, "mount does not exist"); } + + +int fserve_query_count (fbinfo *finfo) +{ + int ret = 0; + fh_node *fh; + + avl_tree_rlock (fh_cache); + fh = find_fh (finfo); + if (fh) + ret = fh->refcount; + avl_tree_unlock (fh_cache); + return ret; +} + diff --git a/src/fserve.h b/src/fserve.h index 0d1ac4f2..4fcbff8e 100644 --- a/src/fserve.h +++ b/src/fserve.h @@ -29,6 +29,7 @@ typedef struct _fbinfo #define FS_USE_ADMIN 01 #define FS_FALLBACK 02 #define FS_FALLBACK_EOF 04 +#define FS_OVERRIDE 010 void fserve_initialize(void); void fserve_shutdown(void); @@ -38,10 +39,12 @@ void fserve_recheck_mime_types (ice_config_t *config); int fserve_setup_client (client_t *client); int fserve_setup_client_fb (client_t *client, fbinfo *finfo); -void fserve_set_override (const char *mount, const char *dest); +int fserve_set_override (const char *mount, const char *dest); int fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners); int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo); int fserve_kill_client (client_t *client, const char *mount, int response); +int fserve_query_count (fbinfo *finfo); + extern int fserve_running; diff --git a/src/global.c b/src/global.c index c034a001..f16a1587 100644 --- a/src/global.c +++ b/src/global.c @@ -55,11 +55,11 @@ void global_shutdown(void) thread_mutex_destroy(&_global_mutex); thread_spin_destroy (&global.spinlock); avl_tree_free(global.source_tree, NULL); + rate_free (global.out_bitrate); + global.out_bitrate = NULL; #ifdef MY_ALLOC avl_tree_free(global.alloc_tree, free_alloc_node); #endif - rate_free (global.out_bitrate); - global.out_bitrate = NULL; } void global_lock(void) diff --git a/src/global.h b/src/global.h index fb8efddd..edcf078b 100644 --- a/src/global.h +++ b/src/global.h @@ -13,7 +13,6 @@ #ifndef __GLOBAL_H__ #define __GLOBAL_H__ -#include "config.h" #define ICE_LISTEN_QUEUE 64 @@ -25,6 +24,7 @@ #include "thread/thread.h" #include "net/sock.h" #include "compat.h" +#include "avl/avl.h" typedef struct ice_global_tag { @@ -64,6 +64,11 @@ typedef struct ice_global_tag extern unsigned int throttle_sends; +extern void initialize_subsystems (void); +extern void shutdown_subsystems (void); +extern void server_process (void); +extern int server_init (int argc, char *argv[]); + #ifdef MY_ALLOC #define calloc(x,y) my_calloc(__func__,__LINE__,x,y) diff --git a/src/logging.c b/src/logging.c index f59cfd66..1093fd47 100644 --- a/src/logging.c +++ b/src/logging.c @@ -37,7 +37,7 @@ void fatal_error (const char *perr); int errorlog = 0; int playlistlog = 0; -#ifdef _WIN32 +#ifdef _MSC_VER /* Since strftime's %z option on win32 is different, we need to go through a few loops to get the same info as %z */ int get_clf_time (char *buffer, unsigned len, struct tm *t) @@ -107,12 +107,13 @@ int get_clf_time (char *buffer, unsigned len, struct tm *t) */ void logging_access_id (access_log *accesslog, client_t *client) { - char datebuf[128]; - char reqbuf[1024]; + const char *req; struct tm thetime; time_t now; time_t stayed; - const char *referrer, *user_agent, *username, *ip = "-"; + const char *referrer, *user_agent, *username = NULL, *ip = "-"; + char datebuf[50]; + char reqbuf[128]; if (client->flags & CLIENT_SKIP_ACCESSLOG) return; @@ -121,41 +122,56 @@ void logging_access_id (access_log *accesslog, client_t *client) localtime_r (&now, &thetime); /* build the data */ -#ifdef _WIN32 +#ifdef _MSC_VER memset(datebuf, '\000', sizeof(datebuf)); get_clf_time(datebuf, sizeof(datebuf)-1, &thetime); #else strftime (datebuf, sizeof(datebuf), LOGGING_FORMAT_CLF, &thetime); #endif + req = httpp_getvar (client->parser, HTTPP_VAR_RAWURI); + if (req == NULL) + req = httpp_getvar (client->parser, HTTPP_VAR_URI); /* build the request */ snprintf (reqbuf, sizeof(reqbuf), "%s %s %s/%s", - httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE), - httpp_getvar (client->parser, HTTPP_VAR_URI), + httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE), req, httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL), httpp_getvar (client->parser, HTTPP_VAR_VERSION)); stayed = now - client->connection.con_time; - - if (client->username == NULL) - username = "-"; - else - username = client->username; - + username = client->username; referrer = httpp_getvar (client->parser, "referer"); - if (referrer == NULL) - referrer = "-"; - user_agent = httpp_getvar (client->parser, "user-agent"); - if (user_agent == NULL) - user_agent = "-"; if (accesslog->log_ip) ip = client->connection.ip; - log_write_direct (accesslog->logid, - "%s - %s [%s] \"%s\" %d %" PRIu64 " \"%s\" \"%s\" %lu", - ip, username, - datebuf, reqbuf, client->respcode, client->connection.sent_bytes, - referrer, user_agent, (unsigned long)stayed); + + if (accesslog->type == LOG_ACCESS_CLF_ESC) + { + char *un = client->username ? util_url_escape (username) : strdup ("-"), + *rq = util_url_escape (reqbuf), + *rf = referrer ? util_url_escape (referrer) : strdup ("-"), + *ua = user_agent ? util_url_escape (user_agent) : strdup ("-"); + + log_write_direct (accesslog->logid, + "%s - %s %s %s %d %" PRIu64 " %.50s %.50s %lu", + ip, un, datebuf, rq, client->respcode, client->connection.sent_bytes, + rf, ua, (unsigned long)stayed); + free (ua); + free (rf); + free (rq); + free (un); + } + else + { + if (client->username == NULL) username = "-"; + if (referrer == NULL) referrer = "-"; + if (user_agent == NULL) user_agent = "-"; + + log_write_direct (accesslog->logid, + "%s - %s [%s] \"%s\" %d %" PRIu64 " \"%.50s\" \"%.50s\" %lu", + ip, username, datebuf, reqbuf, client->respcode, client->connection.sent_bytes, + referrer, user_agent, (unsigned long)stayed); + } client->respcode = -1; } @@ -183,7 +199,7 @@ void logging_playlist(const char *mount, const char *metadata, long listeners) localtime_r (&now, &thetime); /* build the data */ -#ifdef _WIN32 +#ifdef _MSC_VER memset(datebuf, '\000', sizeof(datebuf)); get_clf_time(datebuf, sizeof(datebuf)-1, &thetime); #else diff --git a/src/logging.h b/src/logging.h index e3abdae6..622ccdb6 100644 --- a/src/logging.h +++ b/src/logging.h @@ -85,7 +85,7 @@ extern int errorlog; #define LOGGING_FORMAT_CLF "%d/%b/%Y:%H:%M:%S %z" -void logging_access_id (access_log *accesslog, client_t *client); +void logging_access_id (struct access_log *accesslog, client_t *client); void logging_access(client_t *client); void logging_playlist(const char *mount, const char *metadata, long listeners); int restart_logging (ice_config_t *config); diff --git a/src/main.c b/src/main.c index 2c04c309..2339934f 100644 --- a/src/main.c +++ b/src/main.c @@ -20,9 +20,6 @@ #include #ifdef WIN32 -#define _WIN32_WINNT 0x0400 -/* For getpid() */ -//#include #include #include #endif @@ -93,7 +90,7 @@ static void _print_usage(void) } -void _initialize_subsystems(void) +void initialize_subsystems(void) { log_initialize(); errorlog = log_open_file (stderr); @@ -112,10 +109,10 @@ void _initialize_subsystems(void) #endif } -void _shutdown_subsystems(void) + +void shutdown_subsystems(void) { connection_shutdown(); - auth_shutdown(); slave_shutdown(); fserve_shutdown(); stats_shutdown(); @@ -212,9 +209,17 @@ static int _server_proc_init(void) return 1; } + /* this is the heart of the beast */ -static void _server_proc(void) +void server_process (void) { + INFO1 ("%s server started", ICECAST_VERSION_STRING); + + global.running = ICE_RUNNING; + + /* Do this after logging init */ + auth_initialise (); + if (background) { fclose (stdin); @@ -222,8 +227,8 @@ static void _server_proc(void) fclose (stderr); } slave_initialize(); - - connection_setup_sockets (NULL); + INFO0("Shutting down"); + auth_shutdown(); } /* chroot the process. Watch out - we need to do this before starting other @@ -258,7 +263,7 @@ static void _ch_root_uid_setup(void) } #endif -#ifdef CHROOT +#ifdef HAVE_CHROOT if (conf->chroot) { if(getuid()) /* root check */ @@ -303,115 +308,97 @@ static void _ch_root_uid_setup(void) #endif } -#ifdef WIN32_SERVICE -int mainService(int argc, char **argv) -#else -int main(int argc, char **argv) -#endif + +int server_init (int argc, char *argv[]) { - int res, ret; + int ret; char filename[512]; char pbuf[1024]; - /* parse the '-c icecast.xml' option - ** only, so that we can read a configfile - */ - res = _parse_config_opts(argc, argv, filename, 512); - if (res == 1) { -#if !defined(_WIN32) || defined(_CONSOLE) - /* startup all the modules */ - _initialize_subsystems(); -#endif - /* parse the config file */ - config_get_config(); - ret = config_initial_parse_file(filename); - config_release_config(); - if (ret < 0) { - snprintf(pbuf, sizeof(pbuf)-1, - "FATAL: error parsing config file (%s)", filename); - _fatal_error(pbuf); - switch (ret) { - case CONFIG_EINSANE: - _fatal_error("filename was null or blank"); - break; - case CONFIG_ENOROOT: - _fatal_error("no root element found"); - break; - case CONFIG_EBADROOT: - _fatal_error("root element is not "); - break; - default: - _fatal_error("XML config parsing error"); - break; - } -#if !defined(_WIN32) || defined(_CONSOLE) - _shutdown_subsystems(); -#endif + switch (_parse_config_opts (argc, argv, filename, 512)) + { + case -1: + _print_usage(); return -1; - } - } else if (res == -1) { - _print_usage(); - return -1; + default: + /* parse the config file */ + config_get_config(); + ret = config_initial_parse_file(filename); + config_release_config(); + if (ret < 0) + { + snprintf (pbuf, sizeof(pbuf), + "FATAL: error parsing config file (%s)", filename); + _fatal_error (pbuf); + switch (ret) + { + case CONFIG_EINSANE: + _fatal_error("filename was null or blank"); + break; + case CONFIG_ENOROOT: + _fatal_error("no root element found"); + break; + case CONFIG_EBADROOT: + _fatal_error("root element is not "); + break; + default: + _fatal_error("XML config parsing error"); + break; + } + return -1; + } } - + /* override config file options with commandline options */ config_parse_cmdline(argc, argv); /* Bind socket, before we change userid */ - if(!_server_proc_init()) { + if (_server_proc_init() == 0) + { _fatal_error("Server startup failed. Exiting"); - _shutdown_subsystems(); return -1; } - _ch_root_uid_setup(); /* Change user id and root if requested/possible */ fserve_initialize(); #ifdef CHUID /* We'll only have getuid() if we also have setuid(), it's reasonable to * assume */ - if(!getuid()) /* Running as root! Don't allow this */ + if (getuid() == 0) /* Running as root! Don't allow this */ { - fprintf(stderr, "ERROR: You should not run icecast2 as root\n"); - fprintf(stderr, "Use the changeowner directive in the config file\n"); - _shutdown_subsystems(); - return 1; + fprintf (stderr, "ERROR: You should not run icecast2 as root\n"); + fprintf (stderr, "Use the changeowner directive in the config file\n"); + return -1; } #endif - /* setup default signal handlers */ sighandler_initialize(); if (start_logging (config_get_config_unlocked()) < 0) { _fatal_error("FATAL: Could not start logging"); - _shutdown_subsystems(); return -1; } + return 0; +} - INFO1 ("%s server started", ICECAST_VERSION_STRING); - /* REM 3D Graphics */ +#ifndef _WIN32 +int main (int argc, char *argv[]) +{ + initialize_subsystems(); - /* let her rip */ - global.running = ICE_RUNNING; + if (server_init (argc, argv) == 0) + server_process(); - /* Do this after logging init */ - auth_initialise (); + shutdown_subsystems(); - _server_proc(); - - INFO0("Shutting down"); -#if !defined(_WIN32) || defined(_CONSOLE) - _shutdown_subsystems(); -#endif if (pidfile) { remove (pidfile); free (pidfile); } - return 0; } - +#endif diff --git a/src/md5.c b/src/md5.c index 43553388..e3279fee 100644 --- a/src/md5.c +++ b/src/md5.c @@ -42,6 +42,7 @@ #include #include #include +#include "global.h" static void MD5Transform(uint32_t buf[4], uint32_t const in[HASH_LEN]); diff --git a/src/mpeg.c b/src/mpeg.c index ec62fb41..c3eb3114 100644 --- a/src/mpeg.c +++ b/src/mpeg.c @@ -22,6 +22,7 @@ #include "compat.h" #include "mpeg.h" #include "format_mp3.h" +#include "global.h" #define CATMODULE "mpeg" #include "logging.h" @@ -70,8 +71,6 @@ static int handle_aac_frame (struct mpeg_sync *mp, unsigned char *p, int len) if (mp->frame_callback) if (mp->frame_callback (mp, s, raw_frame_len) < 0) return -1; - memcpy (mp->raw->data + mp->raw_offset, s, raw_frame_len); - mp->raw_offset += raw_frame_len; } return frame_len; } @@ -163,8 +162,6 @@ static int handle_mpeg_frame (struct mpeg_sync *mp, unsigned char *p, int remain if (mp->frame_callback) if (mp->frame_callback (mp, p, frame_len) < 0) return -1; - memcpy (mp->raw->data + mp->raw_offset, p, frame_len); - mp->raw_offset += frame_len; } return frame_len; } @@ -242,31 +239,31 @@ static int check_for_mp3 (struct mpeg_sync *mp, unsigned char *p, unsigned remai samplerate = get_mpegframe_samplerate (p); if (samplerate == 0) return -1; - while (checking) + do { - int frame_len = get_mpeg_frame_length (mp, fh); + int frame_len; + + if (remaining <= 4) + return 0; + if (fh [0] != 255 || fh [1] != p[1]) + return -1; + frame_len = get_mpeg_frame_length (mp, fh); if (frame_len <= 0 || frame_len > 3000) { //DEBUG2 ("checking frame %d, but len %d invalid", 5-checking, frame_len); return -1; } - if (frame_len+4 >= remaining) + if (frame_len > remaining) { //DEBUG3 ("checking frame %d, but need more data (%d,%d)", 5-checking, frame_len, remaining); return 0; } - if (samplerate != get_mpegframe_samplerate (fh+frame_len)) + if (samplerate != get_mpegframe_samplerate (fh)) return -1; - if (fh[frame_len] != 255 || fh[frame_len+1] != p[1]) - { - //DEBUG4 ("checking frame %d, but code is %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]); - return -1; - } //DEBUG4 ("frame %d checked, next header codes are %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]); remaining -= frame_len; fh += frame_len; - checking--; - } + } while (--checking); mp->samplerate = samplerate; if (((p[3] & 0xC0) >> 6) == 3) mp->channels = 1; @@ -375,20 +372,23 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) mp->sample_count = 0; if (mp->surplus) { - int new_len = mp->surplus->len + new_block->len; - unsigned char *p = realloc (mp->surplus->data, new_len); + if (offset >= mp->surplus->len) + offset -= mp->surplus->len; + else + { + int new_len = mp->surplus->len + new_block->len; + unsigned char *p = realloc (mp->surplus->data, new_len); - memcpy (p+mp->surplus->len, new_block->data, new_block->len); - mp->surplus->data = new_block->data; - new_block->data = (void*)p; - new_block->len = new_len; + memcpy (p+mp->surplus->len, new_block->data, new_block->len); + mp->surplus->data = new_block->data; + new_block->data = (void*)p; + new_block->len = new_len; + } refbuf_release (mp->surplus); mp->surplus = NULL; } start = (unsigned char *)new_block->data + offset; remaining = new_block->len - offset; - if (mp->raw) - mp->raw_offset = 0; while (1) { end = (unsigned char*)new_block->data + new_block->len; @@ -398,7 +398,6 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) break; if (!is_sync_byte (mp, start)) { - // need to resync int ret = find_align_sync (mp, start, remaining); if (ret == 0) break; // no sync in the rest, so dump it @@ -422,7 +421,7 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) { if (remaining > 20000) return -1; - new_block->len = 0; + new_block->len = offset; return remaining; } } @@ -474,5 +473,6 @@ void mpeg_cleanup (mpeg_sync *mpsync) { refbuf_release (mpsync->surplus); refbuf_release (mpsync->raw); + mpsync->mount = NULL; } } diff --git a/src/sighandler.c b/src/sighandler.c index 083d0f5b..51e44a03 100644 --- a/src/sighandler.c +++ b/src/sighandler.c @@ -20,12 +20,12 @@ #include "avl/avl.h" #include "httpp/httpp.h" -#include "global.h" #include "connection.h" #include "refbuf.h" #include "client.h" #include "logging.h" #include "event.h" +#include "global.h" #define CATMODULE "sighandler" diff --git a/src/slave.c b/src/slave.c index 0373c2b6..756ad330 100644 --- a/src/slave.c +++ b/src/slave.c @@ -60,22 +60,14 @@ #include "format.h" #include "event.h" #include "yp.h" +#include "slave.h" #define CATMODULE "slave" -typedef struct _redirect_host -{ - struct _redirect_host *next; - time_t next_update; - char *server; - int port; -} redirect_host; - static void _slave_thread(void); static void redirector_add (const char *server, int port, int interval); static redirect_host *find_slave_host (const char *server, int port); -static void redirector_clearall (void); static int relay_startup (client_t *client); static int relay_read (client_t *client); static void relay_release (client_t *client); @@ -162,7 +154,6 @@ void slave_update_all_mounts (void) void slave_restart (void) { restart_connection_thread = 1; - redirector_clearall(); slave_update_all_mounts (); streamlist_check = 0; } @@ -370,6 +361,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s else INFO3 ("connecting to %s:%d for %s", server, port, relay->localmount); + con->con_time = time (NULL); + relay->in_use = master; streamsock = sock_connect_wto_bind (server, port, bind, timeout); free (bind); if (connection_init (con, streamsock, server) < 0) @@ -446,6 +439,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s if (parser) httpp_destroy (parser); connection_close (con); + con->con_time = time (NULL); + if (relay->in_use) relay->in_use->skip = 1; return -1; } @@ -462,6 +457,11 @@ int open_relay (relay_server *relay) { int ret; + if (master->skip) + { + INFO3 ("skipping %s:%d for %s", master->ip, master->port, relay->localmount); + continue; + } thread_mutex_unlock (&src->lock); ret = open_relay_connection (client, relay, master); thread_mutex_lock (&src->lock); @@ -531,6 +531,7 @@ static void *start_relay_stream (void *arg) src->yp_public = -1; } + relay->in_use = NULL; INFO2 ("listener count remaining on %s is %d", src->mount, src->listeners); src->flags &= ~SOURCE_PAUSE_LISTENERS; thread_mutex_unlock (&src->lock); @@ -968,6 +969,7 @@ static void slave_startup (void) update_settings = 0; update_all_mounts = 0; + redirector_setup (config); update_master_as_slave (config); stats_global (config); workers_adjust (config->workers_count); @@ -1058,7 +1060,7 @@ relay_server *slave_find_relay (relay_server *relays, const char *mount) /* drop all redirection details. */ -static void redirector_clearall (void) +void redirector_clearall (void) { thread_rwlock_wlock (&slaves_lock); while (redirectors) @@ -1073,6 +1075,21 @@ static void redirector_clearall (void) thread_rwlock_unlock (&slaves_lock); } + +void redirector_setup (ice_config_t *config) +{ + redirect_host *redir = config->redirect_hosts; + + thread_rwlock_wlock (&slaves_lock); + while (redir) + { + redirector_add (redir->server, redir->port, 0); + redir = redir->next; + } + thread_rwlock_unlock (&slaves_lock); +} + + /* Add new redirectors or update any existing ones */ void redirector_update (client_t *client) @@ -1096,7 +1113,15 @@ void redirector_update (client_t *client) redirect = find_slave_host (rserver, rport); if (redirect == NULL) { - redirector_add (rserver, rport, interval); + ice_config_t *config = config_get_config(); + unsigned int allowed = config->max_redirects; + + config_release_config(); + + if (global.redirect_count < allowed) + redirector_add (rserver, rport, interval); + else + INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed); } else { @@ -1126,18 +1151,7 @@ static redirect_host *find_slave_host (const char *server, int port) static void redirector_add (const char *server, int port, int interval) { - ice_config_t *config = config_get_config(); - unsigned int allowed = config->max_redirects; - redirect_host *redirect; - - config_release_config(); - - if (global.redirect_count >= allowed) - { - INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed); - return; - } - redirect = calloc (1, sizeof (redirect_host)); + redirect_host *redirect = calloc (1, sizeof (redirect_host)); if (redirect == NULL) abort(); redirect->server = strdup (server); @@ -1174,6 +1188,17 @@ static relay_server *get_relay_details (client_t *client) } +static void relay_reset (relay_server *relay) +{ + relay_server_master *server = relay->masters; + + source_clear_source (relay->source); + for (; server; server = server->next) + server->skip = 0; + INFO1 ("servers to be retried on %s", relay->localmount); +} + + static int relay_read (client_t *client) { relay_server *relay = get_relay_details (client); @@ -1185,30 +1210,31 @@ static int relay_read (client_t *client) if (relay->cleanup) relay->running = 0; if (relay->running == 0) source->flags &= ~SOURCE_RUNNING; - if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 2000000) + if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 1000000) source->flags &= ~SOURCE_RUNNING; return source_read (source); } if ((source->flags & SOURCE_TERMINATING) == 0) { + /* this section is for once through code */ int fallback = 1; if (client->connection.con_time) { - if (relay->running) + if (relay->running && relay->in_use) fallback = 0; - if (client->worker->current_time.tv_sec - client->connection.con_time < 3) + if (client->worker->current_time.tv_sec - client->connection.con_time < 60) { - /* force a delayed restart if stream cannot be maintained for 3 seconds, by - * which time any listeners in restart wait would of come back on */ - source->flags &= ~SOURCE_PAUSE_LISTENERS; - client->connection.con_time = 0; - fallback = 1; + /* force a server skip if a stream cannot be maintained for 1 min */ + WARN1 ("stream for %s died too quickly, skipping server for now", relay->localmount); + if (relay->in_use) relay->in_use->skip = 1; + } + else + relay_reset (relay); // spent some time on this so give other servers a chance + if (source->flags & SOURCE_TIMEOUT) + { + WARN1 ("stream for %s timed out, skipping server for now", relay->localmount); + if (relay->in_use) relay->in_use->skip = 1; } - global_lock(); - global.sources--; - stats_event_args (NULL, "sources", "%d", global.sources); - global_unlock(); - global_reduce_bitrate_sampling (global.out_bitrate); } /* don't pause listeners if relay shutting down */ if (relay->running == 0) @@ -1219,11 +1245,26 @@ static int relay_read (client_t *client) if (source->termination_count && source->termination_count <= source->listeners) { client->schedule_ms = client->worker->time_ms + 150; - DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); + if (client->worker->current_time.tv_sec - client->timer_start > 2) + { + client->schedule_ms += 400; + WARN3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); + } + else + DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); thread_mutex_unlock (&source->lock); return 0; } DEBUG1 ("all listeners have now been checked on %s", relay->localmount); + if (client->connection.con_time) + { + global_lock(); + global.sources--; + stats_event_args (NULL, "sources", "%d", global.sources); + global_unlock(); + global_reduce_bitrate_sampling (global.out_bitrate); + } + client->timer_start = 0; client->parser = NULL; free (source->fallback.mount); source->fallback.mount = NULL; @@ -1241,6 +1282,7 @@ static int relay_read (client_t *client) return 0; /* listeners may be paused, recheck and let them leave this stream */ } INFO1 ("shutting down relay %s", relay->localmount); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); thread_mutex_unlock (&source->lock); stats_event (relay->localmount, NULL, NULL); slave_update_all_mounts(); @@ -1249,11 +1291,11 @@ static int relay_read (client_t *client) do { if (relay->running) { - if (client->connection.con_time) + if (client->connection.con_time && relay->in_use) { INFO1 ("standing by to restart relay on %s", relay->localmount); if (relay->on_demand && source->listeners == 0) - source_clear_source (source); + relay_reset (relay); break; } else @@ -1268,8 +1310,11 @@ static int relay_read (client_t *client) client->schedule_ms = client->worker->time_ms + 3600000; } source->flags &= ~SOURCE_ON_DEMAND; - source_clear_source (source); + relay_reset (relay); } while (0); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); + client->connection.con_time = 0; + source->stats = 0; stats_event (relay->localmount, NULL, NULL); slave_update_all_mounts(); @@ -1343,37 +1388,45 @@ static int relay_startup (client_t *client) if (relay->on_demand) { - source_t *src = relay->source; - int start_relay = src->listeners; // 0 or non-zero + source_t *source = relay->source; + int fallback_def = 0, start_relay = source->listeners; // 0 or non-zero + mount_proxy *mountinfo = config_find_mount (config_get_config(), source->mount); - src->flags |= SOURCE_ON_DEMAND; - if (worker->current_time.tv_sec % 10 == 0) + source->flags |= SOURCE_ON_DEMAND; + if (mountinfo && mountinfo->fallback_mount) { - mount_proxy *mountinfo = config_find_mount (config_get_config(), src->mount); - if (mountinfo && mountinfo->fallback_mount) + source_t *fallback; + + avl_tree_rlock (global.source_tree); + fallback = source_find_mount (mountinfo->fallback_mount); + fallback_def = 1; + if (fallback) { - source_t *fallback; - avl_tree_rlock (global.source_tree); - fallback = source_find_mount (mountinfo->fallback_mount); - if (fallback) - { - if (strcmp (fallback->mount, src->mount) != 0) - { - // if there are listeners not already being moved - if (fallback->listeners && fallback->fallback.mount == NULL) - start_relay = 1; - } - } + if (strcmp (fallback->mount, source->mount) != 0 && fallback->listeners) + start_relay = 1; avl_tree_unlock (global.source_tree); } - config_release_config(); + else + { + fbinfo finfo; + + avl_tree_unlock (global.source_tree); + finfo.flags = FS_FALLBACK; + finfo.mount = (char *)mountinfo->fallback_mount; + finfo.fallback = NULL; + + // need to check for listeners on the fallback + if (fserve_query_count (&finfo) > 0) + start_relay = 1; // force a start if there is fallback defined + } } + config_release_config(); if (start_relay == 0) { - client->schedule_ms = worker->time_ms + 60000; + client->schedule_ms = worker->time_ms + (fallback_def ? (relay->interval*1000) : 60000); return 0; } - INFO1 ("Detected listeners on relay %s", relay->localmount); + INFO1 ("starting on-demand relay %s", relay->localmount); } /* limit the number of relays starting up at the same time */ diff --git a/src/slave.h b/src/slave.h index a1309697..89493c8e 100644 --- a/src/slave.h +++ b/src/slave.h @@ -22,6 +22,8 @@ void slave_update_all_mounts (void); void slave_rebuild_mounts (void); relay_server *slave_find_relay (relay_server *relays, const char *mount); int redirect_client (const char *mountpoint, client_t *client); +void redirector_clearall (void); +void redirector_setup (ice_config_t *config); void redirector_update (struct _client_tag *client); relay_server *relay_free (relay_server *relay); diff --git a/src/source.c b/src/source.c index 6102ab55..48261f43 100644 --- a/src/source.c +++ b/src/source.c @@ -149,8 +149,10 @@ source_t *source_reserve (const char *mount) src->listener_send_trigger = 10000; src->format = calloc (1, sizeof(format_plugin_t)); src->clients = avl_tree_new (client_compare, NULL); + src->stats = stats_handle (mount); thread_mutex_create (&src->lock); + stats_release (src->stats); avl_insert (global.source_tree, src); @@ -361,22 +363,22 @@ static void update_source_stats (source_t *source) unsigned long kbytes_read = source->bytes_read_since_update/1024; source->format->sent_bytes += kbytes_sent*1024; - stats_event_args (source->mount, "outgoing_kbitrate", "%ld", + source->stats = stats_lock (source->stats, source->mount); + stats_set_args (source->stats, "outgoing_kbitrate", "%ld", (long)(8 * rate_avg (source->format->out_bitrate))/1024); - stats_event_args (source->mount, "incoming_bitrate", "%ld", (8 * incoming_rate)); - stats_event_args (source->mount, "total_bytes_read", - "%"PRIu64, source->format->read_bytes); - stats_event_args (source->mount, "total_bytes_sent", - "%"PRIu64, source->format->sent_bytes); - stats_event_args (source->mount, "total_mbytes_sent", + stats_set_args (source->stats, "incoming_bitrate", "%ld", (8 * incoming_rate)); + stats_set_args (source->stats, "total_bytes_read", "%"PRIu64, source->format->read_bytes); + stats_set_args (source->stats, "total_bytes_sent", "%"PRIu64, source->format->sent_bytes); + stats_set_args (source->stats, "total_mbytes_sent", "%"PRIu64, source->format->sent_bytes/(1024*1024)); - stats_event_args (source->mount, "queue_size", "%u", source->queue_size); + stats_set_args (source->stats, "queue_size", "%u", source->queue_size); if (source->client->connection.con_time) { worker_t *worker = source->client->worker; - stats_event_args (source->mount, "connected", "%"PRIu64, + stats_set_args (source->stats, "connected", "%"PRIu64, (uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time)); } + stats_release (source->stats); stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent); stats_event_add (NULL, "stream_kbytes_read", kbytes_read); @@ -443,17 +445,6 @@ int source_read (source_t *source) if (source_change_worker (source)) return 1; } - if (source->limit_rate) - { - if (source->limit_rate < (8 * source->incoming_rate)) - { - rate_add (source->format->in_bitrate, 0, current); - source->skip_duration += 300; - client->schedule_ms += 150; - thread_mutex_unlock (&source->lock); - return 0; - } - } fds = util_timed_wait_for_fd (client->connection.sock, 0); if (fds < 0) { @@ -473,13 +464,14 @@ int source_read (source_t *source) DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read, source->timeout, (long)current); WARN1 ("Disconnecting %s due to socket timeout", source->mount); - source->flags &= ~(SOURCE_RUNNING|SOURCE_PAUSE_LISTENERS); + source->flags &= ~SOURCE_RUNNING; + source->flags |= SOURCE_TIMEOUT; skip = 0; break; } - source->skip_duration = (int)(source->skip_duration * 1.6); - if (source->skip_duration > 700) - source->skip_duration = 700; + source->skip_duration = (int)(source->skip_duration * 1.3); + if (source->skip_duration > 400) + source->skip_duration = 400; break; } source->skip_duration = (long)(source->skip_duration * 0.9); @@ -589,7 +581,18 @@ static int source_client_read (client_t *client) INFO1 ("streaming duration expired on %s", source->mount); } if (source_running (source)) + { + if (source->limit_rate) + { + source->incoming_rate = (long)rate_avg (source->format->in_bitrate); + if (source->limit_rate < (8 * source->incoming_rate)) + { + rate_add (source->format->in_bitrate, 0, client->worker->current_time.tv_sec); + client->schedule_ms += 200; + } + } return source_read (source); + } else { if ((source->flags & SOURCE_TERMINATING) == 0) @@ -612,6 +615,7 @@ static int source_client_read (client_t *client) return 0; } INFO1 ("no more listeners on %s", source->mount); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); client->connection.discon_time = 0; client->ops = &source_client_halt_ops; free (source->fallback.mount); @@ -705,10 +709,11 @@ static int locate_start_on_queue (source_t *source, client_t *client) } -static int http_source_intro (client_t *client) +static int http_source_introfile (client_t *client) { source_t *source = client->shared_data; + //DEBUG2 ("client intro_pos is %ld, sent bytes is %ld", client->intro_offset, client->connection.sent_bytes); if (format_file_read (client, source->format, source->intro_file) < 0) { if (source->stream_data_tail) @@ -726,6 +731,21 @@ static int http_source_intro (client_t *client) } +static int http_source_intro (client_t *client) +{ + /* we only need to send the intro if nothing else has been sent */ + if (client->connection.sent_bytes > 0) + { + client_set_queue (client, NULL); + client->check_buffer = source_queue_advance; + return source_queue_advance (client); + } + client->intro_offset = 0; + client->check_buffer = http_source_introfile; + return http_source_introfile (client); +} + + static int http_source_listener (client_t *client) { refbuf_t *refbuf = client->refbuf; @@ -818,11 +838,17 @@ static int wait_for_restart (client_t *client) { source_t *source = client->shared_data; - if (source_running (source) || (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || client->connection.error) + if (client->worker->current_time.tv_sec - client->timer_start > 15) + client->connection.error = 1; // in here too long, drop client + + if (source_running (source) || client->connection.error || + (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || + (source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC))) { client->ops = &listener_client_ops; return 0; } + if (source->flags & SOURCE_LISTENERS_SYNC) client->schedule_ms = client->worker->time_ms + 100; else @@ -895,6 +921,7 @@ int listener_waiting_on_source (source_t *source, client_t *client) client->ops = &listener_pause_ops; client->flags |= CLIENT_HAS_MOVED; client->schedule_ms = client->worker->time_ms + 60; + client->timer_start = client->worker->current_time.tv_sec; return 0; } return -1; @@ -911,7 +938,7 @@ static int send_listener (source_t *source, client_t *client) { int bytes; int loop = 8; /* max number of iterations in one go */ - long total_written = 0; + long total_written = 0, limiter = source->listener_send_trigger; int ret = 0, lag; if (source->flags & SOURCE_LISTENERS_SYNC) @@ -941,6 +968,9 @@ static int send_listener (source_t *source, client_t *client) lag = source->client->queue_pos - client->queue_pos; + if (source->incoming_rate && lag < source->incoming_rate) + limiter = source->incoming_rate/2; + /* progessive slowdown if nearing max bandwidth. */ if (global.max_rate) { @@ -961,9 +991,6 @@ static int send_listener (source_t *source, client_t *client) client->schedule_ms += 150; } } - if (source->incoming_rate > 1 && lag < (source->incoming_rate<<1)) - loop = lag / (source->incoming_rate>>1); - while (1) { /* jump out if client connection has died */ @@ -974,7 +1001,7 @@ static int send_listener (source_t *source, client_t *client) } /* lets not send too much to one client in one go, but don't sleep for too long if more data can be sent */ - if (loop == 0 || total_written > source->listener_send_trigger) + if (loop == 0 || total_written > limiter) { client->schedule_ms = client->worker->time_ms + 15; break; @@ -1035,6 +1062,7 @@ void source_init (source_t *source) stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS); stats_event_flags (source->mount, "outgoing_kbitrate", "0", STATS_COUNTERS); stats_event_flags (source->mount, "incoming_bitrate", "0", STATS_COUNTERS); + stats_event_flags (source->mount, "queue_size", "0", STATS_COUNTERS); stats_event_flags (source->mount, "connected", "0", STATS_COUNTERS); stats_event_flags (source->mount, "source_ip", source->client->connection.ip, STATS_COUNTERS); @@ -1055,7 +1083,7 @@ void source_init (source_t *source) if (str) { _parse_audio_info (source, str); - stats_event (source->mount, "audio_info", str); + stats_event_flags (source->mount, "audio_info", str, STATS_GENERAL); } } source->format->in_bitrate = rate_setup (60, 1); @@ -1094,9 +1122,10 @@ void source_init (source_t *source) } -void source_set_override (const char *mount, const char *dest) +int source_set_override (const char *mount, const char *dest) { source_t *source; + int ret = 0; avl_tree_rlock (global.source_tree); source = source_find_mount (mount); @@ -1111,13 +1140,20 @@ void source_set_override (const char *mount, const char *dest) source->fallback.mount = strdup (dest); source->termination_count = source->listeners; source->flags |= SOURCE_LISTENERS_SYNC; + ret = 1; } thread_mutex_unlock (&source->lock); } + avl_tree_unlock (global.source_tree); + if (ret) + INFO2 ("moving from %s to %s", mount, dest); } else - fserve_set_override (mount, dest); - avl_tree_unlock (global.source_tree); + { + avl_tree_unlock (global.source_tree); + ret = fserve_set_override (mount, dest); + } + return ret; } @@ -1127,26 +1163,37 @@ void source_set_fallback (source_t *source, const char *dest_mount) client_t *client = source->client; time_t connected = client->worker->current_time.tv_sec - client->connection.con_time; - if (dest_mount == NULL) + if (dest_mount == NULL || source->listeners == 0) + { + INFO1 ("Skipping fallback on %s, no listeners", source->mount); return; - if (connected > 20) - bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02); + } + if (connected > 40) + bitrate = (int)rate_avg (source->format->in_bitrate); + //bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02); if (bitrate == 0 && source->limit_rate) bitrate = source->limit_rate; - source->fallback.flags = FS_FALLBACK; - source->fallback.mount = strdup (dest_mount); - source->fallback.limit = bitrate; - INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount, - source->fallback.limit, source->listeners); + if (source->listeners) + { + source->fallback.flags = FS_FALLBACK; + source->fallback.mount = strdup (dest_mount); + source->fallback.limit = bitrate; + INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount, + source->fallback.limit, source->listeners); + } + else + INFO4 ("Skipping fallback to %s on %s (bitrate %d, listeners %d)", dest_mount, + source->mount, bitrate, source->listeners); } + void source_shutdown (source_t *source, int with_fallback) { mount_proxy *mountinfo; INFO1("Source \"%s\" exiting", source->mount); - source->flags &= ~SOURCE_ON_DEMAND; + source->flags &= ~(SOURCE_ON_DEMAND|SOURCE_TIMEOUT); source->termination_count = source->listeners; mountinfo = config_find_mount (config_get_config(), source->mount); if (source->client->connection.con_time) @@ -1163,6 +1210,7 @@ void source_shutdown (source_t *source, int with_fallback) if (mountinfo && with_fallback && global.running == ICE_RUNNING) source_set_fallback (source, mountinfo->fallback_mount); config_release_config(); + source->client->timer_start = source->client->worker->current_time.tv_sec; source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC); } @@ -1186,13 +1234,13 @@ static void _parse_audio_info (source_t *source, const char *s) char name[100], value[200]; int n = sscanf (start, "%99[^=]=%199[^;\r\n]", name, value); - if (n == 2 && strncmp (name, "ice-", 4) == 0) + if (n == 2 && (strncmp (name, "ice-", 4) == 0 || strncmp (name, "bitrate=", 7) == 0)) { char *esc = util_url_unescape (value); if (esc) { util_dict_set (source->audio_info, name, esc); - stats_event (source->mount, name, esc); + stats_event_flags (source->mount, name, esc, STATS_COUNTERS); } free (esc); } @@ -1215,7 +1263,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) INFO2 ("Applying mount information for \"%s\" from \"%s\"", source->mount, mountinfo->mountname); - stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners); + stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners); /* if a setting is available in the mount details then use it, else * check the parser details. */ @@ -1246,7 +1294,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) } while (0); val = atoi (str); } - stats_event_args (source->mount, "public", "%d", val); + stats_set_args (source->stats, "public", "%d", val); if (source->yp_public != val) { DEBUG1 ("YP changed to %d", val); @@ -1259,7 +1307,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) /* stream name */ if (mountinfo && mountinfo->stream_name) - stats_event (source->mount, "server_name", mountinfo->stream_name); + stats_set (source->stats, "server_name", mountinfo->stream_name); else { do { @@ -1272,12 +1320,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) str = "Unspecified name"; } while (0); if (source->format) - stats_event_conv (source->mount, "server_name", str, source->format->charset); + stats_set_conv (source->stats, "server_name", str, source->format->charset); } /* stream description */ if (mountinfo && mountinfo->stream_description) - stats_event (source->mount, "server_description", mountinfo->stream_description); + stats_set (source->stats, "server_description", mountinfo->stream_description); else { do { @@ -1289,12 +1337,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; } while (0); if (str && source->format) - stats_event_conv (source->mount, "server_description", str, source->format->charset); + stats_set_conv (source->stats, "server_description", str, source->format->charset); } /* stream URL */ if (mountinfo && mountinfo->stream_url) - stats_event (source->mount, "server_url", mountinfo->stream_url); + stats_set (source->stats, "server_url", mountinfo->stream_url); else { do { @@ -1306,12 +1354,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; } while (0); if (str && source->format) - stats_event_conv (source->mount, "server_url", str, source->format->charset); + stats_set_conv (source->stats, "server_url", str, source->format->charset); } /* stream genre */ if (mountinfo && mountinfo->stream_genre) - stats_event (source->mount, "genre", mountinfo->stream_genre); + stats_set (source->stats, "genre", mountinfo->stream_genre); else { do { @@ -1324,12 +1372,15 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) str = "various"; } while (0); if (source->format) - stats_event_conv (source->mount, "genre", str, source->format->charset); + stats_set_conv (source->stats, "genre", str, source->format->charset); } /* stream bitrate */ if (mountinfo && mountinfo->bitrate) + { str = mountinfo->bitrate; + stats_set (source->stats, "bitrate", str); + } else { do { @@ -1339,23 +1390,24 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; str = httpp_getvar (parser, "x-audiocast-bitrate"); } while (0); + if (str) + stats_set (source->stats, "bitrate", str); } - stats_event (source->mount, "bitrate", str); /* handle MIME-type */ if (mountinfo && mountinfo->type) - stats_event (source->mount, "server_type", mountinfo->type); + stats_set (source->stats, "server_type", mountinfo->type); else if (source->format && source->format->contenttype) - stats_event (source->mount, "server_type", source->format->contenttype); + stats_set (source->stats, "server_type", source->format->contenttype); if (mountinfo && mountinfo->subtype) - stats_event (source->mount, "subtype", mountinfo->subtype); + stats_set (source->stats, "subtype", mountinfo->subtype); if (mountinfo && mountinfo->auth) - stats_event (source->mount, "authenticator", mountinfo->auth->type); + stats_set (source->stats, "authenticator", mountinfo->auth->type); else - stats_event (source->mount, "authenticator", NULL); + stats_set (source->stats, "authenticator", NULL); source->limit_rate = 0; if (mountinfo && mountinfo->limit_rate) @@ -1437,11 +1489,12 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy source->min_queue_size = config->min_queue_size; source->timeout = config->source_timeout; source->default_burst_size = config->burst_size; + source->stats = stats_handle (source->mount); len = strlen (config->hostname) + strlen(source->mount) + 16; listen_url = alloca (len); snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount); - stats_event_flags (source->mount, "listenurl", listen_url, STATS_COUNTERS); + stats_set_flags (source->stats, "listenurl", listen_url, STATS_COUNTERS); source_apply_mount (source, mountinfo); @@ -1450,11 +1503,11 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy if (source->flags & SOURCE_ON_DEMAND) { DEBUG0 ("on_demand set"); - stats_event (source->mount, "on_demand", "1"); - stats_event_args (source->mount, "listeners", "%ld", source->listeners); + stats_set (source->stats, "on_demand", "1"); + stats_set_args (source->stats, "listeners", "%ld", source->listeners); } else - stats_event (source->mount, "on_demand", NULL); + stats_set (source->stats, "on_demand", NULL); if (mountinfo) { @@ -1465,23 +1518,24 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy if (mountinfo->fallback_when_full) DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full); DEBUG1 ("max listeners to %d", mountinfo->max_listeners); - stats_event_args (source->mount, "max_listeners", "%d", mountinfo->max_listeners); - stats_event_flags (source->mount, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN); + stats_set_args (source->stats, "max_listeners", "%d", mountinfo->max_listeners); + stats_set_flags (source->stats, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN); if (mountinfo->hidden) { - stats_event_flags (source->mount, NULL, NULL, STATS_HIDDEN); + stats_set_flags (source->stats, NULL, NULL, STATS_HIDDEN); DEBUG0 ("hidden from public"); } else - stats_event_flags (source->mount, NULL, NULL, 0); + stats_set_flags (source->stats, NULL, NULL, 0); } else { DEBUG0 ("max listeners is not specified"); - stats_event (source->mount, "max_listeners", "unlimited"); - stats_event_flags (source->mount, "cluster_password", NULL, STATS_SLAVE); - stats_event_flags (source->mount, NULL, NULL, STATS_PUBLIC); + stats_set (source->stats, "max_listeners", "unlimited"); + stats_set_flags (source->stats, "cluster_password", NULL, STATS_SLAVE); + stats_set_flags (source->stats, NULL, NULL, STATS_PUBLIC); } + stats_release (source->stats); DEBUG1 ("public set to %d", source->yp_public); DEBUG1 ("queue size to %u", source->queue_size_limit); DEBUG1 ("min queue size to %u", source->min_queue_size); @@ -1506,8 +1560,9 @@ static int source_client_callback (client_t *client) agent = httpp_getvar (source->client->parser, "user-agent"); if (agent) - stats_event (source->mount, "user_agent", agent); + stats_event_flags (source->mount, "user_agent", agent, STATS_COUNTERS); stats_event_inc(NULL, "source_client_connections"); + client_set_queue (client, NULL); source_init (source); client->ops = &source_client_ops; @@ -1594,18 +1649,25 @@ void source_recheck_mounts (int update_all) } else if (update_all) { - stats_event_flags (mount->mountname, NULL, NULL, mount->hidden?STATS_HIDDEN:0); - stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s", + long stats = stats_handle (mount->mountname); + stats_set_flags (stats, NULL, NULL, mount->hidden?STATS_HIDDEN:0); + stats_set_args (stats, "listenurl", "http://%s:%d%s", config->hostname, config->port, mount->mountname); - stats_event (mount->mountname, "listeners", "0"); + stats_set (stats, "listeners", "0"); if (mount->max_listeners < 0) - stats_event (mount->mountname, "max_listeners", "unlimited"); + stats_set (stats, "max_listeners", "unlimited"); else - stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners); + stats_set_args (stats, "max_listeners", "%d", mount->max_listeners); + stats_release (stats); } } else - stats_event (mount->mountname, NULL, NULL); + { + // only delete these stats if no client is maintaining them + source = source_find_mount_raw (mount->mountname); + if (source == NULL) + stats_event (mount->mountname, NULL, NULL); + } mount = mount->next; } @@ -1717,7 +1779,6 @@ static int source_listener_release (source_t *source, client_t *client) rate_reduce (source->format->out_bitrate, 1000); stats_event_dec (NULL, "listeners"); - stats_event_args (source->mount, "listeners", "%lu", source->listeners); /* change of listener numbers, so reduce scope of global sampling */ global_reduce_bitrate_sampling (global.out_bitrate); @@ -1740,7 +1801,7 @@ static int source_listener_release (source_t *source, client_t *client) int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client) { - int loop = 10, bitrate = 0, do_process = 0; + int loop = 10, rate = 0, do_process = 0; int within_limits; source_t *source; mount_proxy *minfo = mountinfo; @@ -1769,16 +1830,19 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl } avl_tree_unlock (global.source_tree); if (minfo && minfo->limit_rate) - bitrate = minfo->limit_rate; + rate = minfo->limit_rate; if (minfo == NULL || minfo->fallback_mount == NULL) { - if (bitrate) + if (rate == 0) + if (sscanf (mount, "%*[^[][%d]", &rate) == 1) + rate = rate * 1000; + if (rate) { fbinfo f; f.flags = FS_FALLBACK; f.mount = (char *)mount; f.fallback = NULL; - f.limit = bitrate/8; + f.limit = rate / 8; if (move_listener (client, &f) == 0) { /* source dead but fallback to file found */ @@ -1910,6 +1974,8 @@ void source_setup_listener (source_t *source, client_t *client) client->shared_data = source; client->queue_pos = 0; client->mount = source->mount; + client->flags &= ~CLIENT_IN_FSERVE; + client->timer_start = client->worker->current_time.tv_sec; client->check_buffer = http_source_listener; // add client to the source diff --git a/src/source.h b/src/source.h index 98d52acc..1f8399ba 100644 --- a/src/source.h +++ b/src/source.h @@ -70,6 +70,7 @@ typedef struct source_tag unsigned long bytes_sent_since_update; unsigned long bytes_read_since_update; int stats_interval; + long stats; time_t last_read; @@ -86,6 +87,7 @@ typedef struct source_tag #define SOURCE_PAUSE_LISTENERS (1<<3) #define SOURCE_TERMINATING (1<<4) #define SOURCE_LISTENERS_SYNC (1<<5) +#define SOURCE_TIMEOUT (1<<6) #define source_available(x) (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && ((x)->flags & SOURCE_LISTENERS_SYNC) == 0) #define source_running(x) ((x)->flags & SOURCE_RUNNING) @@ -109,7 +111,7 @@ void source_setup_listener (source_t *source, client_t *client); void source_init (source_t *source); void source_shutdown (source_t *source, int with_fallback); void source_set_fallback (source_t *source, const char *dest_mount); -void source_set_override (const char *mount, const char *dest); +int source_set_override (const char *mount, const char *dest); #define SOURCE_BLOCK_SYNC 01 #define SOURCE_BLOCK_RELEASE 02 diff --git a/src/stats.c b/src/stats.c index 6d007e82..53ced550 100644 --- a/src/stats.c +++ b/src/stats.c @@ -21,6 +21,7 @@ #include #include +#include #include #include "thread/thread.h" @@ -596,7 +597,12 @@ static void process_source_event (stats_event_t *event) } if (event->action == STATS_EVENT_REMOVE && event->name == NULL) { - avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); + int fallback_stream = 0; + avl_tree_wlock (snode->stats_tree); + fallback_stream = _find_node (snode->stats_tree, "fallback") == NULL ? 1 : 0; + avl_tree_unlock (snode->stats_tree); + if (fallback_stream) + avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); avl_tree_unlock (_stats.source_tree); return; } @@ -645,7 +651,7 @@ static int stats_listeners_send (client_t *client) if (refbuf == NULL) { - client->schedule_ms = client->worker->time_ms + 100; + client->schedule_ms = client->worker->time_ms + 60; break; } if (loop == 0 || total > 32768) @@ -667,14 +673,14 @@ static int stats_listeners_send (client_t *client) if (listener->content_len) WARN1 ("content length is %u", listener->content_len); listener->recent_block = NULL; - client->schedule_ms = client->worker->time_ms + 100; + client->schedule_ms = client->worker->time_ms + 50; break; } loop--; } else { - client->schedule_ms = client->worker->time_ms + 250; + client->schedule_ms = client->worker->time_ms + 200; break; /* short write, so stop for now */ } } @@ -712,7 +718,11 @@ static void stats_listener_send (int mask, const char *fmt, ...) while (listener) { - if (listener->mask & mask) + int admuser = listener->mask & STATS_HIDDEN, + hidden = mask & STATS_HIDDEN, + flags = mask & ~STATS_HIDDEN; + + if (admuser || (hidden == 0 && (flags & listener->mask))) _add_stats_to_stats_client (listener->client, fmt, ap); listener = listener->next; } @@ -987,7 +997,6 @@ static void stats_client_release (client_t *client) { event_listener_t *listener, **trail; - INFO0 ("removing stats client"); thread_mutex_lock (&_stats.listeners_lock); listener = _stats.event_listeners; trail = &_stats.event_listeners; @@ -1034,7 +1043,7 @@ void stats_add_listener (client_t *client, int mask) client_set_queue (client, NULL); client->refbuf = refbuf_new (100); snprintf (client->refbuf->data, 100, - "HTTP/1.0 200 OK\r\nCapability: streamlist\r\n\r\n"); + "HTTP/1.0 200 OK\r\nCapability: streamlist stats\r\n\r\n"); client->refbuf->len = strlen (client->refbuf->data); listener->content_len = client->refbuf->len; listener->recent_block = client->refbuf; @@ -1263,6 +1272,8 @@ long stats_handle (const char *mount) { stats_source_t *src_stats; + if (mount == NULL) + return 0; avl_tree_wlock (_stats.source_tree); src_stats = _find_source(_stats.source_tree, mount); if (src_stats == NULL) @@ -1284,11 +1295,14 @@ long stats_handle (const char *mount) } -void stats_lock (long handle) +long stats_lock (long handle, const char *mount) { stats_source_t *src_stats = (stats_source_t *)handle; - if (src_stats) + if (src_stats == NULL) + src_stats = (stats_source_t*)stats_handle (mount); + else avl_tree_wlock (src_stats->stats_tree); + return (long)src_stats; } @@ -1352,29 +1366,88 @@ void stats_set_flags (long handle, const char *name, const char *value, int flag } -void stats_set_conv(long handle, const char *name, const char *value, const char *charset) +static void stats_set_entity_decode (long handle, const char *name, const char *value) { - const char *metadata = value; - xmlBufferPtr conv = xmlBufferCreate (); - - if (charset && value) + xmlParserCtxtPtr parser = xmlNewParserCtxt(); + if (parser) { - xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset); - - if (handle) - { - xmlBufferPtr raw = xmlBufferCreate (); - xmlBufferAdd (raw, (const xmlChar *)value, strlen (value)); - if (xmlCharEncInFunc (handle, conv, raw) > 0) - metadata = (char *)xmlBufferContent (conv); - xmlBufferFree (raw); - xmlCharEncCloseFunc (handle); - } - else - WARN1 ("No charset found for \"%s\"", charset); + xmlChar *decoded = xmlStringDecodeEntities (parser, + (const xmlChar *) value, XML_SUBSTITUTE_BOTH, 0, 0, 0); + stats_set (handle, name, (void*)decoded); + xmlFreeParserCtxt (parser); + xmlFree (decoded); + return; + } + stats_set (handle, name, value); +} + + +void stats_set_conv (long handle, const char *name, const char *value, const char *charset) +{ + if (charset) + { + xmlCharEncodingHandlerPtr encoding = xmlFindCharEncodingHandler (charset); + + if (encoding) + { + xmlBufferPtr in = xmlBufferCreate (); + xmlBufferPtr conv = xmlBufferCreate (); + + xmlBufferCCat (in, value); + if (xmlCharEncInFunc (encoding, conv, in) > 0) + stats_set_entity_decode (handle, name, (void*)xmlBufferContent (conv)); + xmlBufferFree (in); + xmlBufferFree (conv); + xmlCharEncCloseFunc (encoding); + return; + } + WARN1 ("No charset found for \"%s\"", charset); + return; + } + stats_set_entity_decode (handle, name, value); +} + + +void stats_listener_to_xml (client_t *listener, xmlNodePtr parent) +{ + const char *useragent; + char buf[30]; + + xmlNodePtr node = xmlNewChild (parent, NULL, XMLSTR("listener"), NULL); + + snprintf (buf, sizeof (buf), "%lu", listener->connection.id); + xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); + + xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip)); + + useragent = httpp_getvar (listener->parser, "user-agent"); + if (useragent && xmlCheckUTF8((unsigned char *)useragent)) + { + xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(useragent)); + xmlNewChild (node, NULL, XMLSTR("UserAgent"), str); + xmlFree (str); + } + + if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE) + { + source_t *source = listener->shared_data; + snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos); + } + else + snprintf (buf, sizeof (buf), "0"); + xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf)); + + if (listener->worker) + { + snprintf (buf, sizeof (buf), "%lu", + (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); + xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf)); + } + if (listener->username) + { + xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(listener->username)); + xmlNewChild (node, NULL, XMLSTR("username"), str); + xmlFree (str); } - - stats_set (handle, name, metadata); - xmlBufferFree (conv); } diff --git a/src/stats.h b/src/stats.h index 6914d6b3..536c798c 100644 --- a/src/stats.h +++ b/src/stats.h @@ -58,12 +58,14 @@ xmlDocPtr stats_get_xml(int flags, const char *show_mount); char *stats_get_value(const char *source, const char *name); long stats_handle (const char *mount); -void stats_lock (long handle); +long stats_lock (long handle, const char *mount); void stats_release (long handle); void stats_set (long handle, const char *name, const char *value); void stats_set_args (long handle, const char *name, const char *format, ...); void stats_set_flags (long handle, const char *name, const char *value, int flags); void stats_set_conv (long handle, const char *name, const char *value, const char *charset); +void stats_listener_to_xml (client_t *listener, xmlNodePtr parent); + #endif /* __STATS_H__ */ diff --git a/src/util.c b/src/util.c index 2ac9b1ea..d02ff87c 100644 --- a/src/util.c +++ b/src/util.c @@ -17,6 +17,7 @@ #include #include #include +#include #ifndef _WIN32 #include @@ -40,6 +41,7 @@ #include "refbuf.h" #include "connection.h" #include "client.h" +#include "global.h" #define CATMODULE "util" @@ -309,42 +311,39 @@ char *util_url_escape (const char *src) return dst; } + +static int unescape_code (const char *src) +{ + if (hex (src[0]) == -1 || hex (src[1]) == -1) + return -1; + return (hex (src[0]) << 4) + hex (src[1]); +} + + char *util_url_unescape (const char *src) { int len = strlen(src); char *decoded; - int i; + int i, v; char *dst; - int done = 0; decoded = calloc(1, len + 1); dst = decoded; - for(i=0; i < len; i++) { - switch(src[i]) { - case '%': - if (i+2 >= len || hex(src[i+1]) == -1 || hex(src[i+2]) == -1) - { - /* no matching pattern so assume just the % */ - *dst++ = '%'; - } - else - { - *dst++ = hex(src[i+1]) * 16 + hex(src[i+2]); - i+= 2; - } - break; - case 0: - ERROR0("Fatal internal logic error in util_url_unescape()"); - free(decoded); - return NULL; - default: - *dst++ = src[i]; - break; + for(i=0; i < len; i++) + { + if (src[i] == '%' && i+2 < len) + { + v = unescape_code (src + i +1); + if (v >= 0 && isprint(v)) + { + *dst++ = (char)v; + i += 2; + continue; + } } - if(done) - break; + *dst++ = src[i]; } *dst = 0; /* null terminator */ @@ -352,6 +351,7 @@ char *util_url_unescape (const char *src) return decoded; } + /* Get an absolute path (from the webroot dir) from a URI. Return NULL if the * path contains 'disallowed' sequences like foo/../ (which could be used to * escape from the webroot) or if it cannot be URI-decoded. @@ -637,7 +637,7 @@ char *util_dict_urlencode(util_dict *dict, char delim) return res; } -#ifndef HAVE_LOCALTIME_R +#ifndef HAVE_DECL_LOCALTIME_R struct tm *localtime_r (const time_t *timep, struct tm *result) { static mutex_t localtime_lock; diff --git a/src/util.h b/src/util.h index a1da029c..df9cd456 100644 --- a/src/util.h +++ b/src/util.h @@ -52,7 +52,7 @@ int util_dict_set(util_dict *dict, const char *key, const char *val); const char *util_dict_get(util_dict *dict, const char *key); char *util_dict_urlencode(util_dict *dict, char delim); -#ifndef HAVE_LOCALTIME_R +#ifndef HAVE_DECL_LOCALTIME_R struct tm *localtime_r (const time_t *timep, struct tm *result); #endif char *util_conv_string (const char *string, const char *in_charset, const char *out_charset); diff --git a/src/xslt.c b/src/xslt.c index 840b3781..b19474d4 100644 --- a/src/xslt.c +++ b/src/xslt.c @@ -46,6 +46,7 @@ #include "client.h" #include "stats.h" #include "fserve.h" +#include "util.h" #define CATMODULE "xslt" @@ -280,9 +281,12 @@ int xslt_transform (xmlDocPtr doc, const char *xslfilename, client_t *client) for (i = 0; node && i < arg_count; node = avl_get_next (node)) { http_var_t *param = (http_var_t *)node->key; + char *tmp = util_url_escape (param->value); params[i++] = param->name; - params[i] = (char*)alloca (strlen (param->value) +3); - sprintf (params[i++], "\'%s\'", param->value); + // use alloca for now, should really url esc into a supplied buffer + params[i] = (char*)alloca (strlen (tmp) + 3); + sprintf (params[i++], "\'%s\'", tmp); + free (tmp); } params[i] = NULL; } diff --git a/win32/Makefile.am b/win32/Makefile.am index bee287e0..aa76d60a 100644 --- a/win32/Makefile.am +++ b/win32/Makefile.am @@ -14,3 +14,12 @@ EXTRA_DIST = ConfigTab.cpp ConfigTab.h Icecast2win.clw Icecast2win.cpp \ icecast2_console.dsw icecast2_console.dsp credits.bmp icecast2title.bmp \ icecastService.cpp icecastService.dsp +bin_PROGRAMS = icecast +icecast_DEPENDENCIES = ../src/libicecast.a ../src/net/libicenet.la \ + ../src/thread/libicethread.la ../src/httpp/libicehttpp.la ../src/log/libicelog.la \ + ../src/avl/libiceavl.la ../src/timing/libicetiming.la + +icecast_SOURCES = icecastService.cpp +icecast_CPPFLAGS = -I../src @XIPH_CFLAGS@ @XIPH_CPPFLAGS@ +icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LDFLAGS@ @XIPH_LIBS@ @KATE_LIBS@ + diff --git a/win32/icecastService.cpp b/win32/icecastService.cpp index af52b752..0294ff32 100644 --- a/win32/icecastService.cpp +++ b/win32/icecastService.cpp @@ -1,7 +1,4 @@ -#define _WIN32_WINNT 0x0400 -#include #include -#include extern "C" { #include @@ -27,7 +24,6 @@ SERVICE_STATUS_HANDLE hStatus; void ServiceMain(int argc, char** argv); void ControlHandler(DWORD request); -extern "C" int mainService(int argc, char **argv); void installService (const char *path) @@ -112,10 +108,26 @@ void ControlHandler(DWORD request) SetServiceStatus (hStatus, &ServiceStatus); } + +static int run_server (int argc, char *argv[]) +{ + int ret; + + initialize_subsystems(); + + ret = server_init (argc, argv); + if (ret == 0) + server_process(); + + shutdown_subsystems(); + return ret; +} + + void ServiceMain(int argc, char** argv) { ServiceStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; - ServiceStatus.dwWin32ExitCode = 0; + ServiceStatus.dwWin32ExitCode = -1; ServiceStatus.dwServiceSpecificExitCode = 0; ServiceStatus.dwCheckPoint = 0; ServiceStatus.dwWaitHint = 0; @@ -123,7 +135,7 @@ void ServiceMain(int argc, char** argv) hStatus = RegisterServiceCtrlHandler(PACKAGE_STRING, (LPHANDLER_FUNCTION)ControlHandler); if (hStatus == (SERVICE_STATUS_HANDLE)0) { // Registering Control Handler failed - MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION); + MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION); return; } @@ -144,19 +156,21 @@ void ServiceMain(int argc, char** argv) else argv2 [2] = argv[1]; - ServiceStatus.dwWin32ExitCode = mainService(argc2, (char **)argv2); + ServiceStatus.dwWin32ExitCode = run_server (argc2, argv2); ServiceStatus.dwCurrentState = SERVICE_STOPPED; SetServiceStatus(hStatus, &ServiceStatus); } -void main(int argc, char *argv[]) +int main (int argc, char *argv[]) { if (argc < 2) { - printf ("Usage: icecastservice [remove] | [install ]\n"); - return; + printf (PACKAGE_STRING "\n\n" + "Usage: icecastService [remove] | [install ]\n" + " icecastService -c icecast.xml\n\n"); + return -1; } if (!strcmp(argv[1], "install")) { @@ -165,29 +179,33 @@ void main(int argc, char *argv[]) else printf ("install requires a path arg as well\n"); Sleep (1000); - return; + return 0; } if (!strcmp(argv[1], "remove") || !strcmp(argv[1], "uninstall")) { removeService(); - return; + return 0; } - if (_chdir(argv[1]) < 0) - { - char buffer [256]; - _snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]); - MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION); - return; - } + if (strcmp (argv[1], "-c") == 0) + return run_server (argc, argv); + + if (_chdir(argv[1]) < 0) + { + char buffer [256]; + snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]); + MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION); + return -1; + } SERVICE_TABLE_ENTRY ServiceTable[2]; - ServiceTable[0].lpServiceName = PACKAGE_STRING; + ServiceTable[0].lpServiceName = (char*)PACKAGE_STRING; ServiceTable[0].lpServiceProc = (LPSERVICE_MAIN_FUNCTION)ServiceMain; ServiceTable[1].lpServiceName = NULL; ServiceTable[1].lpServiceProc = NULL; // Start the control dispatcher thread for our service if (StartServiceCtrlDispatcher (ServiceTable) == 0) - MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION); + MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION); + return 0; }