diff --git a/Makefile.am b/Makefile.am index d05d81cd..648b7768 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,7 +3,10 @@ AUTOMAKE_OPTIONS = foreign ACLOCAL_AMFLAGS = -I m4 -SUBDIRS = src conf doc web admin win32 +SUBDIRS = conf doc web admin src +if WIN32 +SUBDIRS += win32 +endif EXTRA_DIST = HACKING config.h.vc6 m4/acx_pthread.m4 m4/ogg.m4 \ m4/theora.m4 m4/vorbis.m4 m4/speex.m4\ @@ -19,5 +22,7 @@ debug: profile: $(MAKE) all CFLAGS="@PROFILE@" +staticdebug: + $(MAKE) all CFLAGS="-g -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static" static: - $(MAKE) all LDFLAGS="${LDFLAGS} -all-static" + $(MAKE) all CFLAGS="-O2 -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static" diff --git a/NEWS b/NEWS index 1b50924b..d016e6bf 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,40 @@ Feature differences from SVN trunk any extra tags are show in the conf/icecast.xml.dist file +2.3.2-kh31 +. Add generic scattered IO routines, listeners wanting FLV wrapping now use this which + saves a lot of memory copying and therefore load. +. build setup for mingw32. This should help with library updates for us, drop GUI build +. relay restarting fixes included for certain error cases. relays are treaated as + failed if they stop within 60 seconds and skip on to next server if specified. +. a better trigger for on-demand relay if fallback has listeners. +. on reload when changeowner used, allow for listening sockets to be reopend but + prevent closing priviledged ports unless missing from xml +. prevent unescape routine from creating non-printable chars +. ignore per-mount username when source is using shoutcast protocol. +. URL auth can now accept a "ice-username: ..." in the response headers for setting the + username. For setups that use query args for auth +. expand args applied to xsl requests, url encoded now. +. modify k and m in bitrates to be on 1000 not 1024 +. accesslog modification to parsing, optionally set in to CLF-ESC + to escape-encode strings instead of using "". +. if falling back to file, the bitrate used takes, in order of priority, incoming + bitrate, limit-rate and then [value] in filename eg /fallback-[128] +. fix a large ogg page bug, flac is probably the only one showing this up currently +. fix a couple of rare crash cases with file serving clients. +. prevent intro file sending when override triggered. +. make access log report query args on the request, but truncate the strings if too long. +. minor change to send limiter +. move incoming rate limiter to source client specific code. +. Add server-wide redirect tags. +. use per-source stats handles for updates, reduces lookups and avoids taking a + shared lock. +. minor log message changes. +. fix fallback trigger if source timeout occurs +. fix possible sources count inconsistency with failed relays +. entity expansion in stats conversion routines added +. fixup debug allocation code. not normally used. + 2.3.2-kh30 . stats updates. Split stats lock into global and source stats locks, reduces contention. diff --git a/admin/viewxml.xsl b/admin/viewxml.xsl new file mode 100644 index 00000000..8955283b --- /dev/null +++ b/admin/viewxml.xsl @@ -0,0 +1,75 @@ + + + + + + + + + + + NA + NA + + + + - + + NA + NA + NA + NA + NA + NA + + + + + + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + NA + + + + + + () + + NA + + NA + NA + + + + + + + + 1259797160 + Little Texas - She's Got Her Daddy's Money + + + + + + + \ No newline at end of file diff --git a/configure.in b/configure.in index df277a71..1b1d37f6 100644 --- a/configure.in +++ b/configure.in @@ -1,4 +1,4 @@ -AC_INIT([Icecast], [2.3.2-kh30], [karl@xiph.org]) +AC_INIT([Icecast], [2.3.2-kh31], [karl@xiph.org]) LT_INIT AC_PREREQ(2.59) @@ -9,6 +9,7 @@ AM_INIT_AUTOMAKE AM_CONFIG_HEADER(config.h) AM_MAINTAINER_MODE +AC_PROG_CXX AC_PROG_CC AC_CANONICAL_HOST AC_PROG_LIBTOOL @@ -25,6 +26,7 @@ else PROFILE="-pg -g" AC_DEFINE([_GNU_SOURCE], 1, [Define to include GNU extensions to POSIX]) fi +AM_CONDITIONAL([WIN32], [test x$host_os = xmingw32]) dnl Checks for programs. @@ -34,9 +36,8 @@ dnl Checks for header files. AC_HEADER_STDC AC_HEADER_TIME -AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h]) +AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h malloc.h]) AC_CHECK_HEADERS(pwd.h, AC_DEFINE(CHUID, 1, [Define if you have pwd.h]),,) -AC_CHECK_HEADERS(unistd.h, AC_DEFINE(CHROOT, 1, [Define if you have unistd.h]),,) dnl Checks for typedefs, structures, and compiler characteristics. XIPH_C__FUNC__ @@ -46,10 +47,19 @@ AC_TYPE_OFF_T AC_CHECK_TYPES([struct timespec]) dnl Checks for library functions. -AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync]) +AC_CHECK_DECLS([localtime_r],,, + [#include +#include ]) +AC_CHECK_FUNCS([fnmatch chroot fork poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync]) AC_CHECK_TYPES([struct signalfd_siginfo], [AC_DEFINE(HAVE_SIGNALFD, 1 ,[Define if signalfd exists])], [], [#include ]) +if test "x$ac_cv_func_fnmatch" != "xyes" +then +AC_CHECK_LIB(fnmatch, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-lfnmatch"])], + [ AC_CHECK_LIB(iberty, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-liberty"])]) + ]) +fi AC_SEARCH_LIBS(nanosleep, rt posix4, AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep])) AC_SEARCH_LIBS(clock_gettime, rt posix4, @@ -140,8 +150,10 @@ XIPH_PATH_OPENSSL([ [ AC_MSG_NOTICE([SSL disabled!]) ]) -ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o" -AC_DEFINE([MIMETYPESFILE],"/etc/mime.types", [Default location of mime types file]) +if test "x$ac_cv_func_fork" = "xyes" +then + ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o" +fi AC_DEFINE([ICECAST_TIME_FMT],["%a, %d %b %Y %H:%M:%S %z"], [time format for strftime]) dnl Make substitutions diff --git a/src/Makefile.am b/src/Makefile.am index 5c235125..5f3979c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -4,7 +4,11 @@ AUTOMAKE_OPTIONS = foreign SUBDIRS = avl thread httpp net log timing -bin_PROGRAMS = icecast +if WIN32 +noinst_LIBRARIES = libicecast.a +else +bin_PROGRAMS = icecast +endif noinst_HEADERS = admin.h cfgfile.h logging.h sighandler.h connection.h \ global.h util.h slave.h source.h stats.h refbuf.h client.h \ @@ -27,6 +31,10 @@ icecast_DEPENDENCIES = @ICECAST_OPTIONAL@ net/libicenet.la thread/libicethread.l httpp/libicehttpp.la log/libicelog.la avl/libiceavl.la timing/libicetiming.la icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LIBS@ @KATE_LIBS@ +libicecast_a_SOURCES = $(icecast_SOURCES) +libicecast_a_DEPENDENCIES = $(icecast_DEPENDENCIES) +libicecast_a_LIBADD = $(icecast_DEPENDENCIES) + AM_CFLAGS = @XIPH_CFLAGS@ AM_CPPFLAGS = @XIPH_CPPFLAGS@ AM_LDFLAGS = @XIPH_LDFLAGS@ @KATE_LIBS@ diff --git a/src/admin.c b/src/admin.c index 97190e90..ec5835d4 100644 --- a/src/admin.c +++ b/src/admin.c @@ -61,6 +61,9 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon static int command_admin_function (client_t *client, int response); static int command_list_log (client_t *client, int response); static int command_manage_relay (client_t *client, int response); +#ifdef MY_ALLOC +static int command_alloc(client_t *client); +#endif static int admin_handle_general_request(client_t *client, const char *command); @@ -84,6 +87,9 @@ static struct admin_command admin_general[] = { "manageauth", RAW, { command_manageauth } }, { "listmounts", RAW, { command_list_mounts } }, { "function", RAW, { command_admin_function } }, +#ifdef MY_ALLOC + { "alloc", RAW, { command_alloc } }, +#endif { "streamlist.txt", TEXT, { command_list_mounts } }, { "streams", TEXT, { command_list_mounts } }, { "showlog.txt", TEXT, { command_list_log } }, @@ -592,50 +598,6 @@ static int command_manage_relay (client_t *client, int response) } -static void add_listener_node (xmlNodePtr srcnode, client_t *listener) -{ - const char *useragent; - char buf[30]; - - xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL); - - snprintf (buf, sizeof (buf), "%lu", listener->connection.id); - xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); - - xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip)); - - useragent = httpp_getvar (listener->parser, "user-agent"); - if (useragent && xmlCheckUTF8((unsigned char *)useragent)) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent)); - xmlNewChild (node, NULL, XMLSTR("UserAgent"), str); - xmlFree (str); - } - - if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE) - { - source_t *source = listener->shared_data; - snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos); - } - else - snprintf (buf, sizeof (buf), "0"); - xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf)); - - if (listener->worker) - { - snprintf (buf, sizeof (buf), "%lu", - (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); - xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf)); - } - if (listener->username) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username)); - xmlNewChild (node, NULL, XMLSTR("username"), str); - xmlFree (str); - } -} - - /* populate within srcnode, groups of 0 or more listener tags detailing * information about each listener connected on the provide source. */ @@ -649,7 +611,7 @@ void admin_source_listeners (source_t *source, xmlNodePtr srcnode) while (node) { client_t *listener = (client_t *)node->key; - add_listener_node (srcnode, listener); + stats_listener_to_xml (listener, srcnode); node = avl_get_next (node); } } @@ -724,7 +686,7 @@ static int command_show_listeners (client_t *client, source_t *source, int respo client_t *listener = source_find_client (source, id); if (listener) - add_listener_node (srcnode, listener); + stats_listener_to_xml (listener, srcnode); } thread_mutex_unlock (&source->lock); @@ -1242,3 +1204,33 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon return admin_send_response (doc, client, response, "updatemetadata.xsl"); } + +#ifdef MY_ALLOC +static int command_alloc(client_t *client) +{ + xmlDocPtr doc = xmlNewDoc (XMLSTR("1.0")); + xmlNodePtr rootnode = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL); + avl_node *node; + + xmlDocSetRootElement(doc, rootnode); + avl_tree_rlock (global.alloc_tree); + node = avl_get_first (global.alloc_tree); + while (node) + { + alloc_node *an = node->key; + char value[25]; + xmlNodePtr bnode = xmlNewChild (rootnode, NULL, XMLSTR("block"), NULL); + xmlSetProp (bnode, XMLSTR("name"), XMLSTR(an->name)); + snprintf (value, sizeof value, "%d", an->count); + xmlNewChild (bnode, NULL, XMLSTR("count"), XMLSTR(value)); + snprintf (value, sizeof value, "%d", an->allocated); + xmlNewChild (bnode, NULL, XMLSTR("allocated"), XMLSTR(value)); + + node = avl_get_next (node); + } + avl_tree_unlock (global.alloc_tree); + + return admin_send_response (doc, client, RAW, "stats.xsl"); +} +#endif + diff --git a/src/auth.c b/src/auth.c index cc2ab262..7573ca66 100644 --- a/src/auth.c +++ b/src/auth.c @@ -22,6 +22,9 @@ #include #include #include +#ifdef HAVE_MALLOC_H +#include +#endif #include "auth.h" #include "auth_htpasswd.h" @@ -382,34 +385,52 @@ static void *auth_run_thread (void *arg) int move_listener (client_t *client, struct _fbinfo *finfo) { - source_t *source, *prev; + source_t *source; + const char *mount = finfo->mount; + int rate = finfo->limit; + mount_proxy *minfo; + ice_config_t *config = config_get_config(); - DEBUG1 ("moving listener to %s", finfo->mount); avl_tree_rlock (global.source_tree); - source = source_find_mount (finfo->mount); - while (source) + source = source_find_mount_raw (mount); + while (1) { + minfo = config_find_mount (config, mount); + if (rate == 0 && minfo && minfo->limit_rate) + rate = minfo->limit_rate; + if (source == NULL) + break; thread_mutex_lock (&source->lock); if (source_available (source)) { + config_release_config(); avl_tree_unlock (global.source_tree); source_setup_listener (source, client); client->flags |= CLIENT_HAS_MOVED; thread_mutex_unlock (&source->lock); return 0; } - prev = source; - if (prev->fallback.mount) - source = source_find_mount (prev->fallback.mount); - else - source = NULL; - thread_mutex_unlock (&prev->lock); + mount = minfo ? minfo->fallback_mount : NULL; + thread_mutex_unlock (&source->lock); } + config_release_config(); avl_tree_unlock (global.source_tree); if (client->flags & CLIENT_IS_SLAVE) return -1; - DEBUG1 ("no source, trying %s as a file", finfo->mount); + if (finfo->flags & FS_OVERRIDE) + { + finfo->mount = finfo->fallback; + finfo->fallback = NULL; + finfo->flags &= ~FS_OVERRIDE; + } + if (finfo->limit == 0) + { + if (rate == 0) + if (sscanf (finfo->mount, "%*[^[][%d]", &rate) == 1) + rate = rate * 1000/8; + finfo->limit = rate; + } return fserve_setup_client_fb (client, finfo); } @@ -836,7 +857,7 @@ int auth_check_source (client_t *client, const char *mount) ret = -1; if (mountinfo->password) pass = mountinfo->password; - if (mountinfo->username) + if (mountinfo->username && client->server_conn->shoutcast_compat == 0) user = mountinfo->username; } if (connection_check_pass (client->parser, user, pass) > 0) @@ -858,6 +879,8 @@ void auth_initialise (void) void auth_shutdown (void) { + if (allow_auth == 0) + return; allow_auth = 0; thread_rwlock_wlock (&auth_lock); thread_rwlock_unlock (&auth_lock); diff --git a/src/auth.h b/src/auth.h index 781768f1..a5706f54 100644 --- a/src/auth.h +++ b/src/auth.h @@ -13,9 +13,6 @@ #ifndef __AUTH_H__ #define __AUTH_H__ -#ifdef HAVE_CONFIG_H -#include -#endif struct source_tag; struct auth_tag; diff --git a/src/auth_cmd.c b/src/auth_cmd.c index 4e4c74ad..4e0c301e 100644 --- a/src/auth_cmd.c +++ b/src/auth_cmd.c @@ -44,6 +44,7 @@ #include "client.h" #include "cfgfile.h" #include "httpp/httpp.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_cmd" diff --git a/src/auth_cmd.h b/src/auth_cmd.h index 5a1f9e88..40eefd4d 100644 --- a/src/auth_cmd.h +++ b/src/auth_cmd.h @@ -13,10 +13,6 @@ #ifndef __AUTH_CMD_H__ #define __AUTH_CMD_H__ -#ifdef HAVE_CONFIG_H -#include -#endif - int auth_get_cmd_auth (auth_t *, config_options_t *options); #endif diff --git a/src/auth_htpasswd.c b/src/auth_htpasswd.c index 287bb6d7..ab30e484 100644 --- a/src/auth_htpasswd.c +++ b/src/auth_htpasswd.c @@ -32,6 +32,7 @@ #include "cfgfile.h" #include "httpp/httpp.h" #include "md5.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_htpasswd" diff --git a/src/auth_htpasswd.h b/src/auth_htpasswd.h index 791251b4..fc40025d 100644 --- a/src/auth_htpasswd.h +++ b/src/auth_htpasswd.h @@ -13,9 +13,6 @@ #ifndef __AUTH_HTPASSWD_H__ #define __AUTH_HTPASSWD_H__ -#ifdef HAVE_CONFIG_H -#include -#endif int auth_get_htpasswd_auth (auth_t *auth, config_options_t *options); diff --git a/src/auth_url.c b/src/auth_url.c index 8a848cd8..52b62787 100644 --- a/src/auth_url.c +++ b/src/auth_url.c @@ -86,6 +86,7 @@ #include "cfgfile.h" #include "httpp/httpp.h" #include "mpeg.h" +#include "global.h" #include "logging.h" #define CATMODULE "auth_url" @@ -218,6 +219,17 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s if (eol) *eol = '\0'; } + if (strncasecmp (ptr, "ice-username: ", 14) == 0) + { + int len = strcspn ((char*)ptr+14, "\r\n"); + char *name = malloc (len+1); + if (name) + { + snprintf (name, len+1, "%s", (char *)ptr+14); + free (client->username); + client->username = name; + } + } if (strncasecmp (ptr, "Location: ", 10) == 0) { int len = strcspn ((char*)ptr+10, "\r\n"); diff --git a/src/auth_url.h b/src/auth_url.h index 4ddec631..d9a6264e 100644 --- a/src/auth_url.h +++ b/src/auth_url.h @@ -13,9 +13,6 @@ #ifndef __AUTH_URL_H__ #define __AUTH_URL_H__ -#ifdef HAVE_CONFIG_H -#include -#endif int auth_get_url_auth (auth_t *authenticator, config_options_t *options); diff --git a/src/cfgfile.c b/src/cfgfile.c index dd8c23a0..0ca0de9e 100644 --- a/src/cfgfile.c +++ b/src/cfgfile.c @@ -62,11 +62,13 @@ #define CONFIG_DEFAULT_LOG_DIR "/usr/local/icecast/logs" #define CONFIG_DEFAULT_WEBROOT_DIR "/usr/local/icecast/webroot" #define CONFIG_DEFAULT_ADMINROOT_DIR "/usr/local/icecast/admin" +#define MIMETYPESFILE "/etc/mime.types" #else #define CONFIG_DEFAULT_BASE_DIR ".\\" #define CONFIG_DEFAULT_LOG_DIR ".\\logs" #define CONFIG_DEFAULT_WEBROOT_DIR ".\\webroot" #define CONFIG_DEFAULT_ADMINROOT_DIR ".\\admin" +#define MIMETYPESFILE ".\\mime.types" #endif static ice_config_t _current_configuration; @@ -157,9 +159,9 @@ int config_get_bitrate (xmlNodePtr node, void *x) return 1; sscanf ((char*)str, "%"SCNd64 "%c", p, &metric); if (metric == 'k' || metric == 'K') - (*p) *= 1024; + (*p) *= 1000; if (metric == 'm' || metric == 'M') - (*p) *= 1024*1024; + (*p) *= 10000000; xmlFree (str); } return 0; @@ -228,6 +230,19 @@ void config_init_configuration(ice_config_t *configuration) } +redirect_host *config_clear_redirect (redirect_host *redir) +{ + redirect_host *next = NULL; + if (redir) + { + next = redir->next; + xmlFree (redir->server); + free (redir); + } + return next; +} + + relay_server *config_clear_relay (relay_server *relay) { relay_server *next = relay->next; @@ -353,6 +368,9 @@ void config_clear(ice_config_t *c) while (c->relay) c->relay = config_clear_relay (c->relay); + while (c->redirect_hosts) + c->redirect_hosts = config_clear_redirect (c->redirect_hosts); + while (c->mounts) { mount_proxy *to_go = c->mounts; @@ -602,11 +620,13 @@ static int _parse_security (xmlNodePtr node, void *arg) static int _parse_accesslog (xmlNodePtr node, void *arg) { - access_log *log = arg; + struct access_log *log = arg; + char *type = NULL; struct cfg_tag icecast_tags[] = { { "name", config_get_str, &log->name }, { "ip", config_get_bool, &log->log_ip }, + { "type", config_get_str, &type }, { "archive", config_get_bool, &log->archive }, { "exclude_ext", config_get_str, &log->exclude_ext }, { "display", config_get_int, &log->display }, @@ -615,9 +635,16 @@ static int _parse_accesslog (xmlNodePtr node, void *arg) }; log->logid = -1; - return parse_xml_tags (node, icecast_tags); + log->type = LOG_ACCESS_CLF; + if (parse_xml_tags (node, icecast_tags)) + return 2; + if (type && strcmp (type, "CLF-ESC") == 0) + log->type = LOG_ACCESS_CLF_ESC; + xmlFree (type); + return 0; } + static int _parse_errorlog (xmlNodePtr node, void *arg) { error_log *log = arg; @@ -659,7 +686,6 @@ static int _parse_logging (xmlNodePtr node, void *arg) struct cfg_tag icecast_tags[] = { { "accesslog", _parse_accesslog, &config->access_log }, - { "errorlog", _parse_errorlog, &config->error_log }, { "playlistlog", _parse_playlistlog, &config->playlist_log }, { "accesslog", config_get_str, &config->access_log.name }, { "accesslog_ip", config_get_bool, &config->access_log.log_ip }, @@ -667,7 +693,8 @@ static int _parse_logging (xmlNodePtr node, void *arg) config_get_str, &config->access_log.exclude_ext }, { "accesslog_lines", config_get_int, &config->access_log.display }, - { "errorlog", config_get_str, &config->error_log }, + { "errorlog", _parse_errorlog, &config->error_log }, + { "errorlog", config_get_str, &config->error_log.name }, { "errorlog_lines", config_get_int, &config->error_log.display }, { "loglevel", config_get_int, &config->error_log.level }, { "playlistlog", config_get_str, &config->playlist_log }, @@ -678,6 +705,7 @@ static int _parse_logging (xmlNodePtr node, void *arg) { NULL, NULL, NULL } }; + config->access_log.type = LOG_ACCESS_CLF; config->access_log.logid = -1; config->access_log.display = 100; config->access_log.archive = -1; @@ -830,8 +858,10 @@ static int _parse_mount (xmlNodePtr node, void *arg) mount->url_ogg_meta = 1; mount->source_timeout = config->source_timeout; mount->file_seekable = 1; + mount->access_log.type = LOG_ACCESS_CLF; mount->access_log.logid = -1; mount->access_log.log_ip = 1; + mount->fallback_override = 1; if (parse_xml_tags (node, icecast_tags)) return -1; @@ -963,6 +993,36 @@ static int _parse_relay (xmlNodePtr node, void *arg) return 0; } + +static int _parse_redirect (xmlNodePtr node, void *arg) +{ + ice_config_t *config = arg; + redirect_host *redir = calloc (1, sizeof (*redir)); + + struct cfg_tag icecast_tags[] = + { + { "host", config_get_str, &redir->server }, + { "port", config_get_int, &redir->port }, + { NULL, NULL, NULL }, + }; + + do + { + redir->port = 8000; + if (parse_xml_tags (node, icecast_tags)) + break; + + if (redir->server == NULL || redir->port < 1 || redir->port > 65536) + break; + redir->next = config->redirect_hosts; + config->redirect_hosts = redir; + return 0; + } while (0); + free (redir); + return -1; +} + + static int _parse_limits (xmlNodePtr node, void *arg) { ice_config_t *config = arg; @@ -1095,6 +1155,7 @@ static int _parse_root (xmlNodePtr node, ice_config_t *config) { "master-ssl-port", config_get_int, &config->master_ssl_port }, { "master-redirect", config_get_bool, &config->master_redirect }, { "max-redirect-slaves",config_get_int, &config->max_redirects }, + { "redirect", _parse_redirect, config }, { "shoutcast-mount", config_get_str, &config->shoutcast_mount }, { "listen-socket", _parse_listen_sock, config }, { "limits", _parse_limits, config }, diff --git a/src/cfgfile.h b/src/cfgfile.h index 859c32ac..7ab687cd 100644 --- a/src/cfgfile.h +++ b/src/cfgfile.h @@ -30,18 +30,32 @@ typedef struct _listener_t listener_t; #include "auth.h" #include "compat.h" -typedef struct + +typedef struct _redirect_host +{ + struct _redirect_host *next; + time_t next_update; + char *server; + int port; +} redirect_host; + + +typedef struct access_log { char *name; int logid; int log_ip; + int type; int archive; int display; int size; char *exclude_ext; } access_log; -typedef struct +#define LOG_ACCESS_CLF 0 +#define LOG_ACCESS_CLF_ESC 1 + +typedef struct error_log { char *name; int logid; @@ -51,7 +65,7 @@ typedef struct int level; } error_log; -typedef struct +typedef struct playlist_log { char *name; int logid; @@ -131,7 +145,7 @@ typedef struct _mount_proxy { unsigned int max_stream_duration; unsigned int max_listener_duration; - access_log access_log; + struct access_log access_log; char *redirect; char *stream_name; @@ -175,13 +189,14 @@ typedef struct _relay_server_master char *mount; int port; int timeout; + int skip; } relay_server_master; typedef struct _relay_server { struct _relay_server *next, *new_details; struct source_tag *source; - relay_server_master *masters; + relay_server_master *masters, *in_use; char *username; char *password; char *localmount; @@ -240,8 +255,6 @@ typedef struct ice_config_tag listener_t *listen_sock; unsigned int listen_sock_count; - ice_master_details *master; - char *master_server; int master_server_port; int master_update_interval; @@ -252,6 +265,7 @@ typedef struct ice_config_tag int master_ssl_port; int master_redirect; int max_redirects; + struct _redirect_host *redirect_hosts; relay_server *relay; @@ -267,12 +281,12 @@ typedef struct ice_config_tag char *cert_file; char *webroot_dir; char *adminroot_dir; - aliases *aliases; + struct _aliases *aliases; unsigned slaves_count; - access_log access_log; - error_log error_log; - playlist_log playlist_log; + struct access_log access_log; + struct error_log error_log; + struct playlist_log playlist_log; int chroot; int chuid; diff --git a/src/compat.h b/src/compat.h index 15b2ee98..a34b0a49 100644 --- a/src/compat.h +++ b/src/compat.h @@ -36,18 +36,24 @@ #endif /* Make sure we define 64 bit types */ -#ifdef _WIN32 +#if defined(_WIN32) || defined(_MINGW32_) # define PATH_SEPARATOR "\\" # define size_t unsigned int # define ssize_t int # define uint32_t unsigned int +# define fseeko fseek +# include // for alloca +//# define alloca _alloca +# define SCN_OFF_T "ld" +# define PRI_OFF_T "ld" #else # define PATH_SEPARATOR "/" -# if defined(HAVE_INTTYPES_H) +#endif + +#if defined(HAVE_INTTYPES_H) # include -# elif defined(HAVE_STDINT_H) +#elif defined(HAVE_STDINT_H) # include -# endif #endif #endif /* __COMPAT_H__ */ diff --git a/src/connection.c b/src/connection.c index 00e511dd..d72ec740 100644 --- a/src/connection.c +++ b/src/connection.c @@ -31,11 +31,17 @@ #ifdef _MSC_VER #include #include -#else +#endif +#ifdef HAVE_SYS_SOCKET_H #include +#endif +#ifdef HAVE_NETINET_IN_H #include +#endif +#ifdef HAVE_NETDB_H #include #endif + #ifdef HAVE_SIGNALFD #include #include @@ -218,6 +224,7 @@ void connection_initialize(void) void connection_shutdown(void) { + connection_listen_sockets_close (NULL, 1); thread_spin_destroy (&_connection_lock); } @@ -342,6 +349,126 @@ int connection_send (connection_t *con, const void *buf, size_t len) return bytes; } + +#ifdef WIN32 +#define IO_VECTOR_LEN(x) ((x)->len) +#define IO_VECTOR_BASE(x) ((x)->buf) +#else +#define IO_VECTOR_LEN(x) ((x)->iov_len) +#define IO_VECTOR_BASE(x) ((x)->iov_base) +#endif + +void connection_bufs_init (struct connection_bufs *v, short start) +{ + memset (v, 0, sizeof (struct connection_bufs)); + if (start && start < 500) + { + v->block = calloc (start, sizeof (IOVEC)); + v->max = start; + } +} + + +void connection_bufs_release (struct connection_bufs *v) +{ + free (v->block); + memset (v, 0, sizeof (struct connection_bufs)); +} + + +void connection_bufs_flush (struct connection_bufs *v) +{ + v->count = 0; + v->total = 0; +} + + +int connection_bufs_append (struct connection_bufs *v, void *buf, unsigned int len) +{ + if (v->count >= v->max) + { + int len = v->max + 16; + IOVEC *arr = realloc (v->block, (len*sizeof(IOVEC))); + v->max = len; + v->block = arr; + } + IO_VECTOR_BASE (v->block + v->count) = buf; + IO_VECTOR_LEN (v->block + v->count) = len; + v->count++; + v->total += len; + return v->total; + +} + + +static int connbufs_locate_start (struct connection_bufs *vects, int skip, IOVEC *old_value, int *offp) +{ + int sum = 0, i = vects->count; + IOVEC *p = vects->block; + + if (skip < vects->total) + { + for (; i; i--) + { + if (sum + IO_VECTOR_LEN(p) > skip) + { + int offset = skip - sum; + if (offset) + { + *old_value = *p; + IO_VECTOR_BASE(p) += offset; + IO_VECTOR_LEN(p) -= offset; + } + *offp = offset; + return p - vects->block; + } + sum += IO_VECTOR_LEN(p); + p++; + } + } + return -1; +} + + +int connection_bufs_send (connection_t *con, struct connection_bufs *vectors, int skip) +{ + IOVEC *p = vectors->block, old_vals; + int i = vectors->count, offset = 0, ret = -1; + + i = connbufs_locate_start (vectors, skip, &old_vals, &offset); + p = vectors->block + i; + + if (i >= 0) + { + if (not_ssl_connection (con)) + { + ret = sock_writev (con->sock, p, vectors->count - i); + if (ret < 0 && !sock_recoverable (sock_error())) + con->error = 1; + } +#ifdef HAVE_OPENSSL + else + { + IOVEC *io = p; + int bytes = 0; + for (; i < vectors->count; i++, io++) + { + int v = connection_send_ssl (con, IO_VECTOR_BASE(io), IO_VECTOR_LEN(io)); + if (v > 0) bytes += v; + if (v < IO_VECTOR_LEN(io)) break; + } + if (bytes > 0) ret = bytes; + } +#endif + if (offset) + *p = old_vals; + if (ret > 0) + con->sent_bytes += ret; + } + return ret; +} + + static void add_generic_text (avl_tree *t, const char *ip, time_t now) { char *str = strdup (ip); @@ -641,7 +768,7 @@ static sock_t wait_for_serversock (void) } #else fd_set rfds; - struct timeval tv, *p=NULL; + struct timeval tv; int i, ret; sock_t max = SOCK_ERROR; @@ -697,7 +824,7 @@ static client_t *accept_client (void) int i, num; refbuf_t *r; - if (sock_set_blocking (sock, 0) || sock_set_nodelay (sock)) + if (sock_set_blocking (sock, 0)) // || sock_set_nodelay (sock)) { WARN0 ("failed to set tcp options on client connection, dropping"); break; @@ -885,7 +1012,8 @@ static int http_client_request (client_t *client) if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0) { - INFO1 ("dropping client because useragent is %s", agent); + INFO2 ("dropping client at %s because useragent is %s", + client->connection.ip, agent); return -1; } } @@ -977,8 +1105,7 @@ static void *connection_thread (void *arg) useragents.filename = strdup (config->agentfile); get_ssl_certificate (config); - if (config->chuid == 0) - connection_setup_sockets (config); + connection_setup_sockets (config); header_timeout = config->header_timeout; config_release_config (); @@ -1020,6 +1147,7 @@ static void *connection_thread (void *arg) return NULL; } + void connection_thread_startup () { #ifdef HAVE_SIGNALFD @@ -1034,6 +1162,7 @@ void connection_thread_startup () conn_tid = thread_create ("connection", connection_thread, NULL, THREAD_ATTACHED); } + void connection_thread_shutdown () { if (conn_tid) @@ -1375,41 +1504,71 @@ static int _handle_get_request (client_t *client) } -/* close any open listening sockets and reopen new listener sockets based on the settings - * in the configuration. +/* close any open listening sockets */ +void connection_listen_sockets_close (ice_config_t *config, int all_sockets) +{ + if (global.serversock) + { + int old = 0, new = 0, cur = global.server_sockets; + for (; old < cur; old++) + { + // close all listening sockets unless privileged ones are to stay open + // and it is still present in the config. + if (config && all_sockets == 0 && global.server_conn [old]->port < 1024) + { + listener_t *listener = config->listen_sock; + while (listener && listener->port != global.server_conn [old]->port) + listener = listener->next; + if (listener) + { + INFO2 ("Leaving port %d (%s) open", listener->port, + listener->bind_address ? listener->bind_address : ""); + if (new < old) + { + global.server_conn [new] = global.server_conn [old]; + global.serversock [new] = global.serversock [old]; + new++; + } + continue; + } + } + INFO1 ("Closing port %d", global.server_conn [old]->port); + sock_close (global.serversock [old]); + global.serversock [old] = SOCK_ERROR; + config_clear_listener (global.server_conn [old]); + global.server_sockets--; + } + if (global.server_sockets == 0) + { + free (global.serversock); + global.serversock = NULL; + free (global.server_conn); + global.server_conn = NULL; + } + } +} + + int connection_setup_sockets (ice_config_t *config) { int count = 0; listener_t *listener, **prev; + void *tmp; - global_lock(); - /* place sockets away from config, so we don't need to take config lock - * in the accept loop. */ - if (global.serversock) - { - for (; count < global.server_sockets; count++) - { - sock_close (global.serversock [count]); - config_clear_listener (global.server_conn [count]); - } - free (global.serversock); - global.serversock = NULL; - free (global.server_conn); - global.server_conn = NULL; - } - if (config == NULL) - { - global_unlock(); + if (global.server_sockets >= config->listen_sock_count) return 0; - } + global_lock(); - count = 0; - global.serversock = calloc (config->listen_sock_count, sizeof (sock_t)); - global.server_conn = calloc (config->listen_sock_count, sizeof (listener_t*)); + tmp = realloc (global.serversock, (config->listen_sock_count*sizeof (sock_t))); + if (tmp) global.serversock = tmp; + + tmp = realloc (global.server_conn, (config->listen_sock_count*sizeof (listener_t*))); + if (tmp) global.server_conn = tmp; listener = config->listen_sock; prev = &config->listen_sock; + count = global.server_sockets; while (listener) { int successful = 0; @@ -1429,6 +1588,8 @@ int connection_setup_sockets (ice_config_t *config) sock_set_send_buffer (sock, listener->so_sndbuf); sock_set_blocking (sock, 0); successful = 1; + if (count >= config->listen_sock_count) + abort(); global.serversock [count] = sock; global.server_conn [count] = listener; listener->refcount++; diff --git a/src/connection.h b/src/connection.h index 2039deb2..1b442982 100644 --- a/src/connection.h +++ b/src/connection.h @@ -13,6 +13,10 @@ #ifndef __CONNECTION_H__ #define __CONNECTION_H__ +#ifdef HAVE_WINSOCK2_H +#include +#endif + #include #include #ifdef HAVE_OPENSSL @@ -46,6 +50,15 @@ struct connection_tag char *ip; }; + +struct connection_bufs +{ + short count, max; + int total; + IOVEC *block; +}; + + #ifdef HAVE_OPENSSL #define not_ssl_connection(x) ((x)->ssl==NULL) #else @@ -63,6 +76,14 @@ void connection_uses_ssl (connection_t *con); void connection_add_banned_ip (const char *ip, int duration); void connection_release_banned_ip (const char *ip); void connection_stats (void); + +void connection_bufs_init (struct connection_bufs *vectors, short start); +void connection_bufs_release (struct connection_bufs *v); +void connection_bufs_flush (struct connection_bufs *v); +int connection_bufs_append (struct connection_bufs *vectors, void *buf, unsigned int len); +int connection_bufs_read (connection_t *con, struct connection_bufs *vecs, int skip); +int connection_bufs_send (connection_t *con, struct connection_bufs *vecs, int skip); + #ifdef HAVE_OPENSSL int connection_read_ssl (connection_t *con, void *buf, size_t len); int connection_send_ssl (connection_t *con, const void *buf, size_t len); @@ -76,6 +97,7 @@ int connection_check_relay_pass(http_parser_t *parser); int connection_check_admin_pass(http_parser_t *parser); void connection_close_sigfd (void); +void connection_listen_sockets_close (struct ice_config_tag *config, int all_sockets); extern int connection_running; diff --git a/src/event.c b/src/event.c index ceaece9a..3f25c6c3 100644 --- a/src/event.c +++ b/src/event.c @@ -64,14 +64,17 @@ void event_config_read (void) config_set_config (&new_config, &old_config); config_release_config(); + connection_thread_shutdown(); + redirector_clearall(); config = config_get_config(); yp_recheck_config (config); fserve_recheck_mime_types (config); stats_global (config); workers_adjust (config->workers_count); + connection_listen_sockets_close (config, 0); + redirector_setup (config); config_release_config(); - connection_thread_shutdown(); slave_restart(); config_clear (&old_config); } diff --git a/src/flv.c b/src/flv.c index ded916a5..6637f772 100644 --- a/src/flv.c +++ b/src/flv.c @@ -32,6 +32,7 @@ #include "logging.h" #include "mpeg.h" #include "format_mp3.h" +#include "global.h" #define CATMODULE "flv" @@ -105,11 +106,13 @@ static int flv_mpX_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int flv->tag[15] |= 0x1; } memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 16); + connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 16); flv->samples += mp->sample_count; flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0)); // The extra byte is for the flv audio id, usually 0x2F flv->prev_tagsize = (len + FLVHEADER + 1); mp->raw_offset += 16; + connection_bufs_append (&flv->bufs, frame, len); return 0; } @@ -132,11 +135,13 @@ static int flv_aac_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int flv_hdr (flv, len + 2); // a single frame (headerless) follows this memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 17); + connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 17); flv->samples += mp->sample_count; flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0)); // frame length + FLVHEADER + AVHEADER flv->prev_tagsize = (len + 11 + 2); mp->raw_offset += 17; + connection_bufs_append (&flv->bufs, frame, len); return 0; } @@ -171,6 +176,7 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne flv->tag[15] = 0xAF; // AAC audio, need these codes first flv->tag[16] = 0x0; memcpy (mp->raw->data, &flv->tag[0], 11+4+2+c); + connection_bufs_append (&flv->bufs, mp->raw->data, 11+4+2+c); mp->raw_offset = 11+4+2+c; flv->prev_tagsize = 11 + 2 + c; flv->tag[16] = 0x01; // as per spec. headerless frame follows this @@ -184,31 +190,125 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne static int send_flv_buffer (client_t *client, struct flv *flv) { int ret = 0; - char *buf = flv->mpeg_sync.raw->data + flv->block_pos; - unsigned int len = flv->mpeg_sync.raw_offset - flv->block_pos; + unsigned int len = flv->bufs.total - flv->block_pos; if (len > 0) { - if (len > 1400) - len = 1400; - ret = client_send_bytes (client, buf, len); + ret = connection_bufs_send (&client->connection, &flv->bufs, flv->block_pos); if (ret < (int)len) - client->schedule_ms += (ret ? 50 : 250); + client->schedule_ms += (ret > 0 ? 50 : 200); if (ret > 0) flv->block_pos += ret; } - if (flv->block_pos == flv->mpeg_sync.raw_offset) + if (flv->block_pos == flv->bufs.total) + { flv->block_pos = flv->mpeg_sync.raw_offset = 0; + connection_bufs_flush (&flv->bufs); + } return ret; } +void flv_write_metadata (struct flv *flv, refbuf_t *scmeta, const char *mount) +{ + /* the first assoc block is shoutcast meta, second is flv meta */ + int len; + struct flvmeta *flvm; + unsigned char prev_type = flv->tag[4]; + refbuf_t *flvmeta = NULL; + int meta_copied = 0; + refbuf_t *raw = flv->mpeg_sync.raw; + char *src, *dst = raw->data + flv->mpeg_sync.raw_offset; + + if (scmeta) + flvmeta = scmeta->associated; + if (flvmeta == NULL) + { + char *value = stats_get_value (mount, "server_name"); + + flvmeta = flv_meta_allocate (200); + if (value) + flv_meta_append_string (flvmeta, "name", value); + free (value); + value = stats_get_value (flv->mpeg_sync.mount, "title"); + if (value) + flv_meta_append_string (flvmeta, "title", value); + else + flv_meta_append_string (flvmeta, "title", ""); + free (value); + value = stats_get_value (mount, "audio_codecid"); + if (value) + { + int id = atoi (value); + if (id == 2 || id == 10) + flv_meta_append_number (flvmeta, "audiocodecid", (double)id); + free (value); + } + value = stats_get_value (mount, "ice-bitrate"); + if (value) + { + double rate = (double)atoi (value); + flv_meta_append_number (flvmeta, "audiodatarate", rate); + free (value); + } + value = stats_get_value (mount, "ice-samplerate"); + if (value) + { + double rate = (double)atoi (value); + flv_meta_append_number (flvmeta, "audiosamplerate", rate); + free (value); + } + value = stats_get_value (mount, "ice-channels"); + if (value) + { + int chann = atoi (value); + flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0); + free (value); + } + flv_meta_append_string (flvmeta, NULL, NULL); + flvm = (struct flvmeta *)flvmeta->data; + meta_copied = flvm->meta_pos - sizeof (*flvm); + } + else + flvm = (struct flvmeta *)flvmeta->data; + len = flvm->meta_pos - sizeof (*flvm); + src = (char *)flvm + sizeof (*flvm); + + if (meta_copied + 15 > raw->len) + { + int newlen = raw->len + 1024; + void *p = realloc (raw->data, newlen); + if (p == NULL) + return; + raw->data = p; + raw->len = newlen; + } + flv->tag[4] = 18; // metadata + flv_hdr (flv, len); + memcpy (dst, &flv->tag[0], 15); + connection_bufs_append (&flv->bufs, dst, 15); + flv->mpeg_sync.raw_offset += 15; + dst += 15; + if (meta_copied) + { + memcpy (dst, src, len); + connection_bufs_append (&flv->bufs, dst, len); + flv->mpeg_sync.raw_offset += len; + refbuf_release (flvmeta); + } + else + connection_bufs_append (&flv->bufs, src, len); + flv->prev_tagsize = len + 11; + flv->tag[4] = prev_type; +} + + int write_flv_buf_to_client (client_t *client) { refbuf_t *ref = client->refbuf, *scmeta = ref->associated; mp3_client_data *client_mp3 = client->format_data; struct flv *flv = client_mp3->specific; - int ret, meta_to_free = 0; + int ret; if (client->pos >= ref->len) { @@ -216,99 +316,27 @@ int write_flv_buf_to_client (client_t *client) client->pos = ref->len; return -1; } + /* check for metadata updates and insert if needed */ - if (flv->seen_metadata != scmeta) - { - /* the first assoc block is shoutcast meta, second is flv meta */ - int len; - char *src, *dst = flv->mpeg_sync.raw->data; - refbuf_t *flvmeta = NULL; - struct flvmeta *flvm; - - if (scmeta) - flvmeta = scmeta->associated; - if (flvmeta == NULL) - { - char *value = stats_get_value (client->mount, "server_name"); - - flvmeta = flv_meta_allocate (200); - meta_to_free = 1; - if (value) - flv_meta_append_string (flvmeta, "name", value); - free (value); - value = stats_get_value (flv->mpeg_sync.mount, "title"); - if (value) - flv_meta_append_string (flvmeta, "title", value); - else - flv_meta_append_string (flvmeta, "title", ""); - free (value); - flv_meta_append_string (flvmeta, "title", value); - value = stats_get_value (client->mount, "audio_codecid"); - if (value) - { - int id = atoi (value); - if (id == 2 || id == 10) - flv_meta_append_number (flvmeta, "audiocodecid", (double)id); - free (value); - } - value = stats_get_value (client->mount, "ice-bitrate"); - if (value) - { - double rate = (double)atoi (value); - flv_meta_append_number (flvmeta, "audiodatarate", rate); - free (value); - } - value = stats_get_value (client->mount, "ice-samplerate"); - if (value) - { - double rate = (double)atoi (value); - flv_meta_append_number (flvmeta, "audiosamplerate", rate); - free (value); - } - value = stats_get_value (client->mount, "ice-channels"); - if (value) - { - int chann = atoi (value); - flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0); - free (value); - } - flv_meta_append_string (flvmeta, NULL, NULL); - } - flvm = (struct flvmeta *)flvmeta->data; - src = (char *)flvm + sizeof (*flvm); - len = flvm->meta_pos - sizeof (*flvm); - - if (len + 15 < flv->mpeg_sync.raw->len) - { - unsigned char prev_type = flv->tag[4]; - flv->tag[4] = 18; // metadata - flv_hdr (flv, len); - memcpy (dst, &flv->tag[0], 15); - memcpy (dst+15, src, len); - flv->mpeg_sync.raw_offset = len + 15; - flv->prev_tagsize = len + 11; - flv->tag[4] = prev_type; - flv->seen_metadata = ref->associated; - if (meta_to_free) refbuf_release (flvmeta); - return send_flv_buffer (client, flv); - } - if (meta_to_free) refbuf_release (flvmeta); - flv->seen_metadata = ref->associated; - } - if (flv->mpeg_sync.raw_offset == 0) { int unprocessed = mpeg_complete_frames (&flv->mpeg_sync, ref, client->pos); + if (unprocessed < 0) return -1; if (unprocessed > 0) ref->len += unprocessed; /* output was truncated, so revert changes */ + + if (flv->seen_metadata != scmeta) + flv_write_metadata (flv, scmeta, client->mount); } ret = send_flv_buffer (client, flv); if (flv->mpeg_sync.raw_offset == 0) { client->pos = ref->len; client->queue_pos += client->refbuf->len; + if (flv->seen_metadata != scmeta) + flv->seen_metadata = scmeta; } return ret; } @@ -458,7 +486,8 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client) "\r\n" "FLV\x1\x4%c%c%c\x9", 0,0,0); - flv->mpeg_sync.raw = refbuf_new (4096); + // only flv headers in here, allows for up to 64 frames per read block, expandable + flv->mpeg_sync.raw = refbuf_new (1024); flv->tag[4] = 8; // Audio details only if (plugin->type == FORMAT_TYPE_AAC) { @@ -474,6 +503,7 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client) client->respcode = 200; client->refbuf->len = bytes; + connection_bufs_init (&flv->bufs, 10); } @@ -481,5 +511,6 @@ void free_flv_client_data (struct flv *flv) { flv->mpeg_sync.mount = NULL; mpeg_cleanup (&flv->mpeg_sync); + connection_bufs_release (&flv->bufs); } diff --git a/src/flv.h b/src/flv.h index 3fdb6d1a..86e242a4 100644 --- a/src/flv.h +++ b/src/flv.h @@ -25,6 +25,7 @@ struct flv int64_t samples; refbuf_t *seen_metadata; mpeg_sync mpeg_sync; + struct connection_bufs bufs; unsigned char tag[30]; }; diff --git a/src/format_flac.c b/src/format_flac.c index 98f18363..55002481 100644 --- a/src/format_flac.c +++ b/src/format_flac.c @@ -27,6 +27,7 @@ typedef struct source_tag source_t; #include "format_ogg.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-flac" #include "logging.h" diff --git a/src/format_kate.c b/src/format_kate.c index 3587a784..1fc799c8 100644 --- a/src/format_kate.c +++ b/src/format_kate.c @@ -31,6 +31,7 @@ typedef struct source_tag source_t; #include "format_kate.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-kate" #include "logging.h" diff --git a/src/format_midi.c b/src/format_midi.c index 2afd833c..1181da21 100644 --- a/src/format_midi.c +++ b/src/format_midi.c @@ -27,6 +27,7 @@ typedef struct source_tag source_t; #include "format_ogg.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-midi" #include "logging.h" diff --git a/src/format_mp3.c b/src/format_mp3.c index 68778105..1e18d09c 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -41,6 +41,7 @@ #include "format_mp3.h" #include "flv.h" #include "mpeg.h" +#include "global.h" #define CATMODULE "format-mp3" @@ -64,8 +65,9 @@ static int mpeg_process_buffer (client_t *client, format_plugin_t *plugin); /* client format flags */ -#define CLIENT_IN_METADATA CLIENT_FORMAT_BIT -#define CLIENT_USING_BLANK_META (CLIENT_FORMAT_BIT<<1) +#define CLIENT_INTERNAL_FORMAT (CLIENT_FORMAT_BIT << 4) +#define CLIENT_IN_METADATA (CLIENT_INTERNAL_FORMAT) +#define CLIENT_USING_BLANK_META (CLIENT_INTERNAL_FORMAT<<1) static refbuf_t blank_meta = { 0, 1, NULL, NULL, "\001StreamTitle='';", 17 }; @@ -138,7 +140,10 @@ static void mp3_set_tag (format_plugin_t *plugin, const char *tag, const char *i if (in_value) { - value = util_conv_string (in_value, charset, plugin->charset); + if (charset == NULL && plugin->charset) + charset = plugin->charset; + if (charset && (strcasecmp (charset, "utf-8") && strcasecmp (charset, "utf8"))) + value = util_conv_string (in_value, charset, "UTF8"); if (value == NULL) value = strdup (in_value); } @@ -450,6 +455,8 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf) * to merge them into 1 write call */ block_len = refbuf->len - client->pos; + if (block_len > client_mp3->interval) + block_len = client_mp3->interval; // handle small intervals merge = alloca (block_len + meta_len); memcpy (merge, metadata, meta_len); @@ -502,7 +509,7 @@ static int format_mp3_write_buf_to_client (client_t *client) ret = client_send_bytes (client, buf, len); if (ret < len) - client->schedule_ms += 50; + client->schedule_ms += 80; if (ret > 0) { client_mp3->since_meta_block += ret; @@ -629,30 +636,33 @@ static int validate_mpeg (source_t *source, refbuf_t *refbuf) if (source_mp3->inline_metadata_interval > 0) { - /* inline metadata may need reading before the rest of the mpeg data */ - if (source_mp3->build_metadata_len == 0 && source_mp3->offset > unprocessed) - { - source_mp3->offset -= unprocessed; - } - else + if (source_mp3->inline_metadata_interval <= source_mp3->offset) { + // reached meta but we have a frame fragment, so keep it for later leftover = refbuf_new (unprocessed); memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed); - mpeg_data_insert (mpeg_sync, leftover); /* will need to merge this after metadata */ - return 0; + mpeg_data_insert (mpeg_sync, leftover); + client->pos = 0; + return refbuf->len ? 0 : -1; } + // not reached the metadata block so save and rewind for completing the read + source_mp3->offset -= unprocessed; } /* make sure the new block has a minimum of queue_block_size */ if (unprocessed < source_mp3->queue_block_size) len = source_mp3->queue_block_size; else - len = unprocessed + 2000; + len = unprocessed + 1000; leftover = refbuf_new (len); memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed); source_mp3->read_data = leftover; source_mp3->read_count = unprocessed; + client->pos = unprocessed; } + else + client->pos = 0; + if (source->format->read_bytes < 2500) stats_event_args (source->mount, "audio_codecid", "%d", (mpeg_sync->layer ? 2 : 10)); return refbuf->len ? 0 : -1; @@ -670,6 +680,8 @@ static refbuf_t *mp3_get_no_meta (source_t *source) return NULL; refbuf = source_mp3->read_data; + refbuf->len = source_mp3->read_count; + source_mp3->read_count = 0; source_mp3->read_data = NULL; if (client->format_data && validate_mpeg (source, refbuf) < 0) @@ -708,6 +720,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) /* fill the buffer with the read data */ bytes = source_mp3->read_count; refbuf->len = 0; + while (bytes > 0) { unsigned int metadata_remaining; @@ -771,7 +784,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) source_mp3->build_metadata_len = 0; } /* the data we have just read may of just been metadata */ - if (refbuf->len == 0) + if (refbuf->len <= 0) { refbuf_release (refbuf); return NULL; diff --git a/src/format_ogg.c b/src/format_ogg.c index ec0d88d6..f6219904 100644 --- a/src/format_ogg.c +++ b/src/format_ogg.c @@ -44,6 +44,7 @@ #include "format_flac.h" #include "format_kate.h" #include "format_skeleton.h" +#include "global.h" #define CATMODULE "format-ogg" #include "logging.h" @@ -524,7 +525,9 @@ static int send_ogg_headers (client_t *client, refbuf_t *headers) unsigned len = refbuf->len - client_data->pos; int ret = -1; - if (len < 8000 && client->connection.error == 0) + if (len > 8192) + len = 8192; + if (client->connection.error == 0) ret = client_send_bytes (client, data, len); if (ret > 0) { diff --git a/src/format_skeleton.c b/src/format_skeleton.c index b4df9c03..0e48be77 100644 --- a/src/format_skeleton.c +++ b/src/format_skeleton.c @@ -28,6 +28,7 @@ typedef struct source_tag source_t; #include "format_skeleton.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-skeleton" #include "logging.h" diff --git a/src/format_speex.c b/src/format_speex.c index fd513791..7c023a7b 100644 --- a/src/format_speex.c +++ b/src/format_speex.c @@ -26,6 +26,7 @@ typedef struct source_tag source_t; #include "format_speex.h" #include "refbuf.h" #include "client.h" +#include "global.h" #define CATMODULE "format-speex" #include "logging.h" diff --git a/src/format_theora.c b/src/format_theora.c index 6124b5a3..cc7cb784 100644 --- a/src/format_theora.c +++ b/src/format_theora.c @@ -27,6 +27,7 @@ #include "format_theora.h" #include "client.h" #include "stats.h" +#include "global.h" #define CATMODULE "format-theora" #include "logging.h" diff --git a/src/format_vorbis.c b/src/format_vorbis.c index 9a37de68..6294e1c9 100644 --- a/src/format_vorbis.c +++ b/src/format_vorbis.c @@ -30,6 +30,7 @@ #include "format_ogg.h" #include "stats.h" #include "format.h" +#include "global.h" #define CATMODULE "format-vorbis" #include "logging.h" diff --git a/src/fserve.c b/src/fserve.c index 7db6144d..55bb196d 100644 --- a/src/fserve.c +++ b/src/fserve.c @@ -14,6 +14,7 @@ #include #endif +#include "compat.h" #include #include #include @@ -27,15 +28,21 @@ #include #endif -#ifndef _WIN32 -#include -#include -#include -#define SCN_OFF_T SCNdMAX -#define PRI_OFF_T PRIdMAX -#else +#ifdef _MSC_VER #include #include +#else +#include +#include +# ifdef HAVE_SYS_SOCKET_H +# include +# endif +# ifndef SCN_OFF_T +# define SCN_OFF_T SCNdMAX +# endif +# ifndef PRI_OFF_T +# define PRI_OFF_T PRIdMAX +# endif #endif #include "thread/thread.h" @@ -53,7 +60,6 @@ #include "cfgfile.h" #include "util.h" #include "admin.h" -#include "compat.h" #include "slave.h" #include "fserve.h" @@ -178,6 +184,7 @@ char *fserve_content_type (const char *path) return type; } + static int _compare_fh(void *arg, void *a, void *b) { fh_node *x = a, *y = b; @@ -189,6 +196,7 @@ static int _compare_fh(void *arg, void *a, void *b) return 0; } + static int _delete_fh (void *mapping) { fh_node *fh = mapping; @@ -217,12 +225,33 @@ static int _delete_fh (void *mapping) } +static void remove_fh_from_cache (fh_node *fh) +{ + avl_tree_wlock (fh_cache); + avl_delete (fh_cache, fh, NULL); + avl_tree_unlock (fh_cache); + fh->peak = 0; +} + + static void remove_from_fh (fh_node *fh, client_t *client) { avl_delete (fh->clients, client, NULL); } +static fh_node *find_fh (fbinfo *finfo) +{ + fh_node fh, *result = NULL; + if (finfo->mount == NULL) + finfo->mount = ""; + memcpy (&fh.finfo, finfo, sizeof (fbinfo)); + if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0) + return result; + return NULL; +} + + /* find/create handle and return it with the structure in a locked state */ static fh_node *open_fh (fbinfo *finfo, client_t *client) { @@ -254,10 +283,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) if (result->format) { if (result->format->create_client_data && client->format_data == NULL) - { result->format->create_client_data (result->format, client); + if (result->format->write_buf_to_client) client->check_buffer = result->format->write_buf_to_client; - } } } DEBUG2 ("refcount now %d for %s", result->refcount, result->finfo.mount); @@ -271,9 +299,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) if (client) { if (finfo->flags & FS_FALLBACK) - DEBUG1 ("lookup of fallback file \"%s\"", finfo->mount); + INFO2 ("lookup of fallback file \"%s\" (%d)", finfo->mount, finfo->limit); else - DEBUG1 ("lookup of \"%s\"", finfo->mount); + INFO1 ("lookup of \"%s\"", finfo->mount); } fh->fp = fopen (fullpath, "rb"); if (fh->fp == NULL) @@ -292,7 +320,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) { char *contenttype = fserve_content_type (fullpath); - stats_event (finfo->mount, "fallback", "file"); + stats_event_flags (finfo->mount, "fallback", "file", STATS_COUNTERS|STATS_HIDDEN); + stats_event_flags (finfo->mount, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_HIDDEN); fh->format = calloc (1, sizeof (format_plugin_t)); fh->format->type = format_get_type (contenttype); free (contenttype); @@ -305,10 +334,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) return NULL; } if (fh->format->create_client_data && client->format_data == NULL) - { fh->format->create_client_data (fh->format, client); + if (fh->format->write_buf_to_client) client->check_buffer = fh->format->write_buf_to_client; - } } free (fullpath); } @@ -321,8 +349,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client) { if (finfo->mount && (finfo->flags & FS_FALLBACK)) { - stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_HIDDEN); - stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_HIDDEN); + stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_GENERAL|STATS_HIDDEN); + stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_GENERAL|STATS_HIDDEN); } avl_insert (fh->clients, client); } @@ -595,20 +623,16 @@ static void fh_release (fh_node *fh) if (fh->refcount > 1) { fh->refcount--; + stats_event_args (fh->finfo.mount, "listeners", "%ld", fh->refcount); thread_mutex_unlock (&fh->lock); return; } - /* now we will probably remove the fh, but to prevent a deadlock with - * open_fh, we drop the node lock and acquire the tree and node locks - * in that order and only remove if the refcount is still 0 */ - thread_mutex_unlock (&fh->lock); - avl_tree_wlock (fh_cache); - thread_mutex_lock (&fh->lock); - if (fh->refcount > 1) - thread_mutex_unlock (&fh->lock); - else - avl_delete (fh_cache, fh, _delete_fh); - avl_tree_unlock (fh_cache); + stats_event (fh->finfo.mount, "fallback", NULL); + stats_event (fh->finfo.mount, NULL, NULL); + if (fh->peak) + remove_fh_from_cache (fh); + // leave as locked + _delete_fh (fh); } @@ -686,10 +710,10 @@ static void fserve_move_listener (client_t *client) remove_from_fh (fh, client); if (fh->refcount == 1) stats_event (fh->finfo.mount, NULL, NULL); - f.flags = fh->finfo.flags; + f.flags = fh->finfo.flags|FS_OVERRIDE; f.limit = fh->finfo.limit; f.mount = fh->finfo.fallback; - f.fallback = NULL; + f.fallback = fh->finfo.mount; client->intro_offset = -1; move_listener (client, &f); fh_release (fh); @@ -857,14 +881,17 @@ static int throttled_file_send (client_t *client) return 0; } if (client->flags & CLIENT_WANTS_FLV) /* increase limit for flv clients as wrapping takes more space */ - limit = (unsigned long)(limit * 1.05); + limit = (unsigned long)(limit * 1.01); if (secs) rate = (client->counter+1400)/secs; // DEBUG3 ("counter %lld, duration %ld, limit %u", client->counter, secs, rate); thread_mutex_lock (&fh->lock); if (rate > limit || secs < 3) { - client->schedule_ms += 1000/(limit/1400); + if (limit >= 1400) + client->schedule_ms += 1000/(limit/1400); + else + client->schedule_ms += 50; // should not happen but guard against it rate_add (fh->format->out_bitrate, 0, worker->time_ms); if (secs > 2) { @@ -907,7 +934,10 @@ static int throttled_file_send (client_t *client) rate_add (fh->format->out_bitrate, bytes, worker->time_ms); thread_mutex_unlock (&fh->lock); global_add_bitrates (global.out_bitrate, bytes, worker->time_ms); - client->schedule_ms += (1000/(limit/1400*2)); + if (limit > 2800) + client->schedule_ms += (1000/(limit/1400*2)); + else + client->schedule_ms += 50; /* progessive slowdown if max bandwidth is exceeded. */ if (throttle_sends > 1) @@ -928,12 +958,23 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo) { if (finfo) { + mount_proxy *minfo; fh_node *fh; if (finfo->flags & FS_FALLBACK && finfo->limit == 0) return -1; fh = open_fh (finfo, client); if (fh == NULL) return -1; + minfo = config_find_mount (config_get_config(), finfo->mount); + if (minfo && minfo->max_listeners >= 0 && fh->refcount > minfo->max_listeners) + { + config_release_config(); + remove_from_fh (fh, client); + fh_release (fh); + client->shared_data = NULL; + return client_send_403redirect (client, finfo->mount, "max listeners reached"); + } + config_release_config(); client->shared_data = fh; if (fh->finfo.limit) @@ -957,9 +998,12 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo) client->ops = &buffer_content_ops; client->flags &= ~CLIENT_HAS_INTRO_CONTENT; + client->flags |= CLIENT_IN_FSERVE; if (client->flags & CLIENT_ACTIVE) { client->schedule_ms = client->worker->time_ms; + if (finfo && finfo->flags & FS_FALLBACK) + return 0; // prevent a recursive loop return client->ops->process (client); } else @@ -979,7 +1023,7 @@ int fserve_setup_client (client_t *client) } -void fserve_set_override (const char *mount, const char *dest) +int fserve_set_override (const char *mount, const char *dest) { fh_node fh, *result; @@ -987,15 +1031,23 @@ void fserve_set_override (const char *mount, const char *dest) fh.finfo.mount = (char *)mount; fh.finfo.fallback = NULL; - avl_tree_rlock (fh_cache); + avl_tree_wlock (fh_cache); if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0) { char *tmp = result->finfo.fallback; + thread_mutex_lock (&result->lock); + avl_delete (fh_cache, result, NULL); + avl_tree_unlock (fh_cache); + + result->finfo.flags |= FS_OVERRIDE; result->finfo.fallback = strdup (dest); + thread_mutex_unlock (&result->lock); free (tmp); INFO2 ("move clients from %s to %s", mount, dest); + return 1; } avl_tree_unlock (fh_cache); + return 0; } static int _delete_mapping(void *mapping) { @@ -1110,13 +1162,16 @@ int fserve_kill_client (client_t *client, const char *mount, int response) xmlDocSetRootElement(doc, node); snprintf (buf, sizeof(buf), "Client %d not found", id); + avl_tree_rlock (fh_cache); while (1) { - fh_node *fh = open_fh (&finfo, NULL); + avl_node *node; + fh_node *fh = find_fh (&finfo); if (fh) { - avl_node *node = avl_get_first (fh->clients); - + thread_mutex_lock (&fh->lock); + avl_tree_unlock (fh_cache); + node = avl_get_first (fh->clients); while (node) { client_t *listener = (client_t *)node->key; @@ -1130,61 +1185,46 @@ int fserve_kill_client (client_t *client, const char *mount, int response) } node = avl_get_next (node); } - fh_release (fh); + thread_mutex_unlock (&fh->lock); + avl_tree_rlock (fh_cache); } if (loop == 0) break; loop--; if (loop == 1) finfo.flags = FS_FALLBACK; } + avl_tree_unlock (fh_cache); xmlNewChild (node, NULL, XMLSTR("message"), XMLSTR(buf)); xmlNewChild (node, NULL, XMLSTR("return"), XMLSTR(v)); return admin_send_response (doc, client, response, "response.xsl"); } -int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo) +int fserve_list_clients_xml (xmlNodePtr parent, fbinfo *finfo) { int ret = 0; - fh_node *fh = open_fh (finfo, NULL); + fh_node *fh; + avl_node *anode; - if (fh) + avl_tree_rlock (fh_cache); + fh = find_fh (finfo); + if (fh == NULL) { - avl_node *anode = avl_get_first (fh->clients); - - while (anode) - { - client_t *listener = (client_t *)anode->key; - char buf [100]; - - xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL); - const char *useragent; - snprintf (buf, sizeof (buf), "%lu", listener->connection.id); - xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); - - xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->connection.ip)); - useragent = httpp_getvar (listener->parser, "user-agent"); - if (useragent) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent)); - xmlNewChild (node, NULL, XMLSTR("useragent"), str); - xmlFree (str); - } - xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR( "0")); - snprintf (buf, sizeof (buf), "%lu", - (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); - xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf)); - if (listener->username) - { - xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username)); - xmlNewChild (node, NULL, XMLSTR("username"), str); - xmlFree (str); - } - - ret++; - anode = avl_get_next (anode); - } - fh_release (fh); + avl_tree_unlock (fh_cache); + return 0; } + thread_mutex_lock (&fh->lock); + avl_tree_unlock (fh_cache); + + anode = avl_get_first (fh->clients); + while (anode) + { + client_t *listener = (client_t *)anode->key; + + stats_listener_to_xml (listener, parent); + ret++; + anode = avl_get_next (anode); + } + thread_mutex_unlock (&fh->lock); return ret; } @@ -1208,17 +1248,33 @@ int fserve_list_clients (client_t *client, const char *mount, int response, int xmlSetProp(srcnode, XMLSTR("mount"), XMLSTR(mount)); ret = fserve_list_clients_xml (srcnode, &finfo); - if (ret == 0) + if (ret == 0 && finfo.flags&FS_FALLBACK) { - finfo.flags = 0; + finfo.flags = 0; // retry ret = fserve_list_clients_xml (srcnode, &finfo); } if (ret) { - char buf[20]; - snprintf (buf, sizeof(buf), "%u", ret); + char buf [20]; + snprintf (buf, sizeof(buf), "%d", ret); xmlNewChild (srcnode, NULL, XMLSTR("listeners"), XMLSTR(buf)); return admin_send_response (doc, client, response, "listclients.xsl"); } + xmlFree (doc); return client_send_400 (client, "mount does not exist"); } + + +int fserve_query_count (fbinfo *finfo) +{ + int ret = 0; + fh_node *fh; + + avl_tree_rlock (fh_cache); + fh = find_fh (finfo); + if (fh) + ret = fh->refcount; + avl_tree_unlock (fh_cache); + return ret; +} + diff --git a/src/fserve.h b/src/fserve.h index 0d1ac4f2..4fcbff8e 100644 --- a/src/fserve.h +++ b/src/fserve.h @@ -29,6 +29,7 @@ typedef struct _fbinfo #define FS_USE_ADMIN 01 #define FS_FALLBACK 02 #define FS_FALLBACK_EOF 04 +#define FS_OVERRIDE 010 void fserve_initialize(void); void fserve_shutdown(void); @@ -38,10 +39,12 @@ void fserve_recheck_mime_types (ice_config_t *config); int fserve_setup_client (client_t *client); int fserve_setup_client_fb (client_t *client, fbinfo *finfo); -void fserve_set_override (const char *mount, const char *dest); +int fserve_set_override (const char *mount, const char *dest); int fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners); int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo); int fserve_kill_client (client_t *client, const char *mount, int response); +int fserve_query_count (fbinfo *finfo); + extern int fserve_running; diff --git a/src/global.c b/src/global.c index c034a001..f16a1587 100644 --- a/src/global.c +++ b/src/global.c @@ -55,11 +55,11 @@ void global_shutdown(void) thread_mutex_destroy(&_global_mutex); thread_spin_destroy (&global.spinlock); avl_tree_free(global.source_tree, NULL); + rate_free (global.out_bitrate); + global.out_bitrate = NULL; #ifdef MY_ALLOC avl_tree_free(global.alloc_tree, free_alloc_node); #endif - rate_free (global.out_bitrate); - global.out_bitrate = NULL; } void global_lock(void) diff --git a/src/global.h b/src/global.h index fb8efddd..edcf078b 100644 --- a/src/global.h +++ b/src/global.h @@ -13,7 +13,6 @@ #ifndef __GLOBAL_H__ #define __GLOBAL_H__ -#include "config.h" #define ICE_LISTEN_QUEUE 64 @@ -25,6 +24,7 @@ #include "thread/thread.h" #include "net/sock.h" #include "compat.h" +#include "avl/avl.h" typedef struct ice_global_tag { @@ -64,6 +64,11 @@ typedef struct ice_global_tag extern unsigned int throttle_sends; +extern void initialize_subsystems (void); +extern void shutdown_subsystems (void); +extern void server_process (void); +extern int server_init (int argc, char *argv[]); + #ifdef MY_ALLOC #define calloc(x,y) my_calloc(__func__,__LINE__,x,y) diff --git a/src/logging.c b/src/logging.c index f59cfd66..1093fd47 100644 --- a/src/logging.c +++ b/src/logging.c @@ -37,7 +37,7 @@ void fatal_error (const char *perr); int errorlog = 0; int playlistlog = 0; -#ifdef _WIN32 +#ifdef _MSC_VER /* Since strftime's %z option on win32 is different, we need to go through a few loops to get the same info as %z */ int get_clf_time (char *buffer, unsigned len, struct tm *t) @@ -107,12 +107,13 @@ int get_clf_time (char *buffer, unsigned len, struct tm *t) */ void logging_access_id (access_log *accesslog, client_t *client) { - char datebuf[128]; - char reqbuf[1024]; + const char *req; struct tm thetime; time_t now; time_t stayed; - const char *referrer, *user_agent, *username, *ip = "-"; + const char *referrer, *user_agent, *username = NULL, *ip = "-"; + char datebuf[50]; + char reqbuf[128]; if (client->flags & CLIENT_SKIP_ACCESSLOG) return; @@ -121,41 +122,56 @@ void logging_access_id (access_log *accesslog, client_t *client) localtime_r (&now, &thetime); /* build the data */ -#ifdef _WIN32 +#ifdef _MSC_VER memset(datebuf, '\000', sizeof(datebuf)); get_clf_time(datebuf, sizeof(datebuf)-1, &thetime); #else strftime (datebuf, sizeof(datebuf), LOGGING_FORMAT_CLF, &thetime); #endif + req = httpp_getvar (client->parser, HTTPP_VAR_RAWURI); + if (req == NULL) + req = httpp_getvar (client->parser, HTTPP_VAR_URI); /* build the request */ snprintf (reqbuf, sizeof(reqbuf), "%s %s %s/%s", - httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE), - httpp_getvar (client->parser, HTTPP_VAR_URI), + httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE), req, httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL), httpp_getvar (client->parser, HTTPP_VAR_VERSION)); stayed = now - client->connection.con_time; - - if (client->username == NULL) - username = "-"; - else - username = client->username; - + username = client->username; referrer = httpp_getvar (client->parser, "referer"); - if (referrer == NULL) - referrer = "-"; - user_agent = httpp_getvar (client->parser, "user-agent"); - if (user_agent == NULL) - user_agent = "-"; if (accesslog->log_ip) ip = client->connection.ip; - log_write_direct (accesslog->logid, - "%s - %s [%s] \"%s\" %d %" PRIu64 " \"%s\" \"%s\" %lu", - ip, username, - datebuf, reqbuf, client->respcode, client->connection.sent_bytes, - referrer, user_agent, (unsigned long)stayed); + + if (accesslog->type == LOG_ACCESS_CLF_ESC) + { + char *un = client->username ? util_url_escape (username) : strdup ("-"), + *rq = util_url_escape (reqbuf), + *rf = referrer ? util_url_escape (referrer) : strdup ("-"), + *ua = user_agent ? util_url_escape (user_agent) : strdup ("-"); + + log_write_direct (accesslog->logid, + "%s - %s %s %s %d %" PRIu64 " %.50s %.50s %lu", + ip, un, datebuf, rq, client->respcode, client->connection.sent_bytes, + rf, ua, (unsigned long)stayed); + free (ua); + free (rf); + free (rq); + free (un); + } + else + { + if (client->username == NULL) username = "-"; + if (referrer == NULL) referrer = "-"; + if (user_agent == NULL) user_agent = "-"; + + log_write_direct (accesslog->logid, + "%s - %s [%s] \"%s\" %d %" PRIu64 " \"%.50s\" \"%.50s\" %lu", + ip, username, datebuf, reqbuf, client->respcode, client->connection.sent_bytes, + referrer, user_agent, (unsigned long)stayed); + } client->respcode = -1; } @@ -183,7 +199,7 @@ void logging_playlist(const char *mount, const char *metadata, long listeners) localtime_r (&now, &thetime); /* build the data */ -#ifdef _WIN32 +#ifdef _MSC_VER memset(datebuf, '\000', sizeof(datebuf)); get_clf_time(datebuf, sizeof(datebuf)-1, &thetime); #else diff --git a/src/logging.h b/src/logging.h index e3abdae6..622ccdb6 100644 --- a/src/logging.h +++ b/src/logging.h @@ -85,7 +85,7 @@ extern int errorlog; #define LOGGING_FORMAT_CLF "%d/%b/%Y:%H:%M:%S %z" -void logging_access_id (access_log *accesslog, client_t *client); +void logging_access_id (struct access_log *accesslog, client_t *client); void logging_access(client_t *client); void logging_playlist(const char *mount, const char *metadata, long listeners); int restart_logging (ice_config_t *config); diff --git a/src/main.c b/src/main.c index 2c04c309..2339934f 100644 --- a/src/main.c +++ b/src/main.c @@ -20,9 +20,6 @@ #include #ifdef WIN32 -#define _WIN32_WINNT 0x0400 -/* For getpid() */ -//#include #include #include #endif @@ -93,7 +90,7 @@ static void _print_usage(void) } -void _initialize_subsystems(void) +void initialize_subsystems(void) { log_initialize(); errorlog = log_open_file (stderr); @@ -112,10 +109,10 @@ void _initialize_subsystems(void) #endif } -void _shutdown_subsystems(void) + +void shutdown_subsystems(void) { connection_shutdown(); - auth_shutdown(); slave_shutdown(); fserve_shutdown(); stats_shutdown(); @@ -212,9 +209,17 @@ static int _server_proc_init(void) return 1; } + /* this is the heart of the beast */ -static void _server_proc(void) +void server_process (void) { + INFO1 ("%s server started", ICECAST_VERSION_STRING); + + global.running = ICE_RUNNING; + + /* Do this after logging init */ + auth_initialise (); + if (background) { fclose (stdin); @@ -222,8 +227,8 @@ static void _server_proc(void) fclose (stderr); } slave_initialize(); - - connection_setup_sockets (NULL); + INFO0("Shutting down"); + auth_shutdown(); } /* chroot the process. Watch out - we need to do this before starting other @@ -258,7 +263,7 @@ static void _ch_root_uid_setup(void) } #endif -#ifdef CHROOT +#ifdef HAVE_CHROOT if (conf->chroot) { if(getuid()) /* root check */ @@ -303,115 +308,97 @@ static void _ch_root_uid_setup(void) #endif } -#ifdef WIN32_SERVICE -int mainService(int argc, char **argv) -#else -int main(int argc, char **argv) -#endif + +int server_init (int argc, char *argv[]) { - int res, ret; + int ret; char filename[512]; char pbuf[1024]; - /* parse the '-c icecast.xml' option - ** only, so that we can read a configfile - */ - res = _parse_config_opts(argc, argv, filename, 512); - if (res == 1) { -#if !defined(_WIN32) || defined(_CONSOLE) - /* startup all the modules */ - _initialize_subsystems(); -#endif - /* parse the config file */ - config_get_config(); - ret = config_initial_parse_file(filename); - config_release_config(); - if (ret < 0) { - snprintf(pbuf, sizeof(pbuf)-1, - "FATAL: error parsing config file (%s)", filename); - _fatal_error(pbuf); - switch (ret) { - case CONFIG_EINSANE: - _fatal_error("filename was null or blank"); - break; - case CONFIG_ENOROOT: - _fatal_error("no root element found"); - break; - case CONFIG_EBADROOT: - _fatal_error("root element is not "); - break; - default: - _fatal_error("XML config parsing error"); - break; - } -#if !defined(_WIN32) || defined(_CONSOLE) - _shutdown_subsystems(); -#endif + switch (_parse_config_opts (argc, argv, filename, 512)) + { + case -1: + _print_usage(); return -1; - } - } else if (res == -1) { - _print_usage(); - return -1; + default: + /* parse the config file */ + config_get_config(); + ret = config_initial_parse_file(filename); + config_release_config(); + if (ret < 0) + { + snprintf (pbuf, sizeof(pbuf), + "FATAL: error parsing config file (%s)", filename); + _fatal_error (pbuf); + switch (ret) + { + case CONFIG_EINSANE: + _fatal_error("filename was null or blank"); + break; + case CONFIG_ENOROOT: + _fatal_error("no root element found"); + break; + case CONFIG_EBADROOT: + _fatal_error("root element is not "); + break; + default: + _fatal_error("XML config parsing error"); + break; + } + return -1; + } } - + /* override config file options with commandline options */ config_parse_cmdline(argc, argv); /* Bind socket, before we change userid */ - if(!_server_proc_init()) { + if (_server_proc_init() == 0) + { _fatal_error("Server startup failed. Exiting"); - _shutdown_subsystems(); return -1; } - _ch_root_uid_setup(); /* Change user id and root if requested/possible */ fserve_initialize(); #ifdef CHUID /* We'll only have getuid() if we also have setuid(), it's reasonable to * assume */ - if(!getuid()) /* Running as root! Don't allow this */ + if (getuid() == 0) /* Running as root! Don't allow this */ { - fprintf(stderr, "ERROR: You should not run icecast2 as root\n"); - fprintf(stderr, "Use the changeowner directive in the config file\n"); - _shutdown_subsystems(); - return 1; + fprintf (stderr, "ERROR: You should not run icecast2 as root\n"); + fprintf (stderr, "Use the changeowner directive in the config file\n"); + return -1; } #endif - /* setup default signal handlers */ sighandler_initialize(); if (start_logging (config_get_config_unlocked()) < 0) { _fatal_error("FATAL: Could not start logging"); - _shutdown_subsystems(); return -1; } + return 0; +} - INFO1 ("%s server started", ICECAST_VERSION_STRING); - /* REM 3D Graphics */ +#ifndef _WIN32 +int main (int argc, char *argv[]) +{ + initialize_subsystems(); - /* let her rip */ - global.running = ICE_RUNNING; + if (server_init (argc, argv) == 0) + server_process(); - /* Do this after logging init */ - auth_initialise (); + shutdown_subsystems(); - _server_proc(); - - INFO0("Shutting down"); -#if !defined(_WIN32) || defined(_CONSOLE) - _shutdown_subsystems(); -#endif if (pidfile) { remove (pidfile); free (pidfile); } - return 0; } - +#endif diff --git a/src/md5.c b/src/md5.c index 43553388..e3279fee 100644 --- a/src/md5.c +++ b/src/md5.c @@ -42,6 +42,7 @@ #include #include #include +#include "global.h" static void MD5Transform(uint32_t buf[4], uint32_t const in[HASH_LEN]); diff --git a/src/mpeg.c b/src/mpeg.c index ec62fb41..c3eb3114 100644 --- a/src/mpeg.c +++ b/src/mpeg.c @@ -22,6 +22,7 @@ #include "compat.h" #include "mpeg.h" #include "format_mp3.h" +#include "global.h" #define CATMODULE "mpeg" #include "logging.h" @@ -70,8 +71,6 @@ static int handle_aac_frame (struct mpeg_sync *mp, unsigned char *p, int len) if (mp->frame_callback) if (mp->frame_callback (mp, s, raw_frame_len) < 0) return -1; - memcpy (mp->raw->data + mp->raw_offset, s, raw_frame_len); - mp->raw_offset += raw_frame_len; } return frame_len; } @@ -163,8 +162,6 @@ static int handle_mpeg_frame (struct mpeg_sync *mp, unsigned char *p, int remain if (mp->frame_callback) if (mp->frame_callback (mp, p, frame_len) < 0) return -1; - memcpy (mp->raw->data + mp->raw_offset, p, frame_len); - mp->raw_offset += frame_len; } return frame_len; } @@ -242,31 +239,31 @@ static int check_for_mp3 (struct mpeg_sync *mp, unsigned char *p, unsigned remai samplerate = get_mpegframe_samplerate (p); if (samplerate == 0) return -1; - while (checking) + do { - int frame_len = get_mpeg_frame_length (mp, fh); + int frame_len; + + if (remaining <= 4) + return 0; + if (fh [0] != 255 || fh [1] != p[1]) + return -1; + frame_len = get_mpeg_frame_length (mp, fh); if (frame_len <= 0 || frame_len > 3000) { //DEBUG2 ("checking frame %d, but len %d invalid", 5-checking, frame_len); return -1; } - if (frame_len+4 >= remaining) + if (frame_len > remaining) { //DEBUG3 ("checking frame %d, but need more data (%d,%d)", 5-checking, frame_len, remaining); return 0; } - if (samplerate != get_mpegframe_samplerate (fh+frame_len)) + if (samplerate != get_mpegframe_samplerate (fh)) return -1; - if (fh[frame_len] != 255 || fh[frame_len+1] != p[1]) - { - //DEBUG4 ("checking frame %d, but code is %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]); - return -1; - } //DEBUG4 ("frame %d checked, next header codes are %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]); remaining -= frame_len; fh += frame_len; - checking--; - } + } while (--checking); mp->samplerate = samplerate; if (((p[3] & 0xC0) >> 6) == 3) mp->channels = 1; @@ -375,20 +372,23 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) mp->sample_count = 0; if (mp->surplus) { - int new_len = mp->surplus->len + new_block->len; - unsigned char *p = realloc (mp->surplus->data, new_len); + if (offset >= mp->surplus->len) + offset -= mp->surplus->len; + else + { + int new_len = mp->surplus->len + new_block->len; + unsigned char *p = realloc (mp->surplus->data, new_len); - memcpy (p+mp->surplus->len, new_block->data, new_block->len); - mp->surplus->data = new_block->data; - new_block->data = (void*)p; - new_block->len = new_len; + memcpy (p+mp->surplus->len, new_block->data, new_block->len); + mp->surplus->data = new_block->data; + new_block->data = (void*)p; + new_block->len = new_len; + } refbuf_release (mp->surplus); mp->surplus = NULL; } start = (unsigned char *)new_block->data + offset; remaining = new_block->len - offset; - if (mp->raw) - mp->raw_offset = 0; while (1) { end = (unsigned char*)new_block->data + new_block->len; @@ -398,7 +398,6 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) break; if (!is_sync_byte (mp, start)) { - // need to resync int ret = find_align_sync (mp, start, remaining); if (ret == 0) break; // no sync in the rest, so dump it @@ -422,7 +421,7 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset) { if (remaining > 20000) return -1; - new_block->len = 0; + new_block->len = offset; return remaining; } } @@ -474,5 +473,6 @@ void mpeg_cleanup (mpeg_sync *mpsync) { refbuf_release (mpsync->surplus); refbuf_release (mpsync->raw); + mpsync->mount = NULL; } } diff --git a/src/sighandler.c b/src/sighandler.c index 083d0f5b..51e44a03 100644 --- a/src/sighandler.c +++ b/src/sighandler.c @@ -20,12 +20,12 @@ #include "avl/avl.h" #include "httpp/httpp.h" -#include "global.h" #include "connection.h" #include "refbuf.h" #include "client.h" #include "logging.h" #include "event.h" +#include "global.h" #define CATMODULE "sighandler" diff --git a/src/slave.c b/src/slave.c index 0373c2b6..756ad330 100644 --- a/src/slave.c +++ b/src/slave.c @@ -60,22 +60,14 @@ #include "format.h" #include "event.h" #include "yp.h" +#include "slave.h" #define CATMODULE "slave" -typedef struct _redirect_host -{ - struct _redirect_host *next; - time_t next_update; - char *server; - int port; -} redirect_host; - static void _slave_thread(void); static void redirector_add (const char *server, int port, int interval); static redirect_host *find_slave_host (const char *server, int port); -static void redirector_clearall (void); static int relay_startup (client_t *client); static int relay_read (client_t *client); static void relay_release (client_t *client); @@ -162,7 +154,6 @@ void slave_update_all_mounts (void) void slave_restart (void) { restart_connection_thread = 1; - redirector_clearall(); slave_update_all_mounts (); streamlist_check = 0; } @@ -370,6 +361,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s else INFO3 ("connecting to %s:%d for %s", server, port, relay->localmount); + con->con_time = time (NULL); + relay->in_use = master; streamsock = sock_connect_wto_bind (server, port, bind, timeout); free (bind); if (connection_init (con, streamsock, server) < 0) @@ -446,6 +439,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s if (parser) httpp_destroy (parser); connection_close (con); + con->con_time = time (NULL); + if (relay->in_use) relay->in_use->skip = 1; return -1; } @@ -462,6 +457,11 @@ int open_relay (relay_server *relay) { int ret; + if (master->skip) + { + INFO3 ("skipping %s:%d for %s", master->ip, master->port, relay->localmount); + continue; + } thread_mutex_unlock (&src->lock); ret = open_relay_connection (client, relay, master); thread_mutex_lock (&src->lock); @@ -531,6 +531,7 @@ static void *start_relay_stream (void *arg) src->yp_public = -1; } + relay->in_use = NULL; INFO2 ("listener count remaining on %s is %d", src->mount, src->listeners); src->flags &= ~SOURCE_PAUSE_LISTENERS; thread_mutex_unlock (&src->lock); @@ -968,6 +969,7 @@ static void slave_startup (void) update_settings = 0; update_all_mounts = 0; + redirector_setup (config); update_master_as_slave (config); stats_global (config); workers_adjust (config->workers_count); @@ -1058,7 +1060,7 @@ relay_server *slave_find_relay (relay_server *relays, const char *mount) /* drop all redirection details. */ -static void redirector_clearall (void) +void redirector_clearall (void) { thread_rwlock_wlock (&slaves_lock); while (redirectors) @@ -1073,6 +1075,21 @@ static void redirector_clearall (void) thread_rwlock_unlock (&slaves_lock); } + +void redirector_setup (ice_config_t *config) +{ + redirect_host *redir = config->redirect_hosts; + + thread_rwlock_wlock (&slaves_lock); + while (redir) + { + redirector_add (redir->server, redir->port, 0); + redir = redir->next; + } + thread_rwlock_unlock (&slaves_lock); +} + + /* Add new redirectors or update any existing ones */ void redirector_update (client_t *client) @@ -1096,7 +1113,15 @@ void redirector_update (client_t *client) redirect = find_slave_host (rserver, rport); if (redirect == NULL) { - redirector_add (rserver, rport, interval); + ice_config_t *config = config_get_config(); + unsigned int allowed = config->max_redirects; + + config_release_config(); + + if (global.redirect_count < allowed) + redirector_add (rserver, rport, interval); + else + INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed); } else { @@ -1126,18 +1151,7 @@ static redirect_host *find_slave_host (const char *server, int port) static void redirector_add (const char *server, int port, int interval) { - ice_config_t *config = config_get_config(); - unsigned int allowed = config->max_redirects; - redirect_host *redirect; - - config_release_config(); - - if (global.redirect_count >= allowed) - { - INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed); - return; - } - redirect = calloc (1, sizeof (redirect_host)); + redirect_host *redirect = calloc (1, sizeof (redirect_host)); if (redirect == NULL) abort(); redirect->server = strdup (server); @@ -1174,6 +1188,17 @@ static relay_server *get_relay_details (client_t *client) } +static void relay_reset (relay_server *relay) +{ + relay_server_master *server = relay->masters; + + source_clear_source (relay->source); + for (; server; server = server->next) + server->skip = 0; + INFO1 ("servers to be retried on %s", relay->localmount); +} + + static int relay_read (client_t *client) { relay_server *relay = get_relay_details (client); @@ -1185,30 +1210,31 @@ static int relay_read (client_t *client) if (relay->cleanup) relay->running = 0; if (relay->running == 0) source->flags &= ~SOURCE_RUNNING; - if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 2000000) + if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 1000000) source->flags &= ~SOURCE_RUNNING; return source_read (source); } if ((source->flags & SOURCE_TERMINATING) == 0) { + /* this section is for once through code */ int fallback = 1; if (client->connection.con_time) { - if (relay->running) + if (relay->running && relay->in_use) fallback = 0; - if (client->worker->current_time.tv_sec - client->connection.con_time < 3) + if (client->worker->current_time.tv_sec - client->connection.con_time < 60) { - /* force a delayed restart if stream cannot be maintained for 3 seconds, by - * which time any listeners in restart wait would of come back on */ - source->flags &= ~SOURCE_PAUSE_LISTENERS; - client->connection.con_time = 0; - fallback = 1; + /* force a server skip if a stream cannot be maintained for 1 min */ + WARN1 ("stream for %s died too quickly, skipping server for now", relay->localmount); + if (relay->in_use) relay->in_use->skip = 1; + } + else + relay_reset (relay); // spent some time on this so give other servers a chance + if (source->flags & SOURCE_TIMEOUT) + { + WARN1 ("stream for %s timed out, skipping server for now", relay->localmount); + if (relay->in_use) relay->in_use->skip = 1; } - global_lock(); - global.sources--; - stats_event_args (NULL, "sources", "%d", global.sources); - global_unlock(); - global_reduce_bitrate_sampling (global.out_bitrate); } /* don't pause listeners if relay shutting down */ if (relay->running == 0) @@ -1219,11 +1245,26 @@ static int relay_read (client_t *client) if (source->termination_count && source->termination_count <= source->listeners) { client->schedule_ms = client->worker->time_ms + 150; - DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); + if (client->worker->current_time.tv_sec - client->timer_start > 2) + { + client->schedule_ms += 400; + WARN3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); + } + else + DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount); thread_mutex_unlock (&source->lock); return 0; } DEBUG1 ("all listeners have now been checked on %s", relay->localmount); + if (client->connection.con_time) + { + global_lock(); + global.sources--; + stats_event_args (NULL, "sources", "%d", global.sources); + global_unlock(); + global_reduce_bitrate_sampling (global.out_bitrate); + } + client->timer_start = 0; client->parser = NULL; free (source->fallback.mount); source->fallback.mount = NULL; @@ -1241,6 +1282,7 @@ static int relay_read (client_t *client) return 0; /* listeners may be paused, recheck and let them leave this stream */ } INFO1 ("shutting down relay %s", relay->localmount); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); thread_mutex_unlock (&source->lock); stats_event (relay->localmount, NULL, NULL); slave_update_all_mounts(); @@ -1249,11 +1291,11 @@ static int relay_read (client_t *client) do { if (relay->running) { - if (client->connection.con_time) + if (client->connection.con_time && relay->in_use) { INFO1 ("standing by to restart relay on %s", relay->localmount); if (relay->on_demand && source->listeners == 0) - source_clear_source (source); + relay_reset (relay); break; } else @@ -1268,8 +1310,11 @@ static int relay_read (client_t *client) client->schedule_ms = client->worker->time_ms + 3600000; } source->flags &= ~SOURCE_ON_DEMAND; - source_clear_source (source); + relay_reset (relay); } while (0); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); + client->connection.con_time = 0; + source->stats = 0; stats_event (relay->localmount, NULL, NULL); slave_update_all_mounts(); @@ -1343,37 +1388,45 @@ static int relay_startup (client_t *client) if (relay->on_demand) { - source_t *src = relay->source; - int start_relay = src->listeners; // 0 or non-zero + source_t *source = relay->source; + int fallback_def = 0, start_relay = source->listeners; // 0 or non-zero + mount_proxy *mountinfo = config_find_mount (config_get_config(), source->mount); - src->flags |= SOURCE_ON_DEMAND; - if (worker->current_time.tv_sec % 10 == 0) + source->flags |= SOURCE_ON_DEMAND; + if (mountinfo && mountinfo->fallback_mount) { - mount_proxy *mountinfo = config_find_mount (config_get_config(), src->mount); - if (mountinfo && mountinfo->fallback_mount) + source_t *fallback; + + avl_tree_rlock (global.source_tree); + fallback = source_find_mount (mountinfo->fallback_mount); + fallback_def = 1; + if (fallback) { - source_t *fallback; - avl_tree_rlock (global.source_tree); - fallback = source_find_mount (mountinfo->fallback_mount); - if (fallback) - { - if (strcmp (fallback->mount, src->mount) != 0) - { - // if there are listeners not already being moved - if (fallback->listeners && fallback->fallback.mount == NULL) - start_relay = 1; - } - } + if (strcmp (fallback->mount, source->mount) != 0 && fallback->listeners) + start_relay = 1; avl_tree_unlock (global.source_tree); } - config_release_config(); + else + { + fbinfo finfo; + + avl_tree_unlock (global.source_tree); + finfo.flags = FS_FALLBACK; + finfo.mount = (char *)mountinfo->fallback_mount; + finfo.fallback = NULL; + + // need to check for listeners on the fallback + if (fserve_query_count (&finfo) > 0) + start_relay = 1; // force a start if there is fallback defined + } } + config_release_config(); if (start_relay == 0) { - client->schedule_ms = worker->time_ms + 60000; + client->schedule_ms = worker->time_ms + (fallback_def ? (relay->interval*1000) : 60000); return 0; } - INFO1 ("Detected listeners on relay %s", relay->localmount); + INFO1 ("starting on-demand relay %s", relay->localmount); } /* limit the number of relays starting up at the same time */ diff --git a/src/slave.h b/src/slave.h index a1309697..89493c8e 100644 --- a/src/slave.h +++ b/src/slave.h @@ -22,6 +22,8 @@ void slave_update_all_mounts (void); void slave_rebuild_mounts (void); relay_server *slave_find_relay (relay_server *relays, const char *mount); int redirect_client (const char *mountpoint, client_t *client); +void redirector_clearall (void); +void redirector_setup (ice_config_t *config); void redirector_update (struct _client_tag *client); relay_server *relay_free (relay_server *relay); diff --git a/src/source.c b/src/source.c index 6102ab55..48261f43 100644 --- a/src/source.c +++ b/src/source.c @@ -149,8 +149,10 @@ source_t *source_reserve (const char *mount) src->listener_send_trigger = 10000; src->format = calloc (1, sizeof(format_plugin_t)); src->clients = avl_tree_new (client_compare, NULL); + src->stats = stats_handle (mount); thread_mutex_create (&src->lock); + stats_release (src->stats); avl_insert (global.source_tree, src); @@ -361,22 +363,22 @@ static void update_source_stats (source_t *source) unsigned long kbytes_read = source->bytes_read_since_update/1024; source->format->sent_bytes += kbytes_sent*1024; - stats_event_args (source->mount, "outgoing_kbitrate", "%ld", + source->stats = stats_lock (source->stats, source->mount); + stats_set_args (source->stats, "outgoing_kbitrate", "%ld", (long)(8 * rate_avg (source->format->out_bitrate))/1024); - stats_event_args (source->mount, "incoming_bitrate", "%ld", (8 * incoming_rate)); - stats_event_args (source->mount, "total_bytes_read", - "%"PRIu64, source->format->read_bytes); - stats_event_args (source->mount, "total_bytes_sent", - "%"PRIu64, source->format->sent_bytes); - stats_event_args (source->mount, "total_mbytes_sent", + stats_set_args (source->stats, "incoming_bitrate", "%ld", (8 * incoming_rate)); + stats_set_args (source->stats, "total_bytes_read", "%"PRIu64, source->format->read_bytes); + stats_set_args (source->stats, "total_bytes_sent", "%"PRIu64, source->format->sent_bytes); + stats_set_args (source->stats, "total_mbytes_sent", "%"PRIu64, source->format->sent_bytes/(1024*1024)); - stats_event_args (source->mount, "queue_size", "%u", source->queue_size); + stats_set_args (source->stats, "queue_size", "%u", source->queue_size); if (source->client->connection.con_time) { worker_t *worker = source->client->worker; - stats_event_args (source->mount, "connected", "%"PRIu64, + stats_set_args (source->stats, "connected", "%"PRIu64, (uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time)); } + stats_release (source->stats); stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent); stats_event_add (NULL, "stream_kbytes_read", kbytes_read); @@ -443,17 +445,6 @@ int source_read (source_t *source) if (source_change_worker (source)) return 1; } - if (source->limit_rate) - { - if (source->limit_rate < (8 * source->incoming_rate)) - { - rate_add (source->format->in_bitrate, 0, current); - source->skip_duration += 300; - client->schedule_ms += 150; - thread_mutex_unlock (&source->lock); - return 0; - } - } fds = util_timed_wait_for_fd (client->connection.sock, 0); if (fds < 0) { @@ -473,13 +464,14 @@ int source_read (source_t *source) DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read, source->timeout, (long)current); WARN1 ("Disconnecting %s due to socket timeout", source->mount); - source->flags &= ~(SOURCE_RUNNING|SOURCE_PAUSE_LISTENERS); + source->flags &= ~SOURCE_RUNNING; + source->flags |= SOURCE_TIMEOUT; skip = 0; break; } - source->skip_duration = (int)(source->skip_duration * 1.6); - if (source->skip_duration > 700) - source->skip_duration = 700; + source->skip_duration = (int)(source->skip_duration * 1.3); + if (source->skip_duration > 400) + source->skip_duration = 400; break; } source->skip_duration = (long)(source->skip_duration * 0.9); @@ -589,7 +581,18 @@ static int source_client_read (client_t *client) INFO1 ("streaming duration expired on %s", source->mount); } if (source_running (source)) + { + if (source->limit_rate) + { + source->incoming_rate = (long)rate_avg (source->format->in_bitrate); + if (source->limit_rate < (8 * source->incoming_rate)) + { + rate_add (source->format->in_bitrate, 0, client->worker->current_time.tv_sec); + client->schedule_ms += 200; + } + } return source_read (source); + } else { if ((source->flags & SOURCE_TERMINATING) == 0) @@ -612,6 +615,7 @@ static int source_client_read (client_t *client) return 0; } INFO1 ("no more listeners on %s", source->mount); + stats_event_args (source->mount, "listeners", "%lu", source->listeners); client->connection.discon_time = 0; client->ops = &source_client_halt_ops; free (source->fallback.mount); @@ -705,10 +709,11 @@ static int locate_start_on_queue (source_t *source, client_t *client) } -static int http_source_intro (client_t *client) +static int http_source_introfile (client_t *client) { source_t *source = client->shared_data; + //DEBUG2 ("client intro_pos is %ld, sent bytes is %ld", client->intro_offset, client->connection.sent_bytes); if (format_file_read (client, source->format, source->intro_file) < 0) { if (source->stream_data_tail) @@ -726,6 +731,21 @@ static int http_source_intro (client_t *client) } +static int http_source_intro (client_t *client) +{ + /* we only need to send the intro if nothing else has been sent */ + if (client->connection.sent_bytes > 0) + { + client_set_queue (client, NULL); + client->check_buffer = source_queue_advance; + return source_queue_advance (client); + } + client->intro_offset = 0; + client->check_buffer = http_source_introfile; + return http_source_introfile (client); +} + + static int http_source_listener (client_t *client) { refbuf_t *refbuf = client->refbuf; @@ -818,11 +838,17 @@ static int wait_for_restart (client_t *client) { source_t *source = client->shared_data; - if (source_running (source) || (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || client->connection.error) + if (client->worker->current_time.tv_sec - client->timer_start > 15) + client->connection.error = 1; // in here too long, drop client + + if (source_running (source) || client->connection.error || + (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || + (source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC))) { client->ops = &listener_client_ops; return 0; } + if (source->flags & SOURCE_LISTENERS_SYNC) client->schedule_ms = client->worker->time_ms + 100; else @@ -895,6 +921,7 @@ int listener_waiting_on_source (source_t *source, client_t *client) client->ops = &listener_pause_ops; client->flags |= CLIENT_HAS_MOVED; client->schedule_ms = client->worker->time_ms + 60; + client->timer_start = client->worker->current_time.tv_sec; return 0; } return -1; @@ -911,7 +938,7 @@ static int send_listener (source_t *source, client_t *client) { int bytes; int loop = 8; /* max number of iterations in one go */ - long total_written = 0; + long total_written = 0, limiter = source->listener_send_trigger; int ret = 0, lag; if (source->flags & SOURCE_LISTENERS_SYNC) @@ -941,6 +968,9 @@ static int send_listener (source_t *source, client_t *client) lag = source->client->queue_pos - client->queue_pos; + if (source->incoming_rate && lag < source->incoming_rate) + limiter = source->incoming_rate/2; + /* progessive slowdown if nearing max bandwidth. */ if (global.max_rate) { @@ -961,9 +991,6 @@ static int send_listener (source_t *source, client_t *client) client->schedule_ms += 150; } } - if (source->incoming_rate > 1 && lag < (source->incoming_rate<<1)) - loop = lag / (source->incoming_rate>>1); - while (1) { /* jump out if client connection has died */ @@ -974,7 +1001,7 @@ static int send_listener (source_t *source, client_t *client) } /* lets not send too much to one client in one go, but don't sleep for too long if more data can be sent */ - if (loop == 0 || total_written > source->listener_send_trigger) + if (loop == 0 || total_written > limiter) { client->schedule_ms = client->worker->time_ms + 15; break; @@ -1035,6 +1062,7 @@ void source_init (source_t *source) stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS); stats_event_flags (source->mount, "outgoing_kbitrate", "0", STATS_COUNTERS); stats_event_flags (source->mount, "incoming_bitrate", "0", STATS_COUNTERS); + stats_event_flags (source->mount, "queue_size", "0", STATS_COUNTERS); stats_event_flags (source->mount, "connected", "0", STATS_COUNTERS); stats_event_flags (source->mount, "source_ip", source->client->connection.ip, STATS_COUNTERS); @@ -1055,7 +1083,7 @@ void source_init (source_t *source) if (str) { _parse_audio_info (source, str); - stats_event (source->mount, "audio_info", str); + stats_event_flags (source->mount, "audio_info", str, STATS_GENERAL); } } source->format->in_bitrate = rate_setup (60, 1); @@ -1094,9 +1122,10 @@ void source_init (source_t *source) } -void source_set_override (const char *mount, const char *dest) +int source_set_override (const char *mount, const char *dest) { source_t *source; + int ret = 0; avl_tree_rlock (global.source_tree); source = source_find_mount (mount); @@ -1111,13 +1140,20 @@ void source_set_override (const char *mount, const char *dest) source->fallback.mount = strdup (dest); source->termination_count = source->listeners; source->flags |= SOURCE_LISTENERS_SYNC; + ret = 1; } thread_mutex_unlock (&source->lock); } + avl_tree_unlock (global.source_tree); + if (ret) + INFO2 ("moving from %s to %s", mount, dest); } else - fserve_set_override (mount, dest); - avl_tree_unlock (global.source_tree); + { + avl_tree_unlock (global.source_tree); + ret = fserve_set_override (mount, dest); + } + return ret; } @@ -1127,26 +1163,37 @@ void source_set_fallback (source_t *source, const char *dest_mount) client_t *client = source->client; time_t connected = client->worker->current_time.tv_sec - client->connection.con_time; - if (dest_mount == NULL) + if (dest_mount == NULL || source->listeners == 0) + { + INFO1 ("Skipping fallback on %s, no listeners", source->mount); return; - if (connected > 20) - bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02); + } + if (connected > 40) + bitrate = (int)rate_avg (source->format->in_bitrate); + //bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02); if (bitrate == 0 && source->limit_rate) bitrate = source->limit_rate; - source->fallback.flags = FS_FALLBACK; - source->fallback.mount = strdup (dest_mount); - source->fallback.limit = bitrate; - INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount, - source->fallback.limit, source->listeners); + if (source->listeners) + { + source->fallback.flags = FS_FALLBACK; + source->fallback.mount = strdup (dest_mount); + source->fallback.limit = bitrate; + INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount, + source->fallback.limit, source->listeners); + } + else + INFO4 ("Skipping fallback to %s on %s (bitrate %d, listeners %d)", dest_mount, + source->mount, bitrate, source->listeners); } + void source_shutdown (source_t *source, int with_fallback) { mount_proxy *mountinfo; INFO1("Source \"%s\" exiting", source->mount); - source->flags &= ~SOURCE_ON_DEMAND; + source->flags &= ~(SOURCE_ON_DEMAND|SOURCE_TIMEOUT); source->termination_count = source->listeners; mountinfo = config_find_mount (config_get_config(), source->mount); if (source->client->connection.con_time) @@ -1163,6 +1210,7 @@ void source_shutdown (source_t *source, int with_fallback) if (mountinfo && with_fallback && global.running == ICE_RUNNING) source_set_fallback (source, mountinfo->fallback_mount); config_release_config(); + source->client->timer_start = source->client->worker->current_time.tv_sec; source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC); } @@ -1186,13 +1234,13 @@ static void _parse_audio_info (source_t *source, const char *s) char name[100], value[200]; int n = sscanf (start, "%99[^=]=%199[^;\r\n]", name, value); - if (n == 2 && strncmp (name, "ice-", 4) == 0) + if (n == 2 && (strncmp (name, "ice-", 4) == 0 || strncmp (name, "bitrate=", 7) == 0)) { char *esc = util_url_unescape (value); if (esc) { util_dict_set (source->audio_info, name, esc); - stats_event (source->mount, name, esc); + stats_event_flags (source->mount, name, esc, STATS_COUNTERS); } free (esc); } @@ -1215,7 +1263,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) INFO2 ("Applying mount information for \"%s\" from \"%s\"", source->mount, mountinfo->mountname); - stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners); + stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners); /* if a setting is available in the mount details then use it, else * check the parser details. */ @@ -1246,7 +1294,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) } while (0); val = atoi (str); } - stats_event_args (source->mount, "public", "%d", val); + stats_set_args (source->stats, "public", "%d", val); if (source->yp_public != val) { DEBUG1 ("YP changed to %d", val); @@ -1259,7 +1307,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) /* stream name */ if (mountinfo && mountinfo->stream_name) - stats_event (source->mount, "server_name", mountinfo->stream_name); + stats_set (source->stats, "server_name", mountinfo->stream_name); else { do { @@ -1272,12 +1320,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) str = "Unspecified name"; } while (0); if (source->format) - stats_event_conv (source->mount, "server_name", str, source->format->charset); + stats_set_conv (source->stats, "server_name", str, source->format->charset); } /* stream description */ if (mountinfo && mountinfo->stream_description) - stats_event (source->mount, "server_description", mountinfo->stream_description); + stats_set (source->stats, "server_description", mountinfo->stream_description); else { do { @@ -1289,12 +1337,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; } while (0); if (str && source->format) - stats_event_conv (source->mount, "server_description", str, source->format->charset); + stats_set_conv (source->stats, "server_description", str, source->format->charset); } /* stream URL */ if (mountinfo && mountinfo->stream_url) - stats_event (source->mount, "server_url", mountinfo->stream_url); + stats_set (source->stats, "server_url", mountinfo->stream_url); else { do { @@ -1306,12 +1354,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; } while (0); if (str && source->format) - stats_event_conv (source->mount, "server_url", str, source->format->charset); + stats_set_conv (source->stats, "server_url", str, source->format->charset); } /* stream genre */ if (mountinfo && mountinfo->stream_genre) - stats_event (source->mount, "genre", mountinfo->stream_genre); + stats_set (source->stats, "genre", mountinfo->stream_genre); else { do { @@ -1324,12 +1372,15 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) str = "various"; } while (0); if (source->format) - stats_event_conv (source->mount, "genre", str, source->format->charset); + stats_set_conv (source->stats, "genre", str, source->format->charset); } /* stream bitrate */ if (mountinfo && mountinfo->bitrate) + { str = mountinfo->bitrate; + stats_set (source->stats, "bitrate", str); + } else { do { @@ -1339,23 +1390,24 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo) if (str) break; str = httpp_getvar (parser, "x-audiocast-bitrate"); } while (0); + if (str) + stats_set (source->stats, "bitrate", str); } - stats_event (source->mount, "bitrate", str); /* handle MIME-type */ if (mountinfo && mountinfo->type) - stats_event (source->mount, "server_type", mountinfo->type); + stats_set (source->stats, "server_type", mountinfo->type); else if (source->format && source->format->contenttype) - stats_event (source->mount, "server_type", source->format->contenttype); + stats_set (source->stats, "server_type", source->format->contenttype); if (mountinfo && mountinfo->subtype) - stats_event (source->mount, "subtype", mountinfo->subtype); + stats_set (source->stats, "subtype", mountinfo->subtype); if (mountinfo && mountinfo->auth) - stats_event (source->mount, "authenticator", mountinfo->auth->type); + stats_set (source->stats, "authenticator", mountinfo->auth->type); else - stats_event (source->mount, "authenticator", NULL); + stats_set (source->stats, "authenticator", NULL); source->limit_rate = 0; if (mountinfo && mountinfo->limit_rate) @@ -1437,11 +1489,12 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy source->min_queue_size = config->min_queue_size; source->timeout = config->source_timeout; source->default_burst_size = config->burst_size; + source->stats = stats_handle (source->mount); len = strlen (config->hostname) + strlen(source->mount) + 16; listen_url = alloca (len); snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount); - stats_event_flags (source->mount, "listenurl", listen_url, STATS_COUNTERS); + stats_set_flags (source->stats, "listenurl", listen_url, STATS_COUNTERS); source_apply_mount (source, mountinfo); @@ -1450,11 +1503,11 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy if (source->flags & SOURCE_ON_DEMAND) { DEBUG0 ("on_demand set"); - stats_event (source->mount, "on_demand", "1"); - stats_event_args (source->mount, "listeners", "%ld", source->listeners); + stats_set (source->stats, "on_demand", "1"); + stats_set_args (source->stats, "listeners", "%ld", source->listeners); } else - stats_event (source->mount, "on_demand", NULL); + stats_set (source->stats, "on_demand", NULL); if (mountinfo) { @@ -1465,23 +1518,24 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy if (mountinfo->fallback_when_full) DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full); DEBUG1 ("max listeners to %d", mountinfo->max_listeners); - stats_event_args (source->mount, "max_listeners", "%d", mountinfo->max_listeners); - stats_event_flags (source->mount, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN); + stats_set_args (source->stats, "max_listeners", "%d", mountinfo->max_listeners); + stats_set_flags (source->stats, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN); if (mountinfo->hidden) { - stats_event_flags (source->mount, NULL, NULL, STATS_HIDDEN); + stats_set_flags (source->stats, NULL, NULL, STATS_HIDDEN); DEBUG0 ("hidden from public"); } else - stats_event_flags (source->mount, NULL, NULL, 0); + stats_set_flags (source->stats, NULL, NULL, 0); } else { DEBUG0 ("max listeners is not specified"); - stats_event (source->mount, "max_listeners", "unlimited"); - stats_event_flags (source->mount, "cluster_password", NULL, STATS_SLAVE); - stats_event_flags (source->mount, NULL, NULL, STATS_PUBLIC); + stats_set (source->stats, "max_listeners", "unlimited"); + stats_set_flags (source->stats, "cluster_password", NULL, STATS_SLAVE); + stats_set_flags (source->stats, NULL, NULL, STATS_PUBLIC); } + stats_release (source->stats); DEBUG1 ("public set to %d", source->yp_public); DEBUG1 ("queue size to %u", source->queue_size_limit); DEBUG1 ("min queue size to %u", source->min_queue_size); @@ -1506,8 +1560,9 @@ static int source_client_callback (client_t *client) agent = httpp_getvar (source->client->parser, "user-agent"); if (agent) - stats_event (source->mount, "user_agent", agent); + stats_event_flags (source->mount, "user_agent", agent, STATS_COUNTERS); stats_event_inc(NULL, "source_client_connections"); + client_set_queue (client, NULL); source_init (source); client->ops = &source_client_ops; @@ -1594,18 +1649,25 @@ void source_recheck_mounts (int update_all) } else if (update_all) { - stats_event_flags (mount->mountname, NULL, NULL, mount->hidden?STATS_HIDDEN:0); - stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s", + long stats = stats_handle (mount->mountname); + stats_set_flags (stats, NULL, NULL, mount->hidden?STATS_HIDDEN:0); + stats_set_args (stats, "listenurl", "http://%s:%d%s", config->hostname, config->port, mount->mountname); - stats_event (mount->mountname, "listeners", "0"); + stats_set (stats, "listeners", "0"); if (mount->max_listeners < 0) - stats_event (mount->mountname, "max_listeners", "unlimited"); + stats_set (stats, "max_listeners", "unlimited"); else - stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners); + stats_set_args (stats, "max_listeners", "%d", mount->max_listeners); + stats_release (stats); } } else - stats_event (mount->mountname, NULL, NULL); + { + // only delete these stats if no client is maintaining them + source = source_find_mount_raw (mount->mountname); + if (source == NULL) + stats_event (mount->mountname, NULL, NULL); + } mount = mount->next; } @@ -1717,7 +1779,6 @@ static int source_listener_release (source_t *source, client_t *client) rate_reduce (source->format->out_bitrate, 1000); stats_event_dec (NULL, "listeners"); - stats_event_args (source->mount, "listeners", "%lu", source->listeners); /* change of listener numbers, so reduce scope of global sampling */ global_reduce_bitrate_sampling (global.out_bitrate); @@ -1740,7 +1801,7 @@ static int source_listener_release (source_t *source, client_t *client) int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client) { - int loop = 10, bitrate = 0, do_process = 0; + int loop = 10, rate = 0, do_process = 0; int within_limits; source_t *source; mount_proxy *minfo = mountinfo; @@ -1769,16 +1830,19 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl } avl_tree_unlock (global.source_tree); if (minfo && minfo->limit_rate) - bitrate = minfo->limit_rate; + rate = minfo->limit_rate; if (minfo == NULL || minfo->fallback_mount == NULL) { - if (bitrate) + if (rate == 0) + if (sscanf (mount, "%*[^[][%d]", &rate) == 1) + rate = rate * 1000; + if (rate) { fbinfo f; f.flags = FS_FALLBACK; f.mount = (char *)mount; f.fallback = NULL; - f.limit = bitrate/8; + f.limit = rate / 8; if (move_listener (client, &f) == 0) { /* source dead but fallback to file found */ @@ -1910,6 +1974,8 @@ void source_setup_listener (source_t *source, client_t *client) client->shared_data = source; client->queue_pos = 0; client->mount = source->mount; + client->flags &= ~CLIENT_IN_FSERVE; + client->timer_start = client->worker->current_time.tv_sec; client->check_buffer = http_source_listener; // add client to the source diff --git a/src/source.h b/src/source.h index 98d52acc..1f8399ba 100644 --- a/src/source.h +++ b/src/source.h @@ -70,6 +70,7 @@ typedef struct source_tag unsigned long bytes_sent_since_update; unsigned long bytes_read_since_update; int stats_interval; + long stats; time_t last_read; @@ -86,6 +87,7 @@ typedef struct source_tag #define SOURCE_PAUSE_LISTENERS (1<<3) #define SOURCE_TERMINATING (1<<4) #define SOURCE_LISTENERS_SYNC (1<<5) +#define SOURCE_TIMEOUT (1<<6) #define source_available(x) (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && ((x)->flags & SOURCE_LISTENERS_SYNC) == 0) #define source_running(x) ((x)->flags & SOURCE_RUNNING) @@ -109,7 +111,7 @@ void source_setup_listener (source_t *source, client_t *client); void source_init (source_t *source); void source_shutdown (source_t *source, int with_fallback); void source_set_fallback (source_t *source, const char *dest_mount); -void source_set_override (const char *mount, const char *dest); +int source_set_override (const char *mount, const char *dest); #define SOURCE_BLOCK_SYNC 01 #define SOURCE_BLOCK_RELEASE 02 diff --git a/src/stats.c b/src/stats.c index 6d007e82..53ced550 100644 --- a/src/stats.c +++ b/src/stats.c @@ -21,6 +21,7 @@ #include #include +#include #include #include "thread/thread.h" @@ -596,7 +597,12 @@ static void process_source_event (stats_event_t *event) } if (event->action == STATS_EVENT_REMOVE && event->name == NULL) { - avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); + int fallback_stream = 0; + avl_tree_wlock (snode->stats_tree); + fallback_stream = _find_node (snode->stats_tree, "fallback") == NULL ? 1 : 0; + avl_tree_unlock (snode->stats_tree); + if (fallback_stream) + avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); avl_tree_unlock (_stats.source_tree); return; } @@ -645,7 +651,7 @@ static int stats_listeners_send (client_t *client) if (refbuf == NULL) { - client->schedule_ms = client->worker->time_ms + 100; + client->schedule_ms = client->worker->time_ms + 60; break; } if (loop == 0 || total > 32768) @@ -667,14 +673,14 @@ static int stats_listeners_send (client_t *client) if (listener->content_len) WARN1 ("content length is %u", listener->content_len); listener->recent_block = NULL; - client->schedule_ms = client->worker->time_ms + 100; + client->schedule_ms = client->worker->time_ms + 50; break; } loop--; } else { - client->schedule_ms = client->worker->time_ms + 250; + client->schedule_ms = client->worker->time_ms + 200; break; /* short write, so stop for now */ } } @@ -712,7 +718,11 @@ static void stats_listener_send (int mask, const char *fmt, ...) while (listener) { - if (listener->mask & mask) + int admuser = listener->mask & STATS_HIDDEN, + hidden = mask & STATS_HIDDEN, + flags = mask & ~STATS_HIDDEN; + + if (admuser || (hidden == 0 && (flags & listener->mask))) _add_stats_to_stats_client (listener->client, fmt, ap); listener = listener->next; } @@ -987,7 +997,6 @@ static void stats_client_release (client_t *client) { event_listener_t *listener, **trail; - INFO0 ("removing stats client"); thread_mutex_lock (&_stats.listeners_lock); listener = _stats.event_listeners; trail = &_stats.event_listeners; @@ -1034,7 +1043,7 @@ void stats_add_listener (client_t *client, int mask) client_set_queue (client, NULL); client->refbuf = refbuf_new (100); snprintf (client->refbuf->data, 100, - "HTTP/1.0 200 OK\r\nCapability: streamlist\r\n\r\n"); + "HTTP/1.0 200 OK\r\nCapability: streamlist stats\r\n\r\n"); client->refbuf->len = strlen (client->refbuf->data); listener->content_len = client->refbuf->len; listener->recent_block = client->refbuf; @@ -1263,6 +1272,8 @@ long stats_handle (const char *mount) { stats_source_t *src_stats; + if (mount == NULL) + return 0; avl_tree_wlock (_stats.source_tree); src_stats = _find_source(_stats.source_tree, mount); if (src_stats == NULL) @@ -1284,11 +1295,14 @@ long stats_handle (const char *mount) } -void stats_lock (long handle) +long stats_lock (long handle, const char *mount) { stats_source_t *src_stats = (stats_source_t *)handle; - if (src_stats) + if (src_stats == NULL) + src_stats = (stats_source_t*)stats_handle (mount); + else avl_tree_wlock (src_stats->stats_tree); + return (long)src_stats; } @@ -1352,29 +1366,88 @@ void stats_set_flags (long handle, const char *name, const char *value, int flag } -void stats_set_conv(long handle, const char *name, const char *value, const char *charset) +static void stats_set_entity_decode (long handle, const char *name, const char *value) { - const char *metadata = value; - xmlBufferPtr conv = xmlBufferCreate (); - - if (charset && value) + xmlParserCtxtPtr parser = xmlNewParserCtxt(); + if (parser) { - xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset); - - if (handle) - { - xmlBufferPtr raw = xmlBufferCreate (); - xmlBufferAdd (raw, (const xmlChar *)value, strlen (value)); - if (xmlCharEncInFunc (handle, conv, raw) > 0) - metadata = (char *)xmlBufferContent (conv); - xmlBufferFree (raw); - xmlCharEncCloseFunc (handle); - } - else - WARN1 ("No charset found for \"%s\"", charset); + xmlChar *decoded = xmlStringDecodeEntities (parser, + (const xmlChar *) value, XML_SUBSTITUTE_BOTH, 0, 0, 0); + stats_set (handle, name, (void*)decoded); + xmlFreeParserCtxt (parser); + xmlFree (decoded); + return; + } + stats_set (handle, name, value); +} + + +void stats_set_conv (long handle, const char *name, const char *value, const char *charset) +{ + if (charset) + { + xmlCharEncodingHandlerPtr encoding = xmlFindCharEncodingHandler (charset); + + if (encoding) + { + xmlBufferPtr in = xmlBufferCreate (); + xmlBufferPtr conv = xmlBufferCreate (); + + xmlBufferCCat (in, value); + if (xmlCharEncInFunc (encoding, conv, in) > 0) + stats_set_entity_decode (handle, name, (void*)xmlBufferContent (conv)); + xmlBufferFree (in); + xmlBufferFree (conv); + xmlCharEncCloseFunc (encoding); + return; + } + WARN1 ("No charset found for \"%s\"", charset); + return; + } + stats_set_entity_decode (handle, name, value); +} + + +void stats_listener_to_xml (client_t *listener, xmlNodePtr parent) +{ + const char *useragent; + char buf[30]; + + xmlNodePtr node = xmlNewChild (parent, NULL, XMLSTR("listener"), NULL); + + snprintf (buf, sizeof (buf), "%lu", listener->connection.id); + xmlSetProp (node, XMLSTR("id"), XMLSTR(buf)); + + xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip)); + + useragent = httpp_getvar (listener->parser, "user-agent"); + if (useragent && xmlCheckUTF8((unsigned char *)useragent)) + { + xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(useragent)); + xmlNewChild (node, NULL, XMLSTR("UserAgent"), str); + xmlFree (str); + } + + if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE) + { + source_t *source = listener->shared_data; + snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos); + } + else + snprintf (buf, sizeof (buf), "0"); + xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf)); + + if (listener->worker) + { + snprintf (buf, sizeof (buf), "%lu", + (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time)); + xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf)); + } + if (listener->username) + { + xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(listener->username)); + xmlNewChild (node, NULL, XMLSTR("username"), str); + xmlFree (str); } - - stats_set (handle, name, metadata); - xmlBufferFree (conv); } diff --git a/src/stats.h b/src/stats.h index 6914d6b3..536c798c 100644 --- a/src/stats.h +++ b/src/stats.h @@ -58,12 +58,14 @@ xmlDocPtr stats_get_xml(int flags, const char *show_mount); char *stats_get_value(const char *source, const char *name); long stats_handle (const char *mount); -void stats_lock (long handle); +long stats_lock (long handle, const char *mount); void stats_release (long handle); void stats_set (long handle, const char *name, const char *value); void stats_set_args (long handle, const char *name, const char *format, ...); void stats_set_flags (long handle, const char *name, const char *value, int flags); void stats_set_conv (long handle, const char *name, const char *value, const char *charset); +void stats_listener_to_xml (client_t *listener, xmlNodePtr parent); + #endif /* __STATS_H__ */ diff --git a/src/util.c b/src/util.c index 2ac9b1ea..d02ff87c 100644 --- a/src/util.c +++ b/src/util.c @@ -17,6 +17,7 @@ #include #include #include +#include #ifndef _WIN32 #include @@ -40,6 +41,7 @@ #include "refbuf.h" #include "connection.h" #include "client.h" +#include "global.h" #define CATMODULE "util" @@ -309,42 +311,39 @@ char *util_url_escape (const char *src) return dst; } + +static int unescape_code (const char *src) +{ + if (hex (src[0]) == -1 || hex (src[1]) == -1) + return -1; + return (hex (src[0]) << 4) + hex (src[1]); +} + + char *util_url_unescape (const char *src) { int len = strlen(src); char *decoded; - int i; + int i, v; char *dst; - int done = 0; decoded = calloc(1, len + 1); dst = decoded; - for(i=0; i < len; i++) { - switch(src[i]) { - case '%': - if (i+2 >= len || hex(src[i+1]) == -1 || hex(src[i+2]) == -1) - { - /* no matching pattern so assume just the % */ - *dst++ = '%'; - } - else - { - *dst++ = hex(src[i+1]) * 16 + hex(src[i+2]); - i+= 2; - } - break; - case 0: - ERROR0("Fatal internal logic error in util_url_unescape()"); - free(decoded); - return NULL; - default: - *dst++ = src[i]; - break; + for(i=0; i < len; i++) + { + if (src[i] == '%' && i+2 < len) + { + v = unescape_code (src + i +1); + if (v >= 0 && isprint(v)) + { + *dst++ = (char)v; + i += 2; + continue; + } } - if(done) - break; + *dst++ = src[i]; } *dst = 0; /* null terminator */ @@ -352,6 +351,7 @@ char *util_url_unescape (const char *src) return decoded; } + /* Get an absolute path (from the webroot dir) from a URI. Return NULL if the * path contains 'disallowed' sequences like foo/../ (which could be used to * escape from the webroot) or if it cannot be URI-decoded. @@ -637,7 +637,7 @@ char *util_dict_urlencode(util_dict *dict, char delim) return res; } -#ifndef HAVE_LOCALTIME_R +#ifndef HAVE_DECL_LOCALTIME_R struct tm *localtime_r (const time_t *timep, struct tm *result) { static mutex_t localtime_lock; diff --git a/src/util.h b/src/util.h index a1da029c..df9cd456 100644 --- a/src/util.h +++ b/src/util.h @@ -52,7 +52,7 @@ int util_dict_set(util_dict *dict, const char *key, const char *val); const char *util_dict_get(util_dict *dict, const char *key); char *util_dict_urlencode(util_dict *dict, char delim); -#ifndef HAVE_LOCALTIME_R +#ifndef HAVE_DECL_LOCALTIME_R struct tm *localtime_r (const time_t *timep, struct tm *result); #endif char *util_conv_string (const char *string, const char *in_charset, const char *out_charset); diff --git a/src/xslt.c b/src/xslt.c index 840b3781..b19474d4 100644 --- a/src/xslt.c +++ b/src/xslt.c @@ -46,6 +46,7 @@ #include "client.h" #include "stats.h" #include "fserve.h" +#include "util.h" #define CATMODULE "xslt" @@ -280,9 +281,12 @@ int xslt_transform (xmlDocPtr doc, const char *xslfilename, client_t *client) for (i = 0; node && i < arg_count; node = avl_get_next (node)) { http_var_t *param = (http_var_t *)node->key; + char *tmp = util_url_escape (param->value); params[i++] = param->name; - params[i] = (char*)alloca (strlen (param->value) +3); - sprintf (params[i++], "\'%s\'", param->value); + // use alloca for now, should really url esc into a supplied buffer + params[i] = (char*)alloca (strlen (tmp) + 3); + sprintf (params[i++], "\'%s\'", tmp); + free (tmp); } params[i] = NULL; } diff --git a/win32/Makefile.am b/win32/Makefile.am index bee287e0..aa76d60a 100644 --- a/win32/Makefile.am +++ b/win32/Makefile.am @@ -14,3 +14,12 @@ EXTRA_DIST = ConfigTab.cpp ConfigTab.h Icecast2win.clw Icecast2win.cpp \ icecast2_console.dsw icecast2_console.dsp credits.bmp icecast2title.bmp \ icecastService.cpp icecastService.dsp +bin_PROGRAMS = icecast +icecast_DEPENDENCIES = ../src/libicecast.a ../src/net/libicenet.la \ + ../src/thread/libicethread.la ../src/httpp/libicehttpp.la ../src/log/libicelog.la \ + ../src/avl/libiceavl.la ../src/timing/libicetiming.la + +icecast_SOURCES = icecastService.cpp +icecast_CPPFLAGS = -I../src @XIPH_CFLAGS@ @XIPH_CPPFLAGS@ +icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LDFLAGS@ @XIPH_LIBS@ @KATE_LIBS@ + diff --git a/win32/icecastService.cpp b/win32/icecastService.cpp index af52b752..0294ff32 100644 --- a/win32/icecastService.cpp +++ b/win32/icecastService.cpp @@ -1,7 +1,4 @@ -#define _WIN32_WINNT 0x0400 -#include #include -#include extern "C" { #include @@ -27,7 +24,6 @@ SERVICE_STATUS_HANDLE hStatus; void ServiceMain(int argc, char** argv); void ControlHandler(DWORD request); -extern "C" int mainService(int argc, char **argv); void installService (const char *path) @@ -112,10 +108,26 @@ void ControlHandler(DWORD request) SetServiceStatus (hStatus, &ServiceStatus); } + +static int run_server (int argc, char *argv[]) +{ + int ret; + + initialize_subsystems(); + + ret = server_init (argc, argv); + if (ret == 0) + server_process(); + + shutdown_subsystems(); + return ret; +} + + void ServiceMain(int argc, char** argv) { ServiceStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; - ServiceStatus.dwWin32ExitCode = 0; + ServiceStatus.dwWin32ExitCode = -1; ServiceStatus.dwServiceSpecificExitCode = 0; ServiceStatus.dwCheckPoint = 0; ServiceStatus.dwWaitHint = 0; @@ -123,7 +135,7 @@ void ServiceMain(int argc, char** argv) hStatus = RegisterServiceCtrlHandler(PACKAGE_STRING, (LPHANDLER_FUNCTION)ControlHandler); if (hStatus == (SERVICE_STATUS_HANDLE)0) { // Registering Control Handler failed - MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION); + MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION); return; } @@ -144,19 +156,21 @@ void ServiceMain(int argc, char** argv) else argv2 [2] = argv[1]; - ServiceStatus.dwWin32ExitCode = mainService(argc2, (char **)argv2); + ServiceStatus.dwWin32ExitCode = run_server (argc2, argv2); ServiceStatus.dwCurrentState = SERVICE_STOPPED; SetServiceStatus(hStatus, &ServiceStatus); } -void main(int argc, char *argv[]) +int main (int argc, char *argv[]) { if (argc < 2) { - printf ("Usage: icecastservice [remove] | [install ]\n"); - return; + printf (PACKAGE_STRING "\n\n" + "Usage: icecastService [remove] | [install ]\n" + " icecastService -c icecast.xml\n\n"); + return -1; } if (!strcmp(argv[1], "install")) { @@ -165,29 +179,33 @@ void main(int argc, char *argv[]) else printf ("install requires a path arg as well\n"); Sleep (1000); - return; + return 0; } if (!strcmp(argv[1], "remove") || !strcmp(argv[1], "uninstall")) { removeService(); - return; + return 0; } - if (_chdir(argv[1]) < 0) - { - char buffer [256]; - _snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]); - MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION); - return; - } + if (strcmp (argv[1], "-c") == 0) + return run_server (argc, argv); + + if (_chdir(argv[1]) < 0) + { + char buffer [256]; + snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]); + MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION); + return -1; + } SERVICE_TABLE_ENTRY ServiceTable[2]; - ServiceTable[0].lpServiceName = PACKAGE_STRING; + ServiceTable[0].lpServiceName = (char*)PACKAGE_STRING; ServiceTable[0].lpServiceProc = (LPSERVICE_MAIN_FUNCTION)ServiceMain; ServiceTable[1].lpServiceName = NULL; ServiceTable[1].lpServiceProc = NULL; // Start the control dispatcher thread for our service if (StartServiceCtrlDispatcher (ServiceTable) == 0) - MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION); + MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION); + return 0; }