1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-16 06:15:24 +00:00

bump to kh31

The main changes are listed below but there is a lot of noise from behind the scenes
work like quicker stats updates, code re-arranging and infrequent used code like the
allocation code which is not usually compiled in.

Implement scatter-gather IO, initially for flv wrapping as that will make a lot of
use of it and should save a lot of memory copies. The icy metadata should also be a
candidate for this.

Add better handling to relays with multiple servers and fallbacks. Problem entries
can be skipped when restarting. 

Fixup build system for mingw32 cross compile. VC was getting too annoying but should
still be an option if needed later.  This now allows us to build with newer dependent
libs without much extra work.   The GUI component is not built now (VC specific) but
it was of limited use and most win32 users configure the service.



svn path=/icecast/branches/kh/icecast/; revision=18147
This commit is contained in:
Karl Heyes 2011-12-24 02:35:54 +00:00
parent deba1eba00
commit 1f3c6c2d87
52 changed files with 1448 additions and 679 deletions

View File

@ -3,7 +3,10 @@
AUTOMAKE_OPTIONS = foreign
ACLOCAL_AMFLAGS = -I m4
SUBDIRS = src conf doc web admin win32
SUBDIRS = conf doc web admin src
if WIN32
SUBDIRS += win32
endif
EXTRA_DIST = HACKING config.h.vc6 m4/acx_pthread.m4 m4/ogg.m4 \
m4/theora.m4 m4/vorbis.m4 m4/speex.m4\
@ -19,5 +22,7 @@ debug:
profile:
$(MAKE) all CFLAGS="@PROFILE@"
staticdebug:
$(MAKE) all CFLAGS="-g -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static"
static:
$(MAKE) all LDFLAGS="${LDFLAGS} -all-static"
$(MAKE) all CFLAGS="-O2 -DLIBXML_STATIC -DCURL_STATICLIB" LDFLAGS="${LDFLAGS} -all-static"

34
NEWS
View File

@ -16,6 +16,40 @@ Feature differences from SVN trunk
any extra tags are show in the conf/icecast.xml.dist file
2.3.2-kh31
. Add generic scattered IO routines, listeners wanting FLV wrapping now use this which
saves a lot of memory copying and therefore load.
. build setup for mingw32. This should help with library updates for us, drop GUI build
. relay restarting fixes included for certain error cases. relays are treaated as
failed if they stop within 60 seconds and skip on to next server if specified.
. a better trigger for on-demand relay if fallback has listeners.
. on reload when changeowner used, allow for listening sockets to be reopend but
prevent closing priviledged ports unless missing from xml
. prevent unescape routine from creating non-printable chars
. ignore per-mount username when source is using shoutcast protocol.
. URL auth can now accept a "ice-username: ..." in the response headers for setting the
username. For setups that use query args for auth
. expand args applied to xsl requests, url encoded now.
. modify k and m in bitrates to be on 1000 not 1024
. accesslog modification to parsing, optionally set <type> in <accesslog> to CLF-ESC
to escape-encode strings instead of using "".
. if falling back to file, the bitrate used takes, in order of priority, incoming
bitrate, limit-rate and then [value] in filename eg /fallback-[128]
. fix a large ogg page bug, flac is probably the only one showing this up currently
. fix a couple of rare crash cases with file serving clients.
. prevent intro file sending when override triggered.
. make access log report query args on the request, but truncate the strings if too long.
. minor change to send limiter
. move incoming rate limiter to source client specific code.
. Add server-wide redirect tags.
. use per-source stats handles for updates, reduces lookups and avoids taking a
shared lock.
. minor log message changes.
. fix fallback trigger if source timeout occurs
. fix possible sources count inconsistency with failed relays
. entity expansion in stats conversion routines added
. fixup debug allocation code. not normally used.
2.3.2-kh30
. stats updates.
Split stats lock into global and source stats locks, reduces contention.

75
admin/viewxml.xsl Normal file
View File

@ -0,0 +1,75 @@
<?xml version="1.0" encoding="ISO-8859-1"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output method="xml" version="1.0" encoding="iso-8859-1" indent="yes"/>
<xsl:template match = "/icestats" >
<xsl:for-each select="source">
<SHOUTCASTSERVER>
<CURRENTLISTENERS><xsl:value-of select="listeners" /></CURRENTLISTENERS>
<PEAKLISTENERS><xsl:value-of select="listener_peak" /></PEAKLISTENERS>
<MAXLISTENERS><xsl:value-of select="max_listeners" /></MAXLISTENERS>
<REPORTEDLISTENERS>NA</REPORTEDLISTENERS>
<AVERAGETIME>NA</AVERAGETIME>
<SERVERGENRE><xsl:value-of select="genre" /></SERVERGENRE>
<SERVERURL><xsl:value-of select="server_url" /></SERVERURL>
<SERVERTITLE><xsl:value-of select="server_name" /></SERVERTITLE>
<SONGTITLE><xsl:if test="artist"><xsl:value-of select="artist" /> - </xsl:if><xsl:value-of select="title" /></SONGTITLE>
<SONGURL><xsl:value-of select="listenurl" /></SONGURL>
<IRC>NA</IRC>
<ICQ>NA</ICQ>
<AIM>NA</AIM>
<WEBHITS>NA</WEBHITS>
<STREAMHITS>NA</STREAMHITS>
<STREAMSTATUS>NA</STREAMSTATUS>
<BITRATE><xsl:value-of select="bitrate" /></BITRATE>
<CONTENT><xsl:value-of select="server_type" /></CONTENT>
<VERSION><xsl:value-of select="server_id" /></VERSION>
<WEBDATA>
<INDEX>NA</INDEX>
<LISTEN>NA</LISTEN>
<PALM7>NA</PALM7>
<LOGIN>NA</LOGIN>
<LOGINFAIL>NA</LOGINFAIL>
<PLAYED>NA</PLAYED>
<COOKIE>NA</COOKIE>
<ADMIN>NA</ADMIN>
<UPDINFO>NA</UPDINFO>
<KICKSRC>NA</KICKSRC>
<KICKDST>NA</KICKDST>
<UNBANDST>NA</UNBANDST>
<BANDST>NA</BANDST>
<VIEWBAN>NA</VIEWBAN>
<UNRIPDST>NA</UNRIPDST>
<RIPDST>NA</RIPDST>
<VIEWRIP>NA</VIEWRIP>
<VIEWXML>NA</VIEWXML>
<VIEWLOG>NA</VIEWLOG>
<INVALID>NA</INVALID>
</WEBDATA>
<LISTENERS>
<xsl:for-each select="listener">
<LISTENER>
<HOSTNAME><xsl:value-of select="IP" /><xsl:if test="username"> (<xsl:value-of select="username" />)</xsl:if></HOSTNAME>
<USERAGENT><xsl:value-of select="UserAgent" /></USERAGENT>
<UNDERRUNS>NA</UNDERRUNS>
<CONNECTTIME><xsl:value-of select="Connected" /></CONNECTTIME>
<POINTER>NA</POINTER>
<UID>NA</UID>
</LISTENER>
</xsl:for-each>
</LISTENERS>
<SONGHISTORY>
<SONG>
<PLAYEDAT>1259797160</PLAYEDAT>
<TITLE>Little Texas - She&#x27;s Got Her Daddy&#x27;s Money</TITLE>
</SONG>
</SONGHISTORY>
</SHOUTCASTSERVER>
</xsl:for-each>
</xsl:template>
</xsl:stylesheet>

View File

@ -1,4 +1,4 @@
AC_INIT([Icecast], [2.3.2-kh30], [karl@xiph.org])
AC_INIT([Icecast], [2.3.2-kh31], [karl@xiph.org])
LT_INIT
AC_PREREQ(2.59)
@ -9,6 +9,7 @@ AM_INIT_AUTOMAKE
AM_CONFIG_HEADER(config.h)
AM_MAINTAINER_MODE
AC_PROG_CXX
AC_PROG_CC
AC_CANONICAL_HOST
AC_PROG_LIBTOOL
@ -25,6 +26,7 @@ else
PROFILE="-pg -g"
AC_DEFINE([_GNU_SOURCE], 1, [Define to include GNU extensions to POSIX])
fi
AM_CONDITIONAL([WIN32], [test x$host_os = xmingw32])
dnl Checks for programs.
@ -34,9 +36,8 @@ dnl Checks for header files.
AC_HEADER_STDC
AC_HEADER_TIME
AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h])
AC_CHECK_HEADERS([signal.h fnmatch.h limits.h sys/timeb.h malloc.h])
AC_CHECK_HEADERS(pwd.h, AC_DEFINE(CHUID, 1, [Define if you have pwd.h]),,)
AC_CHECK_HEADERS(unistd.h, AC_DEFINE(CHROOT, 1, [Define if you have unistd.h]),,)
dnl Checks for typedefs, structures, and compiler characteristics.
XIPH_C__FUNC__
@ -46,10 +47,19 @@ AC_TYPE_OFF_T
AC_CHECK_TYPES([struct timespec])
dnl Checks for library functions.
AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync])
AC_CHECK_DECLS([localtime_r],,,
[#include <time.h>
#include <pthread.h>])
AC_CHECK_FUNCS([fnmatch chroot fork poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync])
AC_CHECK_TYPES([struct signalfd_siginfo],
[AC_DEFINE(HAVE_SIGNALFD, 1 ,[Define if signalfd exists])], [],
[#include <sys/signalfd.h>])
if test "x$ac_cv_func_fnmatch" != "xyes"
then
AC_CHECK_LIB(fnmatch, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-lfnmatch"])],
[ AC_CHECK_LIB(iberty, fnmatch, [XIPH_VAR_APPEND([XIPH_LIBS],["-liberty"])])
])
fi
AC_SEARCH_LIBS(nanosleep, rt posix4,
AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep]))
AC_SEARCH_LIBS(clock_gettime, rt posix4,
@ -140,8 +150,10 @@ XIPH_PATH_OPENSSL([
[ AC_MSG_NOTICE([SSL disabled!])
])
ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o"
AC_DEFINE([MIMETYPESFILE],"/etc/mime.types", [Default location of mime types file])
if test "x$ac_cv_func_fork" = "xyes"
then
ICECAST_OPTIONAL="$ICECAST_OPTIONAL auth_cmd.o"
fi
AC_DEFINE([ICECAST_TIME_FMT],["%a, %d %b %Y %H:%M:%S %z"], [time format for strftime])
dnl Make substitutions

View File

@ -4,7 +4,11 @@ AUTOMAKE_OPTIONS = foreign
SUBDIRS = avl thread httpp net log timing
bin_PROGRAMS = icecast
if WIN32
noinst_LIBRARIES = libicecast.a
else
bin_PROGRAMS = icecast
endif
noinst_HEADERS = admin.h cfgfile.h logging.h sighandler.h connection.h \
global.h util.h slave.h source.h stats.h refbuf.h client.h \
@ -27,6 +31,10 @@ icecast_DEPENDENCIES = @ICECAST_OPTIONAL@ net/libicenet.la thread/libicethread.l
httpp/libicehttpp.la log/libicelog.la avl/libiceavl.la timing/libicetiming.la
icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LIBS@ @KATE_LIBS@
libicecast_a_SOURCES = $(icecast_SOURCES)
libicecast_a_DEPENDENCIES = $(icecast_DEPENDENCIES)
libicecast_a_LIBADD = $(icecast_DEPENDENCIES)
AM_CFLAGS = @XIPH_CFLAGS@
AM_CPPFLAGS = @XIPH_CPPFLAGS@
AM_LDFLAGS = @XIPH_LDFLAGS@ @KATE_LIBS@

View File

@ -61,6 +61,9 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon
static int command_admin_function (client_t *client, int response);
static int command_list_log (client_t *client, int response);
static int command_manage_relay (client_t *client, int response);
#ifdef MY_ALLOC
static int command_alloc(client_t *client);
#endif
static int admin_handle_general_request(client_t *client, const char *command);
@ -84,6 +87,9 @@ static struct admin_command admin_general[] =
{ "manageauth", RAW, { command_manageauth } },
{ "listmounts", RAW, { command_list_mounts } },
{ "function", RAW, { command_admin_function } },
#ifdef MY_ALLOC
{ "alloc", RAW, { command_alloc } },
#endif
{ "streamlist.txt", TEXT, { command_list_mounts } },
{ "streams", TEXT, { command_list_mounts } },
{ "showlog.txt", TEXT, { command_list_log } },
@ -592,50 +598,6 @@ static int command_manage_relay (client_t *client, int response)
}
static void add_listener_node (xmlNodePtr srcnode, client_t *listener)
{
const char *useragent;
char buf[30];
xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
snprintf (buf, sizeof (buf), "%lu", listener->connection.id);
xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip));
useragent = httpp_getvar (listener->parser, "user-agent");
if (useragent && xmlCheckUTF8((unsigned char *)useragent))
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent));
xmlNewChild (node, NULL, XMLSTR("UserAgent"), str);
xmlFree (str);
}
if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE)
{
source_t *source = listener->shared_data;
snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos);
}
else
snprintf (buf, sizeof (buf), "0");
xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
if (listener->worker)
{
snprintf (buf, sizeof (buf), "%lu",
(unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf));
}
if (listener->username)
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username));
xmlNewChild (node, NULL, XMLSTR("username"), str);
xmlFree (str);
}
}
/* populate within srcnode, groups of 0 or more listener tags detailing
* information about each listener connected on the provide source.
*/
@ -649,7 +611,7 @@ void admin_source_listeners (source_t *source, xmlNodePtr srcnode)
while (node)
{
client_t *listener = (client_t *)node->key;
add_listener_node (srcnode, listener);
stats_listener_to_xml (listener, srcnode);
node = avl_get_next (node);
}
}
@ -724,7 +686,7 @@ static int command_show_listeners (client_t *client, source_t *source, int respo
client_t *listener = source_find_client (source, id);
if (listener)
add_listener_node (srcnode, listener);
stats_listener_to_xml (listener, srcnode);
}
thread_mutex_unlock (&source->lock);
@ -1242,3 +1204,33 @@ static int command_updatemetadata(client_t *client, source_t *source, int respon
return admin_send_response (doc, client, response, "updatemetadata.xsl");
}
#ifdef MY_ALLOC
static int command_alloc(client_t *client)
{
xmlDocPtr doc = xmlNewDoc (XMLSTR("1.0"));
xmlNodePtr rootnode = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL);
avl_node *node;
xmlDocSetRootElement(doc, rootnode);
avl_tree_rlock (global.alloc_tree);
node = avl_get_first (global.alloc_tree);
while (node)
{
alloc_node *an = node->key;
char value[25];
xmlNodePtr bnode = xmlNewChild (rootnode, NULL, XMLSTR("block"), NULL);
xmlSetProp (bnode, XMLSTR("name"), XMLSTR(an->name));
snprintf (value, sizeof value, "%d", an->count);
xmlNewChild (bnode, NULL, XMLSTR("count"), XMLSTR(value));
snprintf (value, sizeof value, "%d", an->allocated);
xmlNewChild (bnode, NULL, XMLSTR("allocated"), XMLSTR(value));
node = avl_get_next (node);
}
avl_tree_unlock (global.alloc_tree);
return admin_send_response (doc, client, RAW, "stats.xsl");
}
#endif

View File

@ -22,6 +22,9 @@
#include <string.h>
#include <errno.h>
#include <stdio.h>
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
#include "auth.h"
#include "auth_htpasswd.h"
@ -382,34 +385,52 @@ static void *auth_run_thread (void *arg)
int move_listener (client_t *client, struct _fbinfo *finfo)
{
source_t *source, *prev;
source_t *source;
const char *mount = finfo->mount;
int rate = finfo->limit;
mount_proxy *minfo;
ice_config_t *config = config_get_config();
DEBUG1 ("moving listener to %s", finfo->mount);
avl_tree_rlock (global.source_tree);
source = source_find_mount (finfo->mount);
while (source)
source = source_find_mount_raw (mount);
while (1)
{
minfo = config_find_mount (config, mount);
if (rate == 0 && minfo && minfo->limit_rate)
rate = minfo->limit_rate;
if (source == NULL)
break;
thread_mutex_lock (&source->lock);
if (source_available (source))
{
config_release_config();
avl_tree_unlock (global.source_tree);
source_setup_listener (source, client);
client->flags |= CLIENT_HAS_MOVED;
thread_mutex_unlock (&source->lock);
return 0;
}
prev = source;
if (prev->fallback.mount)
source = source_find_mount (prev->fallback.mount);
else
source = NULL;
thread_mutex_unlock (&prev->lock);
mount = minfo ? minfo->fallback_mount : NULL;
thread_mutex_unlock (&source->lock);
}
config_release_config();
avl_tree_unlock (global.source_tree);
if (client->flags & CLIENT_IS_SLAVE)
return -1;
DEBUG1 ("no source, trying %s as a file", finfo->mount);
if (finfo->flags & FS_OVERRIDE)
{
finfo->mount = finfo->fallback;
finfo->fallback = NULL;
finfo->flags &= ~FS_OVERRIDE;
}
if (finfo->limit == 0)
{
if (rate == 0)
if (sscanf (finfo->mount, "%*[^[][%d]", &rate) == 1)
rate = rate * 1000/8;
finfo->limit = rate;
}
return fserve_setup_client_fb (client, finfo);
}
@ -836,7 +857,7 @@ int auth_check_source (client_t *client, const char *mount)
ret = -1;
if (mountinfo->password)
pass = mountinfo->password;
if (mountinfo->username)
if (mountinfo->username && client->server_conn->shoutcast_compat == 0)
user = mountinfo->username;
}
if (connection_check_pass (client->parser, user, pass) > 0)
@ -858,6 +879,8 @@ void auth_initialise (void)
void auth_shutdown (void)
{
if (allow_auth == 0)
return;
allow_auth = 0;
thread_rwlock_wlock (&auth_lock);
thread_rwlock_unlock (&auth_lock);

View File

@ -13,9 +13,6 @@
#ifndef __AUTH_H__
#define __AUTH_H__
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
struct source_tag;
struct auth_tag;

View File

@ -44,6 +44,7 @@
#include "client.h"
#include "cfgfile.h"
#include "httpp/httpp.h"
#include "global.h"
#include "logging.h"
#define CATMODULE "auth_cmd"

View File

@ -13,10 +13,6 @@
#ifndef __AUTH_CMD_H__
#define __AUTH_CMD_H__
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
int auth_get_cmd_auth (auth_t *, config_options_t *options);
#endif

View File

@ -32,6 +32,7 @@
#include "cfgfile.h"
#include "httpp/httpp.h"
#include "md5.h"
#include "global.h"
#include "logging.h"
#define CATMODULE "auth_htpasswd"

View File

@ -13,9 +13,6 @@
#ifndef __AUTH_HTPASSWD_H__
#define __AUTH_HTPASSWD_H__
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
int auth_get_htpasswd_auth (auth_t *auth, config_options_t *options);

View File

@ -86,6 +86,7 @@
#include "cfgfile.h"
#include "httpp/httpp.h"
#include "mpeg.h"
#include "global.h"
#include "logging.h"
#define CATMODULE "auth_url"
@ -218,6 +219,17 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s
if (eol)
*eol = '\0';
}
if (strncasecmp (ptr, "ice-username: ", 14) == 0)
{
int len = strcspn ((char*)ptr+14, "\r\n");
char *name = malloc (len+1);
if (name)
{
snprintf (name, len+1, "%s", (char *)ptr+14);
free (client->username);
client->username = name;
}
}
if (strncasecmp (ptr, "Location: ", 10) == 0)
{
int len = strcspn ((char*)ptr+10, "\r\n");

View File

@ -13,9 +13,6 @@
#ifndef __AUTH_URL_H__
#define __AUTH_URL_H__
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
int auth_get_url_auth (auth_t *authenticator, config_options_t *options);

View File

@ -62,11 +62,13 @@
#define CONFIG_DEFAULT_LOG_DIR "/usr/local/icecast/logs"
#define CONFIG_DEFAULT_WEBROOT_DIR "/usr/local/icecast/webroot"
#define CONFIG_DEFAULT_ADMINROOT_DIR "/usr/local/icecast/admin"
#define MIMETYPESFILE "/etc/mime.types"
#else
#define CONFIG_DEFAULT_BASE_DIR ".\\"
#define CONFIG_DEFAULT_LOG_DIR ".\\logs"
#define CONFIG_DEFAULT_WEBROOT_DIR ".\\webroot"
#define CONFIG_DEFAULT_ADMINROOT_DIR ".\\admin"
#define MIMETYPESFILE ".\\mime.types"
#endif
static ice_config_t _current_configuration;
@ -157,9 +159,9 @@ int config_get_bitrate (xmlNodePtr node, void *x)
return 1;
sscanf ((char*)str, "%"SCNd64 "%c", p, &metric);
if (metric == 'k' || metric == 'K')
(*p) *= 1024;
(*p) *= 1000;
if (metric == 'm' || metric == 'M')
(*p) *= 1024*1024;
(*p) *= 10000000;
xmlFree (str);
}
return 0;
@ -228,6 +230,19 @@ void config_init_configuration(ice_config_t *configuration)
}
redirect_host *config_clear_redirect (redirect_host *redir)
{
redirect_host *next = NULL;
if (redir)
{
next = redir->next;
xmlFree (redir->server);
free (redir);
}
return next;
}
relay_server *config_clear_relay (relay_server *relay)
{
relay_server *next = relay->next;
@ -353,6 +368,9 @@ void config_clear(ice_config_t *c)
while (c->relay)
c->relay = config_clear_relay (c->relay);
while (c->redirect_hosts)
c->redirect_hosts = config_clear_redirect (c->redirect_hosts);
while (c->mounts)
{
mount_proxy *to_go = c->mounts;
@ -602,11 +620,13 @@ static int _parse_security (xmlNodePtr node, void *arg)
static int _parse_accesslog (xmlNodePtr node, void *arg)
{
access_log *log = arg;
struct access_log *log = arg;
char *type = NULL;
struct cfg_tag icecast_tags[] =
{
{ "name", config_get_str, &log->name },
{ "ip", config_get_bool, &log->log_ip },
{ "type", config_get_str, &type },
{ "archive", config_get_bool, &log->archive },
{ "exclude_ext", config_get_str, &log->exclude_ext },
{ "display", config_get_int, &log->display },
@ -615,9 +635,16 @@ static int _parse_accesslog (xmlNodePtr node, void *arg)
};
log->logid = -1;
return parse_xml_tags (node, icecast_tags);
log->type = LOG_ACCESS_CLF;
if (parse_xml_tags (node, icecast_tags))
return 2;
if (type && strcmp (type, "CLF-ESC") == 0)
log->type = LOG_ACCESS_CLF_ESC;
xmlFree (type);
return 0;
}
static int _parse_errorlog (xmlNodePtr node, void *arg)
{
error_log *log = arg;
@ -659,7 +686,6 @@ static int _parse_logging (xmlNodePtr node, void *arg)
struct cfg_tag icecast_tags[] =
{
{ "accesslog", _parse_accesslog, &config->access_log },
{ "errorlog", _parse_errorlog, &config->error_log },
{ "playlistlog", _parse_playlistlog, &config->playlist_log },
{ "accesslog", config_get_str, &config->access_log.name },
{ "accesslog_ip", config_get_bool, &config->access_log.log_ip },
@ -667,7 +693,8 @@ static int _parse_logging (xmlNodePtr node, void *arg)
config_get_str, &config->access_log.exclude_ext },
{ "accesslog_lines",
config_get_int, &config->access_log.display },
{ "errorlog", config_get_str, &config->error_log },
{ "errorlog", _parse_errorlog, &config->error_log },
{ "errorlog", config_get_str, &config->error_log.name },
{ "errorlog_lines", config_get_int, &config->error_log.display },
{ "loglevel", config_get_int, &config->error_log.level },
{ "playlistlog", config_get_str, &config->playlist_log },
@ -678,6 +705,7 @@ static int _parse_logging (xmlNodePtr node, void *arg)
{ NULL, NULL, NULL }
};
config->access_log.type = LOG_ACCESS_CLF;
config->access_log.logid = -1;
config->access_log.display = 100;
config->access_log.archive = -1;
@ -830,8 +858,10 @@ static int _parse_mount (xmlNodePtr node, void *arg)
mount->url_ogg_meta = 1;
mount->source_timeout = config->source_timeout;
mount->file_seekable = 1;
mount->access_log.type = LOG_ACCESS_CLF;
mount->access_log.logid = -1;
mount->access_log.log_ip = 1;
mount->fallback_override = 1;
if (parse_xml_tags (node, icecast_tags))
return -1;
@ -963,6 +993,36 @@ static int _parse_relay (xmlNodePtr node, void *arg)
return 0;
}
static int _parse_redirect (xmlNodePtr node, void *arg)
{
ice_config_t *config = arg;
redirect_host *redir = calloc (1, sizeof (*redir));
struct cfg_tag icecast_tags[] =
{
{ "host", config_get_str, &redir->server },
{ "port", config_get_int, &redir->port },
{ NULL, NULL, NULL },
};
do
{
redir->port = 8000;
if (parse_xml_tags (node, icecast_tags))
break;
if (redir->server == NULL || redir->port < 1 || redir->port > 65536)
break;
redir->next = config->redirect_hosts;
config->redirect_hosts = redir;
return 0;
} while (0);
free (redir);
return -1;
}
static int _parse_limits (xmlNodePtr node, void *arg)
{
ice_config_t *config = arg;
@ -1095,6 +1155,7 @@ static int _parse_root (xmlNodePtr node, ice_config_t *config)
{ "master-ssl-port", config_get_int, &config->master_ssl_port },
{ "master-redirect", config_get_bool, &config->master_redirect },
{ "max-redirect-slaves",config_get_int, &config->max_redirects },
{ "redirect", _parse_redirect, config },
{ "shoutcast-mount", config_get_str, &config->shoutcast_mount },
{ "listen-socket", _parse_listen_sock, config },
{ "limits", _parse_limits, config },

View File

@ -30,18 +30,32 @@ typedef struct _listener_t listener_t;
#include "auth.h"
#include "compat.h"
typedef struct
typedef struct _redirect_host
{
struct _redirect_host *next;
time_t next_update;
char *server;
int port;
} redirect_host;
typedef struct access_log
{
char *name;
int logid;
int log_ip;
int type;
int archive;
int display;
int size;
char *exclude_ext;
} access_log;
typedef struct
#define LOG_ACCESS_CLF 0
#define LOG_ACCESS_CLF_ESC 1
typedef struct error_log
{
char *name;
int logid;
@ -51,7 +65,7 @@ typedef struct
int level;
} error_log;
typedef struct
typedef struct playlist_log
{
char *name;
int logid;
@ -131,7 +145,7 @@ typedef struct _mount_proxy {
unsigned int max_stream_duration;
unsigned int max_listener_duration;
access_log access_log;
struct access_log access_log;
char *redirect;
char *stream_name;
@ -175,13 +189,14 @@ typedef struct _relay_server_master
char *mount;
int port;
int timeout;
int skip;
} relay_server_master;
typedef struct _relay_server
{
struct _relay_server *next, *new_details;
struct source_tag *source;
relay_server_master *masters;
relay_server_master *masters, *in_use;
char *username;
char *password;
char *localmount;
@ -240,8 +255,6 @@ typedef struct ice_config_tag
listener_t *listen_sock;
unsigned int listen_sock_count;
ice_master_details *master;
char *master_server;
int master_server_port;
int master_update_interval;
@ -252,6 +265,7 @@ typedef struct ice_config_tag
int master_ssl_port;
int master_redirect;
int max_redirects;
struct _redirect_host *redirect_hosts;
relay_server *relay;
@ -267,12 +281,12 @@ typedef struct ice_config_tag
char *cert_file;
char *webroot_dir;
char *adminroot_dir;
aliases *aliases;
struct _aliases *aliases;
unsigned slaves_count;
access_log access_log;
error_log error_log;
playlist_log playlist_log;
struct access_log access_log;
struct error_log error_log;
struct playlist_log playlist_log;
int chroot;
int chuid;

View File

@ -36,18 +36,24 @@
#endif
/* Make sure we define 64 bit types */
#ifdef _WIN32
#if defined(_WIN32) || defined(_MINGW32_)
# define PATH_SEPARATOR "\\"
# define size_t unsigned int
# define ssize_t int
# define uint32_t unsigned int
# define fseeko fseek
# include <malloc.h> // for alloca
//# define alloca _alloca
# define SCN_OFF_T "ld"
# define PRI_OFF_T "ld"
#else
# define PATH_SEPARATOR "/"
# if defined(HAVE_INTTYPES_H)
#endif
#if defined(HAVE_INTTYPES_H)
# include <inttypes.h>
# elif defined(HAVE_STDINT_H)
#elif defined(HAVE_STDINT_H)
# include <stdint.h>
# endif
#endif
#endif /* __COMPAT_H__ */

View File

@ -31,11 +31,17 @@
#ifdef _MSC_VER
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#ifdef HAVE_SIGNALFD
#include <sys/signalfd.h>
#include <signal.h>
@ -218,6 +224,7 @@ void connection_initialize(void)
void connection_shutdown(void)
{
connection_listen_sockets_close (NULL, 1);
thread_spin_destroy (&_connection_lock);
}
@ -342,6 +349,126 @@ int connection_send (connection_t *con, const void *buf, size_t len)
return bytes;
}
#ifdef WIN32
#define IO_VECTOR_LEN(x) ((x)->len)
#define IO_VECTOR_BASE(x) ((x)->buf)
#else
#define IO_VECTOR_LEN(x) ((x)->iov_len)
#define IO_VECTOR_BASE(x) ((x)->iov_base)
#endif
void connection_bufs_init (struct connection_bufs *v, short start)
{
memset (v, 0, sizeof (struct connection_bufs));
if (start && start < 500)
{
v->block = calloc (start, sizeof (IOVEC));
v->max = start;
}
}
void connection_bufs_release (struct connection_bufs *v)
{
free (v->block);
memset (v, 0, sizeof (struct connection_bufs));
}
void connection_bufs_flush (struct connection_bufs *v)
{
v->count = 0;
v->total = 0;
}
int connection_bufs_append (struct connection_bufs *v, void *buf, unsigned int len)
{
if (v->count >= v->max)
{
int len = v->max + 16;
IOVEC *arr = realloc (v->block, (len*sizeof(IOVEC)));
v->max = len;
v->block = arr;
}
IO_VECTOR_BASE (v->block + v->count) = buf;
IO_VECTOR_LEN (v->block + v->count) = len;
v->count++;
v->total += len;
return v->total;
}
static int connbufs_locate_start (struct connection_bufs *vects, int skip, IOVEC *old_value, int *offp)
{
int sum = 0, i = vects->count;
IOVEC *p = vects->block;
if (skip < vects->total)
{
for (; i; i--)
{
if (sum + IO_VECTOR_LEN(p) > skip)
{
int offset = skip - sum;
if (offset)
{
*old_value = *p;
IO_VECTOR_BASE(p) += offset;
IO_VECTOR_LEN(p) -= offset;
}
*offp = offset;
return p - vects->block;
}
sum += IO_VECTOR_LEN(p);
p++;
}
}
return -1;
}
int connection_bufs_send (connection_t *con, struct connection_bufs *vectors, int skip)
{
IOVEC *p = vectors->block, old_vals;
int i = vectors->count, offset = 0, ret = -1;
i = connbufs_locate_start (vectors, skip, &old_vals, &offset);
p = vectors->block + i;
if (i >= 0)
{
if (not_ssl_connection (con))
{
ret = sock_writev (con->sock, p, vectors->count - i);
if (ret < 0 && !sock_recoverable (sock_error()))
con->error = 1;
}
#ifdef HAVE_OPENSSL
else
{
IOVEC *io = p;
int bytes = 0;
for (; i < vectors->count; i++, io++)
{
int v = connection_send_ssl (con, IO_VECTOR_BASE(io), IO_VECTOR_LEN(io));
if (v > 0) bytes += v;
if (v < IO_VECTOR_LEN(io)) break;
}
if (bytes > 0) ret = bytes;
}
#endif
if (offset)
*p = old_vals;
if (ret > 0)
con->sent_bytes += ret;
}
return ret;
}
static void add_generic_text (avl_tree *t, const char *ip, time_t now)
{
char *str = strdup (ip);
@ -641,7 +768,7 @@ static sock_t wait_for_serversock (void)
}
#else
fd_set rfds;
struct timeval tv, *p=NULL;
struct timeval tv;
int i, ret;
sock_t max = SOCK_ERROR;
@ -697,7 +824,7 @@ static client_t *accept_client (void)
int i, num;
refbuf_t *r;
if (sock_set_blocking (sock, 0) || sock_set_nodelay (sock))
if (sock_set_blocking (sock, 0)) // || sock_set_nodelay (sock))
{
WARN0 ("failed to set tcp options on client connection, dropping");
break;
@ -885,7 +1012,8 @@ static int http_client_request (client_t *client)
if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0)
{
INFO1 ("dropping client because useragent is %s", agent);
INFO2 ("dropping client at %s because useragent is %s",
client->connection.ip, agent);
return -1;
}
}
@ -977,8 +1105,7 @@ static void *connection_thread (void *arg)
useragents.filename = strdup (config->agentfile);
get_ssl_certificate (config);
if (config->chuid == 0)
connection_setup_sockets (config);
connection_setup_sockets (config);
header_timeout = config->header_timeout;
config_release_config ();
@ -1020,6 +1147,7 @@ static void *connection_thread (void *arg)
return NULL;
}
void connection_thread_startup ()
{
#ifdef HAVE_SIGNALFD
@ -1034,6 +1162,7 @@ void connection_thread_startup ()
conn_tid = thread_create ("connection", connection_thread, NULL, THREAD_ATTACHED);
}
void connection_thread_shutdown ()
{
if (conn_tid)
@ -1375,41 +1504,71 @@ static int _handle_get_request (client_t *client)
}
/* close any open listening sockets and reopen new listener sockets based on the settings
* in the configuration.
/* close any open listening sockets
*/
void connection_listen_sockets_close (ice_config_t *config, int all_sockets)
{
if (global.serversock)
{
int old = 0, new = 0, cur = global.server_sockets;
for (; old < cur; old++)
{
// close all listening sockets unless privileged ones are to stay open
// and it is still present in the config.
if (config && all_sockets == 0 && global.server_conn [old]->port < 1024)
{
listener_t *listener = config->listen_sock;
while (listener && listener->port != global.server_conn [old]->port)
listener = listener->next;
if (listener)
{
INFO2 ("Leaving port %d (%s) open", listener->port,
listener->bind_address ? listener->bind_address : "");
if (new < old)
{
global.server_conn [new] = global.server_conn [old];
global.serversock [new] = global.serversock [old];
new++;
}
continue;
}
}
INFO1 ("Closing port %d", global.server_conn [old]->port);
sock_close (global.serversock [old]);
global.serversock [old] = SOCK_ERROR;
config_clear_listener (global.server_conn [old]);
global.server_sockets--;
}
if (global.server_sockets == 0)
{
free (global.serversock);
global.serversock = NULL;
free (global.server_conn);
global.server_conn = NULL;
}
}
}
int connection_setup_sockets (ice_config_t *config)
{
int count = 0;
listener_t *listener, **prev;
void *tmp;
global_lock();
/* place sockets away from config, so we don't need to take config lock
* in the accept loop. */
if (global.serversock)
{
for (; count < global.server_sockets; count++)
{
sock_close (global.serversock [count]);
config_clear_listener (global.server_conn [count]);
}
free (global.serversock);
global.serversock = NULL;
free (global.server_conn);
global.server_conn = NULL;
}
if (config == NULL)
{
global_unlock();
if (global.server_sockets >= config->listen_sock_count)
return 0;
}
global_lock();
count = 0;
global.serversock = calloc (config->listen_sock_count, sizeof (sock_t));
global.server_conn = calloc (config->listen_sock_count, sizeof (listener_t*));
tmp = realloc (global.serversock, (config->listen_sock_count*sizeof (sock_t)));
if (tmp) global.serversock = tmp;
tmp = realloc (global.server_conn, (config->listen_sock_count*sizeof (listener_t*)));
if (tmp) global.server_conn = tmp;
listener = config->listen_sock;
prev = &config->listen_sock;
count = global.server_sockets;
while (listener)
{
int successful = 0;
@ -1429,6 +1588,8 @@ int connection_setup_sockets (ice_config_t *config)
sock_set_send_buffer (sock, listener->so_sndbuf);
sock_set_blocking (sock, 0);
successful = 1;
if (count >= config->listen_sock_count)
abort();
global.serversock [count] = sock;
global.server_conn [count] = listener;
listener->refcount++;

View File

@ -13,6 +13,10 @@
#ifndef __CONNECTION_H__
#define __CONNECTION_H__
#ifdef HAVE_WINSOCK2_H
#include <winsock2.h>
#endif
#include <sys/types.h>
#include <time.h>
#ifdef HAVE_OPENSSL
@ -46,6 +50,15 @@ struct connection_tag
char *ip;
};
struct connection_bufs
{
short count, max;
int total;
IOVEC *block;
};
#ifdef HAVE_OPENSSL
#define not_ssl_connection(x) ((x)->ssl==NULL)
#else
@ -63,6 +76,14 @@ void connection_uses_ssl (connection_t *con);
void connection_add_banned_ip (const char *ip, int duration);
void connection_release_banned_ip (const char *ip);
void connection_stats (void);
void connection_bufs_init (struct connection_bufs *vectors, short start);
void connection_bufs_release (struct connection_bufs *v);
void connection_bufs_flush (struct connection_bufs *v);
int connection_bufs_append (struct connection_bufs *vectors, void *buf, unsigned int len);
int connection_bufs_read (connection_t *con, struct connection_bufs *vecs, int skip);
int connection_bufs_send (connection_t *con, struct connection_bufs *vecs, int skip);
#ifdef HAVE_OPENSSL
int connection_read_ssl (connection_t *con, void *buf, size_t len);
int connection_send_ssl (connection_t *con, const void *buf, size_t len);
@ -76,6 +97,7 @@ int connection_check_relay_pass(http_parser_t *parser);
int connection_check_admin_pass(http_parser_t *parser);
void connection_close_sigfd (void);
void connection_listen_sockets_close (struct ice_config_tag *config, int all_sockets);
extern int connection_running;

View File

@ -64,14 +64,17 @@ void event_config_read (void)
config_set_config (&new_config, &old_config);
config_release_config();
connection_thread_shutdown();
redirector_clearall();
config = config_get_config();
yp_recheck_config (config);
fserve_recheck_mime_types (config);
stats_global (config);
workers_adjust (config->workers_count);
connection_listen_sockets_close (config, 0);
redirector_setup (config);
config_release_config();
connection_thread_shutdown();
slave_restart();
config_clear (&old_config);
}

207
src/flv.c
View File

@ -32,6 +32,7 @@
#include "logging.h"
#include "mpeg.h"
#include "format_mp3.h"
#include "global.h"
#define CATMODULE "flv"
@ -105,11 +106,13 @@ static int flv_mpX_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int
flv->tag[15] |= 0x1;
}
memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 16);
connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 16);
flv->samples += mp->sample_count;
flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0));
// The extra byte is for the flv audio id, usually 0x2F
flv->prev_tagsize = (len + FLVHEADER + 1);
mp->raw_offset += 16;
connection_bufs_append (&flv->bufs, frame, len);
return 0;
}
@ -132,11 +135,13 @@ static int flv_aac_hdr (struct mpeg_sync *mp, unsigned char *frame, unsigned int
flv_hdr (flv, len + 2);
// a single frame (headerless) follows this
memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 17);
connection_bufs_append (&flv->bufs, mp->raw->data + mp->raw_offset, 17);
flv->samples += mp->sample_count;
flv->prev_ms = (int64_t)(flv->samples / (mp->samplerate/1000.0));
// frame length + FLVHEADER + AVHEADER
flv->prev_tagsize = (len + 11 + 2);
mp->raw_offset += 17;
connection_bufs_append (&flv->bufs, frame, len);
return 0;
}
@ -171,6 +176,7 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne
flv->tag[15] = 0xAF; // AAC audio, need these codes first
flv->tag[16] = 0x0;
memcpy (mp->raw->data, &flv->tag[0], 11+4+2+c);
connection_bufs_append (&flv->bufs, mp->raw->data, 11+4+2+c);
mp->raw_offset = 11+4+2+c;
flv->prev_tagsize = 11 + 2 + c;
flv->tag[16] = 0x01; // as per spec. headerless frame follows this
@ -184,31 +190,125 @@ static int flv_aac_firsthdr (struct mpeg_sync *mp, unsigned char *frame, unsigne
static int send_flv_buffer (client_t *client, struct flv *flv)
{
int ret = 0;
char *buf = flv->mpeg_sync.raw->data + flv->block_pos;
unsigned int len = flv->mpeg_sync.raw_offset - flv->block_pos;
unsigned int len = flv->bufs.total - flv->block_pos;
if (len > 0)
{
if (len > 1400)
len = 1400;
ret = client_send_bytes (client, buf, len);
ret = connection_bufs_send (&client->connection, &flv->bufs, flv->block_pos);
if (ret < (int)len)
client->schedule_ms += (ret ? 50 : 250);
client->schedule_ms += (ret > 0 ? 50 : 200);
if (ret > 0)
flv->block_pos += ret;
}
if (flv->block_pos == flv->mpeg_sync.raw_offset)
if (flv->block_pos == flv->bufs.total)
{
flv->block_pos = flv->mpeg_sync.raw_offset = 0;
connection_bufs_flush (&flv->bufs);
}
return ret;
}
void flv_write_metadata (struct flv *flv, refbuf_t *scmeta, const char *mount)
{
/* the first assoc block is shoutcast meta, second is flv meta */
int len;
struct flvmeta *flvm;
unsigned char prev_type = flv->tag[4];
refbuf_t *flvmeta = NULL;
int meta_copied = 0;
refbuf_t *raw = flv->mpeg_sync.raw;
char *src, *dst = raw->data + flv->mpeg_sync.raw_offset;
if (scmeta)
flvmeta = scmeta->associated;
if (flvmeta == NULL)
{
char *value = stats_get_value (mount, "server_name");
flvmeta = flv_meta_allocate (200);
if (value)
flv_meta_append_string (flvmeta, "name", value);
free (value);
value = stats_get_value (flv->mpeg_sync.mount, "title");
if (value)
flv_meta_append_string (flvmeta, "title", value);
else
flv_meta_append_string (flvmeta, "title", "");
free (value);
value = stats_get_value (mount, "audio_codecid");
if (value)
{
int id = atoi (value);
if (id == 2 || id == 10)
flv_meta_append_number (flvmeta, "audiocodecid", (double)id);
free (value);
}
value = stats_get_value (mount, "ice-bitrate");
if (value)
{
double rate = (double)atoi (value);
flv_meta_append_number (flvmeta, "audiodatarate", rate);
free (value);
}
value = stats_get_value (mount, "ice-samplerate");
if (value)
{
double rate = (double)atoi (value);
flv_meta_append_number (flvmeta, "audiosamplerate", rate);
free (value);
}
value = stats_get_value (mount, "ice-channels");
if (value)
{
int chann = atoi (value);
flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0);
free (value);
}
flv_meta_append_string (flvmeta, NULL, NULL);
flvm = (struct flvmeta *)flvmeta->data;
meta_copied = flvm->meta_pos - sizeof (*flvm);
}
else
flvm = (struct flvmeta *)flvmeta->data;
len = flvm->meta_pos - sizeof (*flvm);
src = (char *)flvm + sizeof (*flvm);
if (meta_copied + 15 > raw->len)
{
int newlen = raw->len + 1024;
void *p = realloc (raw->data, newlen);
if (p == NULL)
return;
raw->data = p;
raw->len = newlen;
}
flv->tag[4] = 18; // metadata
flv_hdr (flv, len);
memcpy (dst, &flv->tag[0], 15);
connection_bufs_append (&flv->bufs, dst, 15);
flv->mpeg_sync.raw_offset += 15;
dst += 15;
if (meta_copied)
{
memcpy (dst, src, len);
connection_bufs_append (&flv->bufs, dst, len);
flv->mpeg_sync.raw_offset += len;
refbuf_release (flvmeta);
}
else
connection_bufs_append (&flv->bufs, src, len);
flv->prev_tagsize = len + 11;
flv->tag[4] = prev_type;
}
int write_flv_buf_to_client (client_t *client)
{
refbuf_t *ref = client->refbuf, *scmeta = ref->associated;
mp3_client_data *client_mp3 = client->format_data;
struct flv *flv = client_mp3->specific;
int ret, meta_to_free = 0;
int ret;
if (client->pos >= ref->len)
{
@ -216,99 +316,27 @@ int write_flv_buf_to_client (client_t *client)
client->pos = ref->len;
return -1;
}
/* check for metadata updates and insert if needed */
if (flv->seen_metadata != scmeta)
{
/* the first assoc block is shoutcast meta, second is flv meta */
int len;
char *src, *dst = flv->mpeg_sync.raw->data;
refbuf_t *flvmeta = NULL;
struct flvmeta *flvm;
if (scmeta)
flvmeta = scmeta->associated;
if (flvmeta == NULL)
{
char *value = stats_get_value (client->mount, "server_name");
flvmeta = flv_meta_allocate (200);
meta_to_free = 1;
if (value)
flv_meta_append_string (flvmeta, "name", value);
free (value);
value = stats_get_value (flv->mpeg_sync.mount, "title");
if (value)
flv_meta_append_string (flvmeta, "title", value);
else
flv_meta_append_string (flvmeta, "title", "");
free (value);
flv_meta_append_string (flvmeta, "title", value);
value = stats_get_value (client->mount, "audio_codecid");
if (value)
{
int id = atoi (value);
if (id == 2 || id == 10)
flv_meta_append_number (flvmeta, "audiocodecid", (double)id);
free (value);
}
value = stats_get_value (client->mount, "ice-bitrate");
if (value)
{
double rate = (double)atoi (value);
flv_meta_append_number (flvmeta, "audiodatarate", rate);
free (value);
}
value = stats_get_value (client->mount, "ice-samplerate");
if (value)
{
double rate = (double)atoi (value);
flv_meta_append_number (flvmeta, "audiosamplerate", rate);
free (value);
}
value = stats_get_value (client->mount, "ice-channels");
if (value)
{
int chann = atoi (value);
flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0);
free (value);
}
flv_meta_append_string (flvmeta, NULL, NULL);
}
flvm = (struct flvmeta *)flvmeta->data;
src = (char *)flvm + sizeof (*flvm);
len = flvm->meta_pos - sizeof (*flvm);
if (len + 15 < flv->mpeg_sync.raw->len)
{
unsigned char prev_type = flv->tag[4];
flv->tag[4] = 18; // metadata
flv_hdr (flv, len);
memcpy (dst, &flv->tag[0], 15);
memcpy (dst+15, src, len);
flv->mpeg_sync.raw_offset = len + 15;
flv->prev_tagsize = len + 11;
flv->tag[4] = prev_type;
flv->seen_metadata = ref->associated;
if (meta_to_free) refbuf_release (flvmeta);
return send_flv_buffer (client, flv);
}
if (meta_to_free) refbuf_release (flvmeta);
flv->seen_metadata = ref->associated;
}
if (flv->mpeg_sync.raw_offset == 0)
{
int unprocessed = mpeg_complete_frames (&flv->mpeg_sync, ref, client->pos);
if (unprocessed < 0)
return -1;
if (unprocessed > 0)
ref->len += unprocessed; /* output was truncated, so revert changes */
if (flv->seen_metadata != scmeta)
flv_write_metadata (flv, scmeta, client->mount);
}
ret = send_flv_buffer (client, flv);
if (flv->mpeg_sync.raw_offset == 0)
{
client->pos = ref->len;
client->queue_pos += client->refbuf->len;
if (flv->seen_metadata != scmeta)
flv->seen_metadata = scmeta;
}
return ret;
}
@ -458,7 +486,8 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client)
"\r\n"
"FLV\x1\x4%c%c%c\x9", 0,0,0);
flv->mpeg_sync.raw = refbuf_new (4096);
// only flv headers in here, allows for up to 64 frames per read block, expandable
flv->mpeg_sync.raw = refbuf_new (1024);
flv->tag[4] = 8; // Audio details only
if (plugin->type == FORMAT_TYPE_AAC)
{
@ -474,6 +503,7 @@ void flv_create_client_data (format_plugin_t *plugin, client_t *client)
client->respcode = 200;
client->refbuf->len = bytes;
connection_bufs_init (&flv->bufs, 10);
}
@ -481,5 +511,6 @@ void free_flv_client_data (struct flv *flv)
{
flv->mpeg_sync.mount = NULL;
mpeg_cleanup (&flv->mpeg_sync);
connection_bufs_release (&flv->bufs);
}

View File

@ -25,6 +25,7 @@ struct flv
int64_t samples;
refbuf_t *seen_metadata;
mpeg_sync mpeg_sync;
struct connection_bufs bufs;
unsigned char tag[30];
};

View File

@ -27,6 +27,7 @@ typedef struct source_tag source_t;
#include "format_ogg.h"
#include "client.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "format-flac"
#include "logging.h"

View File

@ -31,6 +31,7 @@ typedef struct source_tag source_t;
#include "format_kate.h"
#include "client.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "format-kate"
#include "logging.h"

View File

@ -27,6 +27,7 @@ typedef struct source_tag source_t;
#include "format_ogg.h"
#include "client.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "format-midi"
#include "logging.h"

View File

@ -41,6 +41,7 @@
#include "format_mp3.h"
#include "flv.h"
#include "mpeg.h"
#include "global.h"
#define CATMODULE "format-mp3"
@ -64,8 +65,9 @@ static int mpeg_process_buffer (client_t *client, format_plugin_t *plugin);
/* client format flags */
#define CLIENT_IN_METADATA CLIENT_FORMAT_BIT
#define CLIENT_USING_BLANK_META (CLIENT_FORMAT_BIT<<1)
#define CLIENT_INTERNAL_FORMAT (CLIENT_FORMAT_BIT << 4)
#define CLIENT_IN_METADATA (CLIENT_INTERNAL_FORMAT)
#define CLIENT_USING_BLANK_META (CLIENT_INTERNAL_FORMAT<<1)
static refbuf_t blank_meta = { 0, 1, NULL, NULL, "\001StreamTitle='';", 17 };
@ -138,7 +140,10 @@ static void mp3_set_tag (format_plugin_t *plugin, const char *tag, const char *i
if (in_value)
{
value = util_conv_string (in_value, charset, plugin->charset);
if (charset == NULL && plugin->charset)
charset = plugin->charset;
if (charset && (strcasecmp (charset, "utf-8") && strcasecmp (charset, "utf8")))
value = util_conv_string (in_value, charset, "UTF8");
if (value == NULL)
value = strdup (in_value);
}
@ -450,6 +455,8 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf)
* to merge them into 1 write call */
block_len = refbuf->len - client->pos;
if (block_len > client_mp3->interval)
block_len = client_mp3->interval; // handle small intervals
merge = alloca (block_len + meta_len);
memcpy (merge, metadata, meta_len);
@ -502,7 +509,7 @@ static int format_mp3_write_buf_to_client (client_t *client)
ret = client_send_bytes (client, buf, len);
if (ret < len)
client->schedule_ms += 50;
client->schedule_ms += 80;
if (ret > 0)
{
client_mp3->since_meta_block += ret;
@ -629,30 +636,33 @@ static int validate_mpeg (source_t *source, refbuf_t *refbuf)
if (source_mp3->inline_metadata_interval > 0)
{
/* inline metadata may need reading before the rest of the mpeg data */
if (source_mp3->build_metadata_len == 0 && source_mp3->offset > unprocessed)
{
source_mp3->offset -= unprocessed;
}
else
if (source_mp3->inline_metadata_interval <= source_mp3->offset)
{
// reached meta but we have a frame fragment, so keep it for later
leftover = refbuf_new (unprocessed);
memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed);
mpeg_data_insert (mpeg_sync, leftover); /* will need to merge this after metadata */
return 0;
mpeg_data_insert (mpeg_sync, leftover);
client->pos = 0;
return refbuf->len ? 0 : -1;
}
// not reached the metadata block so save and rewind for completing the read
source_mp3->offset -= unprocessed;
}
/* make sure the new block has a minimum of queue_block_size */
if (unprocessed < source_mp3->queue_block_size)
len = source_mp3->queue_block_size;
else
len = unprocessed + 2000;
len = unprocessed + 1000;
leftover = refbuf_new (len);
memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed);
source_mp3->read_data = leftover;
source_mp3->read_count = unprocessed;
client->pos = unprocessed;
}
else
client->pos = 0;
if (source->format->read_bytes < 2500)
stats_event_args (source->mount, "audio_codecid", "%d", (mpeg_sync->layer ? 2 : 10));
return refbuf->len ? 0 : -1;
@ -670,6 +680,8 @@ static refbuf_t *mp3_get_no_meta (source_t *source)
return NULL;
refbuf = source_mp3->read_data;
refbuf->len = source_mp3->read_count;
source_mp3->read_count = 0;
source_mp3->read_data = NULL;
if (client->format_data && validate_mpeg (source, refbuf) < 0)
@ -708,6 +720,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
/* fill the buffer with the read data */
bytes = source_mp3->read_count;
refbuf->len = 0;
while (bytes > 0)
{
unsigned int metadata_remaining;
@ -771,7 +784,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
source_mp3->build_metadata_len = 0;
}
/* the data we have just read may of just been metadata */
if (refbuf->len == 0)
if (refbuf->len <= 0)
{
refbuf_release (refbuf);
return NULL;

View File

@ -44,6 +44,7 @@
#include "format_flac.h"
#include "format_kate.h"
#include "format_skeleton.h"
#include "global.h"
#define CATMODULE "format-ogg"
#include "logging.h"
@ -524,7 +525,9 @@ static int send_ogg_headers (client_t *client, refbuf_t *headers)
unsigned len = refbuf->len - client_data->pos;
int ret = -1;
if (len < 8000 && client->connection.error == 0)
if (len > 8192)
len = 8192;
if (client->connection.error == 0)
ret = client_send_bytes (client, data, len);
if (ret > 0)
{

View File

@ -28,6 +28,7 @@ typedef struct source_tag source_t;
#include "format_skeleton.h"
#include "client.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "format-skeleton"
#include "logging.h"

View File

@ -26,6 +26,7 @@ typedef struct source_tag source_t;
#include "format_speex.h"
#include "refbuf.h"
#include "client.h"
#include "global.h"
#define CATMODULE "format-speex"
#include "logging.h"

View File

@ -27,6 +27,7 @@
#include "format_theora.h"
#include "client.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "format-theora"
#include "logging.h"

View File

@ -30,6 +30,7 @@
#include "format_ogg.h"
#include "stats.h"
#include "format.h"
#include "global.h"
#define CATMODULE "format-vorbis"
#include "logging.h"

View File

@ -14,6 +14,7 @@
#include <config.h>
#endif
#include "compat.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -27,15 +28,21 @@
#include <sys/poll.h>
#endif
#ifndef _WIN32
#include <unistd.h>
#include <sys/time.h>
#include <sys/socket.h>
#define SCN_OFF_T SCNdMAX
#define PRI_OFF_T PRIdMAX
#else
#ifdef _MSC_VER
#include <winsock2.h>
#include <windows.h>
#else
#include <unistd.h>
#include <sys/time.h>
# ifdef HAVE_SYS_SOCKET_H
# include <sys/socket.h>
# endif
# ifndef SCN_OFF_T
# define SCN_OFF_T SCNdMAX
# endif
# ifndef PRI_OFF_T
# define PRI_OFF_T PRIdMAX
# endif
#endif
#include "thread/thread.h"
@ -53,7 +60,6 @@
#include "cfgfile.h"
#include "util.h"
#include "admin.h"
#include "compat.h"
#include "slave.h"
#include "fserve.h"
@ -178,6 +184,7 @@ char *fserve_content_type (const char *path)
return type;
}
static int _compare_fh(void *arg, void *a, void *b)
{
fh_node *x = a, *y = b;
@ -189,6 +196,7 @@ static int _compare_fh(void *arg, void *a, void *b)
return 0;
}
static int _delete_fh (void *mapping)
{
fh_node *fh = mapping;
@ -217,12 +225,33 @@ static int _delete_fh (void *mapping)
}
static void remove_fh_from_cache (fh_node *fh)
{
avl_tree_wlock (fh_cache);
avl_delete (fh_cache, fh, NULL);
avl_tree_unlock (fh_cache);
fh->peak = 0;
}
static void remove_from_fh (fh_node *fh, client_t *client)
{
avl_delete (fh->clients, client, NULL);
}
static fh_node *find_fh (fbinfo *finfo)
{
fh_node fh, *result = NULL;
if (finfo->mount == NULL)
finfo->mount = "";
memcpy (&fh.finfo, finfo, sizeof (fbinfo));
if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0)
return result;
return NULL;
}
/* find/create handle and return it with the structure in a locked state */
static fh_node *open_fh (fbinfo *finfo, client_t *client)
{
@ -254,10 +283,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
if (result->format)
{
if (result->format->create_client_data && client->format_data == NULL)
{
result->format->create_client_data (result->format, client);
if (result->format->write_buf_to_client)
client->check_buffer = result->format->write_buf_to_client;
}
}
}
DEBUG2 ("refcount now %d for %s", result->refcount, result->finfo.mount);
@ -271,9 +299,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
if (client)
{
if (finfo->flags & FS_FALLBACK)
DEBUG1 ("lookup of fallback file \"%s\"", finfo->mount);
INFO2 ("lookup of fallback file \"%s\" (%d)", finfo->mount, finfo->limit);
else
DEBUG1 ("lookup of \"%s\"", finfo->mount);
INFO1 ("lookup of \"%s\"", finfo->mount);
}
fh->fp = fopen (fullpath, "rb");
if (fh->fp == NULL)
@ -292,7 +320,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
{
char *contenttype = fserve_content_type (fullpath);
stats_event (finfo->mount, "fallback", "file");
stats_event_flags (finfo->mount, "fallback", "file", STATS_COUNTERS|STATS_HIDDEN);
stats_event_flags (finfo->mount, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_HIDDEN);
fh->format = calloc (1, sizeof (format_plugin_t));
fh->format->type = format_get_type (contenttype);
free (contenttype);
@ -305,10 +334,9 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
return NULL;
}
if (fh->format->create_client_data && client->format_data == NULL)
{
fh->format->create_client_data (fh->format, client);
if (fh->format->write_buf_to_client)
client->check_buffer = fh->format->write_buf_to_client;
}
}
free (fullpath);
}
@ -321,8 +349,8 @@ static fh_node *open_fh (fbinfo *finfo, client_t *client)
{
if (finfo->mount && (finfo->flags & FS_FALLBACK))
{
stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_HIDDEN);
stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_HIDDEN);
stats_event_flags (fh->finfo.mount, "listeners", "1", STATS_GENERAL|STATS_HIDDEN);
stats_event_flags (fh->finfo.mount, "listener_peak", "1", STATS_GENERAL|STATS_HIDDEN);
}
avl_insert (fh->clients, client);
}
@ -595,20 +623,16 @@ static void fh_release (fh_node *fh)
if (fh->refcount > 1)
{
fh->refcount--;
stats_event_args (fh->finfo.mount, "listeners", "%ld", fh->refcount);
thread_mutex_unlock (&fh->lock);
return;
}
/* now we will probably remove the fh, but to prevent a deadlock with
* open_fh, we drop the node lock and acquire the tree and node locks
* in that order and only remove if the refcount is still 0 */
thread_mutex_unlock (&fh->lock);
avl_tree_wlock (fh_cache);
thread_mutex_lock (&fh->lock);
if (fh->refcount > 1)
thread_mutex_unlock (&fh->lock);
else
avl_delete (fh_cache, fh, _delete_fh);
avl_tree_unlock (fh_cache);
stats_event (fh->finfo.mount, "fallback", NULL);
stats_event (fh->finfo.mount, NULL, NULL);
if (fh->peak)
remove_fh_from_cache (fh);
// leave as locked
_delete_fh (fh);
}
@ -686,10 +710,10 @@ static void fserve_move_listener (client_t *client)
remove_from_fh (fh, client);
if (fh->refcount == 1)
stats_event (fh->finfo.mount, NULL, NULL);
f.flags = fh->finfo.flags;
f.flags = fh->finfo.flags|FS_OVERRIDE;
f.limit = fh->finfo.limit;
f.mount = fh->finfo.fallback;
f.fallback = NULL;
f.fallback = fh->finfo.mount;
client->intro_offset = -1;
move_listener (client, &f);
fh_release (fh);
@ -857,14 +881,17 @@ static int throttled_file_send (client_t *client)
return 0;
}
if (client->flags & CLIENT_WANTS_FLV) /* increase limit for flv clients as wrapping takes more space */
limit = (unsigned long)(limit * 1.05);
limit = (unsigned long)(limit * 1.01);
if (secs)
rate = (client->counter+1400)/secs;
// DEBUG3 ("counter %lld, duration %ld, limit %u", client->counter, secs, rate);
thread_mutex_lock (&fh->lock);
if (rate > limit || secs < 3)
{
client->schedule_ms += 1000/(limit/1400);
if (limit >= 1400)
client->schedule_ms += 1000/(limit/1400);
else
client->schedule_ms += 50; // should not happen but guard against it
rate_add (fh->format->out_bitrate, 0, worker->time_ms);
if (secs > 2)
{
@ -907,7 +934,10 @@ static int throttled_file_send (client_t *client)
rate_add (fh->format->out_bitrate, bytes, worker->time_ms);
thread_mutex_unlock (&fh->lock);
global_add_bitrates (global.out_bitrate, bytes, worker->time_ms);
client->schedule_ms += (1000/(limit/1400*2));
if (limit > 2800)
client->schedule_ms += (1000/(limit/1400*2));
else
client->schedule_ms += 50;
/* progessive slowdown if max bandwidth is exceeded. */
if (throttle_sends > 1)
@ -928,12 +958,23 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
{
if (finfo)
{
mount_proxy *minfo;
fh_node *fh;
if (finfo->flags & FS_FALLBACK && finfo->limit == 0)
return -1;
fh = open_fh (finfo, client);
if (fh == NULL)
return -1;
minfo = config_find_mount (config_get_config(), finfo->mount);
if (minfo && minfo->max_listeners >= 0 && fh->refcount > minfo->max_listeners)
{
config_release_config();
remove_from_fh (fh, client);
fh_release (fh);
client->shared_data = NULL;
return client_send_403redirect (client, finfo->mount, "max listeners reached");
}
config_release_config();
client->shared_data = fh;
if (fh->finfo.limit)
@ -957,9 +998,12 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
client->ops = &buffer_content_ops;
client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
client->flags |= CLIENT_IN_FSERVE;
if (client->flags & CLIENT_ACTIVE)
{
client->schedule_ms = client->worker->time_ms;
if (finfo && finfo->flags & FS_FALLBACK)
return 0; // prevent a recursive loop
return client->ops->process (client);
}
else
@ -979,7 +1023,7 @@ int fserve_setup_client (client_t *client)
}
void fserve_set_override (const char *mount, const char *dest)
int fserve_set_override (const char *mount, const char *dest)
{
fh_node fh, *result;
@ -987,15 +1031,23 @@ void fserve_set_override (const char *mount, const char *dest)
fh.finfo.mount = (char *)mount;
fh.finfo.fallback = NULL;
avl_tree_rlock (fh_cache);
avl_tree_wlock (fh_cache);
if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0)
{
char *tmp = result->finfo.fallback;
thread_mutex_lock (&result->lock);
avl_delete (fh_cache, result, NULL);
avl_tree_unlock (fh_cache);
result->finfo.flags |= FS_OVERRIDE;
result->finfo.fallback = strdup (dest);
thread_mutex_unlock (&result->lock);
free (tmp);
INFO2 ("move clients from %s to %s", mount, dest);
return 1;
}
avl_tree_unlock (fh_cache);
return 0;
}
static int _delete_mapping(void *mapping) {
@ -1110,13 +1162,16 @@ int fserve_kill_client (client_t *client, const char *mount, int response)
xmlDocSetRootElement(doc, node);
snprintf (buf, sizeof(buf), "Client %d not found", id);
avl_tree_rlock (fh_cache);
while (1)
{
fh_node *fh = open_fh (&finfo, NULL);
avl_node *node;
fh_node *fh = find_fh (&finfo);
if (fh)
{
avl_node *node = avl_get_first (fh->clients);
thread_mutex_lock (&fh->lock);
avl_tree_unlock (fh_cache);
node = avl_get_first (fh->clients);
while (node)
{
client_t *listener = (client_t *)node->key;
@ -1130,61 +1185,46 @@ int fserve_kill_client (client_t *client, const char *mount, int response)
}
node = avl_get_next (node);
}
fh_release (fh);
thread_mutex_unlock (&fh->lock);
avl_tree_rlock (fh_cache);
}
if (loop == 0) break;
loop--;
if (loop == 1) finfo.flags = FS_FALLBACK;
}
avl_tree_unlock (fh_cache);
xmlNewChild (node, NULL, XMLSTR("message"), XMLSTR(buf));
xmlNewChild (node, NULL, XMLSTR("return"), XMLSTR(v));
return admin_send_response (doc, client, response, "response.xsl");
}
int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo)
int fserve_list_clients_xml (xmlNodePtr parent, fbinfo *finfo)
{
int ret = 0;
fh_node *fh = open_fh (finfo, NULL);
fh_node *fh;
avl_node *anode;
if (fh)
avl_tree_rlock (fh_cache);
fh = find_fh (finfo);
if (fh == NULL)
{
avl_node *anode = avl_get_first (fh->clients);
while (anode)
{
client_t *listener = (client_t *)anode->key;
char buf [100];
xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
const char *useragent;
snprintf (buf, sizeof (buf), "%lu", listener->connection.id);
xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->connection.ip));
useragent = httpp_getvar (listener->parser, "user-agent");
if (useragent)
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent));
xmlNewChild (node, NULL, XMLSTR("useragent"), str);
xmlFree (str);
}
xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR( "0"));
snprintf (buf, sizeof (buf), "%lu",
(unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
if (listener->username)
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username));
xmlNewChild (node, NULL, XMLSTR("username"), str);
xmlFree (str);
}
ret++;
anode = avl_get_next (anode);
}
fh_release (fh);
avl_tree_unlock (fh_cache);
return 0;
}
thread_mutex_lock (&fh->lock);
avl_tree_unlock (fh_cache);
anode = avl_get_first (fh->clients);
while (anode)
{
client_t *listener = (client_t *)anode->key;
stats_listener_to_xml (listener, parent);
ret++;
anode = avl_get_next (anode);
}
thread_mutex_unlock (&fh->lock);
return ret;
}
@ -1208,17 +1248,33 @@ int fserve_list_clients (client_t *client, const char *mount, int response, int
xmlSetProp(srcnode, XMLSTR("mount"), XMLSTR(mount));
ret = fserve_list_clients_xml (srcnode, &finfo);
if (ret == 0)
if (ret == 0 && finfo.flags&FS_FALLBACK)
{
finfo.flags = 0;
finfo.flags = 0; // retry
ret = fserve_list_clients_xml (srcnode, &finfo);
}
if (ret)
{
char buf[20];
snprintf (buf, sizeof(buf), "%u", ret);
char buf [20];
snprintf (buf, sizeof(buf), "%d", ret);
xmlNewChild (srcnode, NULL, XMLSTR("listeners"), XMLSTR(buf));
return admin_send_response (doc, client, response, "listclients.xsl");
}
xmlFree (doc);
return client_send_400 (client, "mount does not exist");
}
int fserve_query_count (fbinfo *finfo)
{
int ret = 0;
fh_node *fh;
avl_tree_rlock (fh_cache);
fh = find_fh (finfo);
if (fh)
ret = fh->refcount;
avl_tree_unlock (fh_cache);
return ret;
}

View File

@ -29,6 +29,7 @@ typedef struct _fbinfo
#define FS_USE_ADMIN 01
#define FS_FALLBACK 02
#define FS_FALLBACK_EOF 04
#define FS_OVERRIDE 010
void fserve_initialize(void);
void fserve_shutdown(void);
@ -38,10 +39,12 @@ void fserve_recheck_mime_types (ice_config_t *config);
int fserve_setup_client (client_t *client);
int fserve_setup_client_fb (client_t *client, fbinfo *finfo);
void fserve_set_override (const char *mount, const char *dest);
int fserve_set_override (const char *mount, const char *dest);
int fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners);
int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo);
int fserve_kill_client (client_t *client, const char *mount, int response);
int fserve_query_count (fbinfo *finfo);
extern int fserve_running;

View File

@ -55,11 +55,11 @@ void global_shutdown(void)
thread_mutex_destroy(&_global_mutex);
thread_spin_destroy (&global.spinlock);
avl_tree_free(global.source_tree, NULL);
rate_free (global.out_bitrate);
global.out_bitrate = NULL;
#ifdef MY_ALLOC
avl_tree_free(global.alloc_tree, free_alloc_node);
#endif
rate_free (global.out_bitrate);
global.out_bitrate = NULL;
}
void global_lock(void)

View File

@ -13,7 +13,6 @@
#ifndef __GLOBAL_H__
#define __GLOBAL_H__
#include "config.h"
#define ICE_LISTEN_QUEUE 64
@ -25,6 +24,7 @@
#include "thread/thread.h"
#include "net/sock.h"
#include "compat.h"
#include "avl/avl.h"
typedef struct ice_global_tag
{
@ -64,6 +64,11 @@ typedef struct ice_global_tag
extern unsigned int throttle_sends;
extern void initialize_subsystems (void);
extern void shutdown_subsystems (void);
extern void server_process (void);
extern int server_init (int argc, char *argv[]);
#ifdef MY_ALLOC
#define calloc(x,y) my_calloc(__func__,__LINE__,x,y)

View File

@ -37,7 +37,7 @@ void fatal_error (const char *perr);
int errorlog = 0;
int playlistlog = 0;
#ifdef _WIN32
#ifdef _MSC_VER
/* Since strftime's %z option on win32 is different, we need
to go through a few loops to get the same info as %z */
int get_clf_time (char *buffer, unsigned len, struct tm *t)
@ -107,12 +107,13 @@ int get_clf_time (char *buffer, unsigned len, struct tm *t)
*/
void logging_access_id (access_log *accesslog, client_t *client)
{
char datebuf[128];
char reqbuf[1024];
const char *req;
struct tm thetime;
time_t now;
time_t stayed;
const char *referrer, *user_agent, *username, *ip = "-";
const char *referrer, *user_agent, *username = NULL, *ip = "-";
char datebuf[50];
char reqbuf[128];
if (client->flags & CLIENT_SKIP_ACCESSLOG)
return;
@ -121,41 +122,56 @@ void logging_access_id (access_log *accesslog, client_t *client)
localtime_r (&now, &thetime);
/* build the data */
#ifdef _WIN32
#ifdef _MSC_VER
memset(datebuf, '\000', sizeof(datebuf));
get_clf_time(datebuf, sizeof(datebuf)-1, &thetime);
#else
strftime (datebuf, sizeof(datebuf), LOGGING_FORMAT_CLF, &thetime);
#endif
req = httpp_getvar (client->parser, HTTPP_VAR_RAWURI);
if (req == NULL)
req = httpp_getvar (client->parser, HTTPP_VAR_URI);
/* build the request */
snprintf (reqbuf, sizeof(reqbuf), "%s %s %s/%s",
httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE),
httpp_getvar (client->parser, HTTPP_VAR_URI),
httpp_getvar (client->parser, HTTPP_VAR_REQ_TYPE), req,
httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL),
httpp_getvar (client->parser, HTTPP_VAR_VERSION));
stayed = now - client->connection.con_time;
if (client->username == NULL)
username = "-";
else
username = client->username;
username = client->username;
referrer = httpp_getvar (client->parser, "referer");
if (referrer == NULL)
referrer = "-";
user_agent = httpp_getvar (client->parser, "user-agent");
if (user_agent == NULL)
user_agent = "-";
if (accesslog->log_ip)
ip = client->connection.ip;
log_write_direct (accesslog->logid,
"%s - %s [%s] \"%s\" %d %" PRIu64 " \"%s\" \"%s\" %lu",
ip, username,
datebuf, reqbuf, client->respcode, client->connection.sent_bytes,
referrer, user_agent, (unsigned long)stayed);
if (accesslog->type == LOG_ACCESS_CLF_ESC)
{
char *un = client->username ? util_url_escape (username) : strdup ("-"),
*rq = util_url_escape (reqbuf),
*rf = referrer ? util_url_escape (referrer) : strdup ("-"),
*ua = user_agent ? util_url_escape (user_agent) : strdup ("-");
log_write_direct (accesslog->logid,
"%s - %s %s %s %d %" PRIu64 " %.50s %.50s %lu",
ip, un, datebuf, rq, client->respcode, client->connection.sent_bytes,
rf, ua, (unsigned long)stayed);
free (ua);
free (rf);
free (rq);
free (un);
}
else
{
if (client->username == NULL) username = "-";
if (referrer == NULL) referrer = "-";
if (user_agent == NULL) user_agent = "-";
log_write_direct (accesslog->logid,
"%s - %s [%s] \"%s\" %d %" PRIu64 " \"%.50s\" \"%.50s\" %lu",
ip, username, datebuf, reqbuf, client->respcode, client->connection.sent_bytes,
referrer, user_agent, (unsigned long)stayed);
}
client->respcode = -1;
}
@ -183,7 +199,7 @@ void logging_playlist(const char *mount, const char *metadata, long listeners)
localtime_r (&now, &thetime);
/* build the data */
#ifdef _WIN32
#ifdef _MSC_VER
memset(datebuf, '\000', sizeof(datebuf));
get_clf_time(datebuf, sizeof(datebuf)-1, &thetime);
#else

View File

@ -85,7 +85,7 @@ extern int errorlog;
#define LOGGING_FORMAT_CLF "%d/%b/%Y:%H:%M:%S %z"
void logging_access_id (access_log *accesslog, client_t *client);
void logging_access_id (struct access_log *accesslog, client_t *client);
void logging_access(client_t *client);
void logging_playlist(const char *mount, const char *metadata, long listeners);
int restart_logging (ice_config_t *config);

View File

@ -20,9 +20,6 @@
#include <errno.h>
#ifdef WIN32
#define _WIN32_WINNT 0x0400
/* For getpid() */
//#include <winsock2.h>
#include <process.h>
#include <windows.h>
#endif
@ -93,7 +90,7 @@ static void _print_usage(void)
}
void _initialize_subsystems(void)
void initialize_subsystems(void)
{
log_initialize();
errorlog = log_open_file (stderr);
@ -112,10 +109,10 @@ void _initialize_subsystems(void)
#endif
}
void _shutdown_subsystems(void)
void shutdown_subsystems(void)
{
connection_shutdown();
auth_shutdown();
slave_shutdown();
fserve_shutdown();
stats_shutdown();
@ -212,9 +209,17 @@ static int _server_proc_init(void)
return 1;
}
/* this is the heart of the beast */
static void _server_proc(void)
void server_process (void)
{
INFO1 ("%s server started", ICECAST_VERSION_STRING);
global.running = ICE_RUNNING;
/* Do this after logging init */
auth_initialise ();
if (background)
{
fclose (stdin);
@ -222,8 +227,8 @@ static void _server_proc(void)
fclose (stderr);
}
slave_initialize();
connection_setup_sockets (NULL);
INFO0("Shutting down");
auth_shutdown();
}
/* chroot the process. Watch out - we need to do this before starting other
@ -258,7 +263,7 @@ static void _ch_root_uid_setup(void)
}
#endif
#ifdef CHROOT
#ifdef HAVE_CHROOT
if (conf->chroot)
{
if(getuid()) /* root check */
@ -303,115 +308,97 @@ static void _ch_root_uid_setup(void)
#endif
}
#ifdef WIN32_SERVICE
int mainService(int argc, char **argv)
#else
int main(int argc, char **argv)
#endif
int server_init (int argc, char *argv[])
{
int res, ret;
int ret;
char filename[512];
char pbuf[1024];
/* parse the '-c icecast.xml' option
** only, so that we can read a configfile
*/
res = _parse_config_opts(argc, argv, filename, 512);
if (res == 1) {
#if !defined(_WIN32) || defined(_CONSOLE)
/* startup all the modules */
_initialize_subsystems();
#endif
/* parse the config file */
config_get_config();
ret = config_initial_parse_file(filename);
config_release_config();
if (ret < 0) {
snprintf(pbuf, sizeof(pbuf)-1,
"FATAL: error parsing config file (%s)", filename);
_fatal_error(pbuf);
switch (ret) {
case CONFIG_EINSANE:
_fatal_error("filename was null or blank");
break;
case CONFIG_ENOROOT:
_fatal_error("no root element found");
break;
case CONFIG_EBADROOT:
_fatal_error("root element is not <icecast>");
break;
default:
_fatal_error("XML config parsing error");
break;
}
#if !defined(_WIN32) || defined(_CONSOLE)
_shutdown_subsystems();
#endif
switch (_parse_config_opts (argc, argv, filename, 512))
{
case -1:
_print_usage();
return -1;
}
} else if (res == -1) {
_print_usage();
return -1;
default:
/* parse the config file */
config_get_config();
ret = config_initial_parse_file(filename);
config_release_config();
if (ret < 0)
{
snprintf (pbuf, sizeof(pbuf),
"FATAL: error parsing config file (%s)", filename);
_fatal_error (pbuf);
switch (ret)
{
case CONFIG_EINSANE:
_fatal_error("filename was null or blank");
break;
case CONFIG_ENOROOT:
_fatal_error("no root element found");
break;
case CONFIG_EBADROOT:
_fatal_error("root element is not <icecast>");
break;
default:
_fatal_error("XML config parsing error");
break;
}
return -1;
}
}
/* override config file options with commandline options */
config_parse_cmdline(argc, argv);
/* Bind socket, before we change userid */
if(!_server_proc_init()) {
if (_server_proc_init() == 0)
{
_fatal_error("Server startup failed. Exiting");
_shutdown_subsystems();
return -1;
}
_ch_root_uid_setup(); /* Change user id and root if requested/possible */
fserve_initialize();
#ifdef CHUID
/* We'll only have getuid() if we also have setuid(), it's reasonable to
* assume */
if(!getuid()) /* Running as root! Don't allow this */
if (getuid() == 0) /* Running as root! Don't allow this */
{
fprintf(stderr, "ERROR: You should not run icecast2 as root\n");
fprintf(stderr, "Use the changeowner directive in the config file\n");
_shutdown_subsystems();
return 1;
fprintf (stderr, "ERROR: You should not run icecast2 as root\n");
fprintf (stderr, "Use the changeowner directive in the config file\n");
return -1;
}
#endif
/* setup default signal handlers */
sighandler_initialize();
if (start_logging (config_get_config_unlocked()) < 0)
{
_fatal_error("FATAL: Could not start logging");
_shutdown_subsystems();
return -1;
}
return 0;
}
INFO1 ("%s server started", ICECAST_VERSION_STRING);
/* REM 3D Graphics */
#ifndef _WIN32
int main (int argc, char *argv[])
{
initialize_subsystems();
/* let her rip */
global.running = ICE_RUNNING;
if (server_init (argc, argv) == 0)
server_process();
/* Do this after logging init */
auth_initialise ();
shutdown_subsystems();
_server_proc();
INFO0("Shutting down");
#if !defined(_WIN32) || defined(_CONSOLE)
_shutdown_subsystems();
#endif
if (pidfile)
{
remove (pidfile);
free (pidfile);
}
return 0;
}
#endif

View File

@ -42,6 +42,7 @@
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include "global.h"
static void MD5Transform(uint32_t buf[4], uint32_t const in[HASH_LEN]);

View File

@ -22,6 +22,7 @@
#include "compat.h"
#include "mpeg.h"
#include "format_mp3.h"
#include "global.h"
#define CATMODULE "mpeg"
#include "logging.h"
@ -70,8 +71,6 @@ static int handle_aac_frame (struct mpeg_sync *mp, unsigned char *p, int len)
if (mp->frame_callback)
if (mp->frame_callback (mp, s, raw_frame_len) < 0)
return -1;
memcpy (mp->raw->data + mp->raw_offset, s, raw_frame_len);
mp->raw_offset += raw_frame_len;
}
return frame_len;
}
@ -163,8 +162,6 @@ static int handle_mpeg_frame (struct mpeg_sync *mp, unsigned char *p, int remain
if (mp->frame_callback)
if (mp->frame_callback (mp, p, frame_len) < 0)
return -1;
memcpy (mp->raw->data + mp->raw_offset, p, frame_len);
mp->raw_offset += frame_len;
}
return frame_len;
}
@ -242,31 +239,31 @@ static int check_for_mp3 (struct mpeg_sync *mp, unsigned char *p, unsigned remai
samplerate = get_mpegframe_samplerate (p);
if (samplerate == 0)
return -1;
while (checking)
do
{
int frame_len = get_mpeg_frame_length (mp, fh);
int frame_len;
if (remaining <= 4)
return 0;
if (fh [0] != 255 || fh [1] != p[1])
return -1;
frame_len = get_mpeg_frame_length (mp, fh);
if (frame_len <= 0 || frame_len > 3000)
{
//DEBUG2 ("checking frame %d, but len %d invalid", 5-checking, frame_len);
return -1;
}
if (frame_len+4 >= remaining)
if (frame_len > remaining)
{
//DEBUG3 ("checking frame %d, but need more data (%d,%d)", 5-checking, frame_len, remaining);
return 0;
}
if (samplerate != get_mpegframe_samplerate (fh+frame_len))
if (samplerate != get_mpegframe_samplerate (fh))
return -1;
if (fh[frame_len] != 255 || fh[frame_len+1] != p[1])
{
//DEBUG4 ("checking frame %d, but code is %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]);
return -1;
}
//DEBUG4 ("frame %d checked, next header codes are %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]);
remaining -= frame_len;
fh += frame_len;
checking--;
}
} while (--checking);
mp->samplerate = samplerate;
if (((p[3] & 0xC0) >> 6) == 3)
mp->channels = 1;
@ -375,20 +372,23 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset)
mp->sample_count = 0;
if (mp->surplus)
{
int new_len = mp->surplus->len + new_block->len;
unsigned char *p = realloc (mp->surplus->data, new_len);
if (offset >= mp->surplus->len)
offset -= mp->surplus->len;
else
{
int new_len = mp->surplus->len + new_block->len;
unsigned char *p = realloc (mp->surplus->data, new_len);
memcpy (p+mp->surplus->len, new_block->data, new_block->len);
mp->surplus->data = new_block->data;
new_block->data = (void*)p;
new_block->len = new_len;
memcpy (p+mp->surplus->len, new_block->data, new_block->len);
mp->surplus->data = new_block->data;
new_block->data = (void*)p;
new_block->len = new_len;
}
refbuf_release (mp->surplus);
mp->surplus = NULL;
}
start = (unsigned char *)new_block->data + offset;
remaining = new_block->len - offset;
if (mp->raw)
mp->raw_offset = 0;
while (1)
{
end = (unsigned char*)new_block->data + new_block->len;
@ -398,7 +398,6 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset)
break;
if (!is_sync_byte (mp, start))
{
// need to resync
int ret = find_align_sync (mp, start, remaining);
if (ret == 0)
break; // no sync in the rest, so dump it
@ -422,7 +421,7 @@ int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset)
{
if (remaining > 20000)
return -1;
new_block->len = 0;
new_block->len = offset;
return remaining;
}
}
@ -474,5 +473,6 @@ void mpeg_cleanup (mpeg_sync *mpsync)
{
refbuf_release (mpsync->surplus);
refbuf_release (mpsync->raw);
mpsync->mount = NULL;
}
}

View File

@ -20,12 +20,12 @@
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "global.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "event.h"
#include "global.h"
#define CATMODULE "sighandler"

View File

@ -60,22 +60,14 @@
#include "format.h"
#include "event.h"
#include "yp.h"
#include "slave.h"
#define CATMODULE "slave"
typedef struct _redirect_host
{
struct _redirect_host *next;
time_t next_update;
char *server;
int port;
} redirect_host;
static void _slave_thread(void);
static void redirector_add (const char *server, int port, int interval);
static redirect_host *find_slave_host (const char *server, int port);
static void redirector_clearall (void);
static int relay_startup (client_t *client);
static int relay_read (client_t *client);
static void relay_release (client_t *client);
@ -162,7 +154,6 @@ void slave_update_all_mounts (void)
void slave_restart (void)
{
restart_connection_thread = 1;
redirector_clearall();
slave_update_all_mounts ();
streamlist_check = 0;
}
@ -370,6 +361,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s
else
INFO3 ("connecting to %s:%d for %s", server, port, relay->localmount);
con->con_time = time (NULL);
relay->in_use = master;
streamsock = sock_connect_wto_bind (server, port, bind, timeout);
free (bind);
if (connection_init (con, streamsock, server) < 0)
@ -446,6 +439,8 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s
if (parser)
httpp_destroy (parser);
connection_close (con);
con->con_time = time (NULL);
if (relay->in_use) relay->in_use->skip = 1;
return -1;
}
@ -462,6 +457,11 @@ int open_relay (relay_server *relay)
{
int ret;
if (master->skip)
{
INFO3 ("skipping %s:%d for %s", master->ip, master->port, relay->localmount);
continue;
}
thread_mutex_unlock (&src->lock);
ret = open_relay_connection (client, relay, master);
thread_mutex_lock (&src->lock);
@ -531,6 +531,7 @@ static void *start_relay_stream (void *arg)
src->yp_public = -1;
}
relay->in_use = NULL;
INFO2 ("listener count remaining on %s is %d", src->mount, src->listeners);
src->flags &= ~SOURCE_PAUSE_LISTENERS;
thread_mutex_unlock (&src->lock);
@ -968,6 +969,7 @@ static void slave_startup (void)
update_settings = 0;
update_all_mounts = 0;
redirector_setup (config);
update_master_as_slave (config);
stats_global (config);
workers_adjust (config->workers_count);
@ -1058,7 +1060,7 @@ relay_server *slave_find_relay (relay_server *relays, const char *mount)
/* drop all redirection details.
*/
static void redirector_clearall (void)
void redirector_clearall (void)
{
thread_rwlock_wlock (&slaves_lock);
while (redirectors)
@ -1073,6 +1075,21 @@ static void redirector_clearall (void)
thread_rwlock_unlock (&slaves_lock);
}
void redirector_setup (ice_config_t *config)
{
redirect_host *redir = config->redirect_hosts;
thread_rwlock_wlock (&slaves_lock);
while (redir)
{
redirector_add (redir->server, redir->port, 0);
redir = redir->next;
}
thread_rwlock_unlock (&slaves_lock);
}
/* Add new redirectors or update any existing ones
*/
void redirector_update (client_t *client)
@ -1096,7 +1113,15 @@ void redirector_update (client_t *client)
redirect = find_slave_host (rserver, rport);
if (redirect == NULL)
{
redirector_add (rserver, rport, interval);
ice_config_t *config = config_get_config();
unsigned int allowed = config->max_redirects;
config_release_config();
if (global.redirect_count < allowed)
redirector_add (rserver, rport, interval);
else
INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed);
}
else
{
@ -1126,18 +1151,7 @@ static redirect_host *find_slave_host (const char *server, int port)
static void redirector_add (const char *server, int port, int interval)
{
ice_config_t *config = config_get_config();
unsigned int allowed = config->max_redirects;
redirect_host *redirect;
config_release_config();
if (global.redirect_count >= allowed)
{
INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed);
return;
}
redirect = calloc (1, sizeof (redirect_host));
redirect_host *redirect = calloc (1, sizeof (redirect_host));
if (redirect == NULL)
abort();
redirect->server = strdup (server);
@ -1174,6 +1188,17 @@ static relay_server *get_relay_details (client_t *client)
}
static void relay_reset (relay_server *relay)
{
relay_server_master *server = relay->masters;
source_clear_source (relay->source);
for (; server; server = server->next)
server->skip = 0;
INFO1 ("servers to be retried on %s", relay->localmount);
}
static int relay_read (client_t *client)
{
relay_server *relay = get_relay_details (client);
@ -1185,30 +1210,31 @@ static int relay_read (client_t *client)
if (relay->cleanup) relay->running = 0;
if (relay->running == 0)
source->flags &= ~SOURCE_RUNNING;
if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 2000000)
if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 1000000)
source->flags &= ~SOURCE_RUNNING;
return source_read (source);
}
if ((source->flags & SOURCE_TERMINATING) == 0)
{
/* this section is for once through code */
int fallback = 1;
if (client->connection.con_time)
{
if (relay->running)
if (relay->running && relay->in_use)
fallback = 0;
if (client->worker->current_time.tv_sec - client->connection.con_time < 3)
if (client->worker->current_time.tv_sec - client->connection.con_time < 60)
{
/* force a delayed restart if stream cannot be maintained for 3 seconds, by
* which time any listeners in restart wait would of come back on */
source->flags &= ~SOURCE_PAUSE_LISTENERS;
client->connection.con_time = 0;
fallback = 1;
/* force a server skip if a stream cannot be maintained for 1 min */
WARN1 ("stream for %s died too quickly, skipping server for now", relay->localmount);
if (relay->in_use) relay->in_use->skip = 1;
}
else
relay_reset (relay); // spent some time on this so give other servers a chance
if (source->flags & SOURCE_TIMEOUT)
{
WARN1 ("stream for %s timed out, skipping server for now", relay->localmount);
if (relay->in_use) relay->in_use->skip = 1;
}
global_lock();
global.sources--;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
global_reduce_bitrate_sampling (global.out_bitrate);
}
/* don't pause listeners if relay shutting down */
if (relay->running == 0)
@ -1219,11 +1245,26 @@ static int relay_read (client_t *client)
if (source->termination_count && source->termination_count <= source->listeners)
{
client->schedule_ms = client->worker->time_ms + 150;
DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount);
if (client->worker->current_time.tv_sec - client->timer_start > 2)
{
client->schedule_ms += 400;
WARN3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount);
}
else
DEBUG3 ("counts are %lu and %lu (%s)", source->termination_count, source->listeners, source->mount);
thread_mutex_unlock (&source->lock);
return 0;
}
DEBUG1 ("all listeners have now been checked on %s", relay->localmount);
if (client->connection.con_time)
{
global_lock();
global.sources--;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
global_reduce_bitrate_sampling (global.out_bitrate);
}
client->timer_start = 0;
client->parser = NULL;
free (source->fallback.mount);
source->fallback.mount = NULL;
@ -1241,6 +1282,7 @@ static int relay_read (client_t *client)
return 0; /* listeners may be paused, recheck and let them leave this stream */
}
INFO1 ("shutting down relay %s", relay->localmount);
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
thread_mutex_unlock (&source->lock);
stats_event (relay->localmount, NULL, NULL);
slave_update_all_mounts();
@ -1249,11 +1291,11 @@ static int relay_read (client_t *client)
do {
if (relay->running)
{
if (client->connection.con_time)
if (client->connection.con_time && relay->in_use)
{
INFO1 ("standing by to restart relay on %s", relay->localmount);
if (relay->on_demand && source->listeners == 0)
source_clear_source (source);
relay_reset (relay);
break;
}
else
@ -1268,8 +1310,11 @@ static int relay_read (client_t *client)
client->schedule_ms = client->worker->time_ms + 3600000;
}
source->flags &= ~SOURCE_ON_DEMAND;
source_clear_source (source);
relay_reset (relay);
} while (0);
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
client->connection.con_time = 0;
source->stats = 0;
stats_event (relay->localmount, NULL, NULL);
slave_update_all_mounts();
@ -1343,37 +1388,45 @@ static int relay_startup (client_t *client)
if (relay->on_demand)
{
source_t *src = relay->source;
int start_relay = src->listeners; // 0 or non-zero
source_t *source = relay->source;
int fallback_def = 0, start_relay = source->listeners; // 0 or non-zero
mount_proxy *mountinfo = config_find_mount (config_get_config(), source->mount);
src->flags |= SOURCE_ON_DEMAND;
if (worker->current_time.tv_sec % 10 == 0)
source->flags |= SOURCE_ON_DEMAND;
if (mountinfo && mountinfo->fallback_mount)
{
mount_proxy *mountinfo = config_find_mount (config_get_config(), src->mount);
if (mountinfo && mountinfo->fallback_mount)
source_t *fallback;
avl_tree_rlock (global.source_tree);
fallback = source_find_mount (mountinfo->fallback_mount);
fallback_def = 1;
if (fallback)
{
source_t *fallback;
avl_tree_rlock (global.source_tree);
fallback = source_find_mount (mountinfo->fallback_mount);
if (fallback)
{
if (strcmp (fallback->mount, src->mount) != 0)
{
// if there are listeners not already being moved
if (fallback->listeners && fallback->fallback.mount == NULL)
start_relay = 1;
}
}
if (strcmp (fallback->mount, source->mount) != 0 && fallback->listeners)
start_relay = 1;
avl_tree_unlock (global.source_tree);
}
config_release_config();
else
{
fbinfo finfo;
avl_tree_unlock (global.source_tree);
finfo.flags = FS_FALLBACK;
finfo.mount = (char *)mountinfo->fallback_mount;
finfo.fallback = NULL;
// need to check for listeners on the fallback
if (fserve_query_count (&finfo) > 0)
start_relay = 1; // force a start if there is fallback defined
}
}
config_release_config();
if (start_relay == 0)
{
client->schedule_ms = worker->time_ms + 60000;
client->schedule_ms = worker->time_ms + (fallback_def ? (relay->interval*1000) : 60000);
return 0;
}
INFO1 ("Detected listeners on relay %s", relay->localmount);
INFO1 ("starting on-demand relay %s", relay->localmount);
}
/* limit the number of relays starting up at the same time */

View File

@ -22,6 +22,8 @@ void slave_update_all_mounts (void);
void slave_rebuild_mounts (void);
relay_server *slave_find_relay (relay_server *relays, const char *mount);
int redirect_client (const char *mountpoint, client_t *client);
void redirector_clearall (void);
void redirector_setup (ice_config_t *config);
void redirector_update (struct _client_tag *client);
relay_server *relay_free (relay_server *relay);

View File

@ -149,8 +149,10 @@ source_t *source_reserve (const char *mount)
src->listener_send_trigger = 10000;
src->format = calloc (1, sizeof(format_plugin_t));
src->clients = avl_tree_new (client_compare, NULL);
src->stats = stats_handle (mount);
thread_mutex_create (&src->lock);
stats_release (src->stats);
avl_insert (global.source_tree, src);
@ -361,22 +363,22 @@ static void update_source_stats (source_t *source)
unsigned long kbytes_read = source->bytes_read_since_update/1024;
source->format->sent_bytes += kbytes_sent*1024;
stats_event_args (source->mount, "outgoing_kbitrate", "%ld",
source->stats = stats_lock (source->stats, source->mount);
stats_set_args (source->stats, "outgoing_kbitrate", "%ld",
(long)(8 * rate_avg (source->format->out_bitrate))/1024);
stats_event_args (source->mount, "incoming_bitrate", "%ld", (8 * incoming_rate));
stats_event_args (source->mount, "total_bytes_read",
"%"PRIu64, source->format->read_bytes);
stats_event_args (source->mount, "total_bytes_sent",
"%"PRIu64, source->format->sent_bytes);
stats_event_args (source->mount, "total_mbytes_sent",
stats_set_args (source->stats, "incoming_bitrate", "%ld", (8 * incoming_rate));
stats_set_args (source->stats, "total_bytes_read", "%"PRIu64, source->format->read_bytes);
stats_set_args (source->stats, "total_bytes_sent", "%"PRIu64, source->format->sent_bytes);
stats_set_args (source->stats, "total_mbytes_sent",
"%"PRIu64, source->format->sent_bytes/(1024*1024));
stats_event_args (source->mount, "queue_size", "%u", source->queue_size);
stats_set_args (source->stats, "queue_size", "%u", source->queue_size);
if (source->client->connection.con_time)
{
worker_t *worker = source->client->worker;
stats_event_args (source->mount, "connected", "%"PRIu64,
stats_set_args (source->stats, "connected", "%"PRIu64,
(uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time));
}
stats_release (source->stats);
stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
@ -443,17 +445,6 @@ int source_read (source_t *source)
if (source_change_worker (source))
return 1;
}
if (source->limit_rate)
{
if (source->limit_rate < (8 * source->incoming_rate))
{
rate_add (source->format->in_bitrate, 0, current);
source->skip_duration += 300;
client->schedule_ms += 150;
thread_mutex_unlock (&source->lock);
return 0;
}
}
fds = util_timed_wait_for_fd (client->connection.sock, 0);
if (fds < 0)
{
@ -473,13 +464,14 @@ int source_read (source_t *source)
DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
source->timeout, (long)current);
WARN1 ("Disconnecting %s due to socket timeout", source->mount);
source->flags &= ~(SOURCE_RUNNING|SOURCE_PAUSE_LISTENERS);
source->flags &= ~SOURCE_RUNNING;
source->flags |= SOURCE_TIMEOUT;
skip = 0;
break;
}
source->skip_duration = (int)(source->skip_duration * 1.6);
if (source->skip_duration > 700)
source->skip_duration = 700;
source->skip_duration = (int)(source->skip_duration * 1.3);
if (source->skip_duration > 400)
source->skip_duration = 400;
break;
}
source->skip_duration = (long)(source->skip_duration * 0.9);
@ -589,7 +581,18 @@ static int source_client_read (client_t *client)
INFO1 ("streaming duration expired on %s", source->mount);
}
if (source_running (source))
{
if (source->limit_rate)
{
source->incoming_rate = (long)rate_avg (source->format->in_bitrate);
if (source->limit_rate < (8 * source->incoming_rate))
{
rate_add (source->format->in_bitrate, 0, client->worker->current_time.tv_sec);
client->schedule_ms += 200;
}
}
return source_read (source);
}
else
{
if ((source->flags & SOURCE_TERMINATING) == 0)
@ -612,6 +615,7 @@ static int source_client_read (client_t *client)
return 0;
}
INFO1 ("no more listeners on %s", source->mount);
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
client->connection.discon_time = 0;
client->ops = &source_client_halt_ops;
free (source->fallback.mount);
@ -705,10 +709,11 @@ static int locate_start_on_queue (source_t *source, client_t *client)
}
static int http_source_intro (client_t *client)
static int http_source_introfile (client_t *client)
{
source_t *source = client->shared_data;
//DEBUG2 ("client intro_pos is %ld, sent bytes is %ld", client->intro_offset, client->connection.sent_bytes);
if (format_file_read (client, source->format, source->intro_file) < 0)
{
if (source->stream_data_tail)
@ -726,6 +731,21 @@ static int http_source_intro (client_t *client)
}
static int http_source_intro (client_t *client)
{
/* we only need to send the intro if nothing else has been sent */
if (client->connection.sent_bytes > 0)
{
client_set_queue (client, NULL);
client->check_buffer = source_queue_advance;
return source_queue_advance (client);
}
client->intro_offset = 0;
client->check_buffer = http_source_introfile;
return http_source_introfile (client);
}
static int http_source_listener (client_t *client)
{
refbuf_t *refbuf = client->refbuf;
@ -818,11 +838,17 @@ static int wait_for_restart (client_t *client)
{
source_t *source = client->shared_data;
if (source_running (source) || (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || client->connection.error)
if (client->worker->current_time.tv_sec - client->timer_start > 15)
client->connection.error = 1; // in here too long, drop client
if (source_running (source) || client->connection.error ||
(source->flags & SOURCE_PAUSE_LISTENERS) == 0 ||
(source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)))
{
client->ops = &listener_client_ops;
return 0;
}
if (source->flags & SOURCE_LISTENERS_SYNC)
client->schedule_ms = client->worker->time_ms + 100;
else
@ -895,6 +921,7 @@ int listener_waiting_on_source (source_t *source, client_t *client)
client->ops = &listener_pause_ops;
client->flags |= CLIENT_HAS_MOVED;
client->schedule_ms = client->worker->time_ms + 60;
client->timer_start = client->worker->current_time.tv_sec;
return 0;
}
return -1;
@ -911,7 +938,7 @@ static int send_listener (source_t *source, client_t *client)
{
int bytes;
int loop = 8; /* max number of iterations in one go */
long total_written = 0;
long total_written = 0, limiter = source->listener_send_trigger;
int ret = 0, lag;
if (source->flags & SOURCE_LISTENERS_SYNC)
@ -941,6 +968,9 @@ static int send_listener (source_t *source, client_t *client)
lag = source->client->queue_pos - client->queue_pos;
if (source->incoming_rate && lag < source->incoming_rate)
limiter = source->incoming_rate/2;
/* progessive slowdown if nearing max bandwidth. */
if (global.max_rate)
{
@ -961,9 +991,6 @@ static int send_listener (source_t *source, client_t *client)
client->schedule_ms += 150;
}
}
if (source->incoming_rate > 1 && lag < (source->incoming_rate<<1))
loop = lag / (source->incoming_rate>>1);
while (1)
{
/* jump out if client connection has died */
@ -974,7 +1001,7 @@ static int send_listener (source_t *source, client_t *client)
}
/* lets not send too much to one client in one go, but don't
sleep for too long if more data can be sent */
if (loop == 0 || total_written > source->listener_send_trigger)
if (loop == 0 || total_written > limiter)
{
client->schedule_ms = client->worker->time_ms + 15;
break;
@ -1035,6 +1062,7 @@ void source_init (source_t *source)
stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "outgoing_kbitrate", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "incoming_bitrate", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "queue_size", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "connected", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "source_ip", source->client->connection.ip, STATS_COUNTERS);
@ -1055,7 +1083,7 @@ void source_init (source_t *source)
if (str)
{
_parse_audio_info (source, str);
stats_event (source->mount, "audio_info", str);
stats_event_flags (source->mount, "audio_info", str, STATS_GENERAL);
}
}
source->format->in_bitrate = rate_setup (60, 1);
@ -1094,9 +1122,10 @@ void source_init (source_t *source)
}
void source_set_override (const char *mount, const char *dest)
int source_set_override (const char *mount, const char *dest)
{
source_t *source;
int ret = 0;
avl_tree_rlock (global.source_tree);
source = source_find_mount (mount);
@ -1111,13 +1140,20 @@ void source_set_override (const char *mount, const char *dest)
source->fallback.mount = strdup (dest);
source->termination_count = source->listeners;
source->flags |= SOURCE_LISTENERS_SYNC;
ret = 1;
}
thread_mutex_unlock (&source->lock);
}
avl_tree_unlock (global.source_tree);
if (ret)
INFO2 ("moving from %s to %s", mount, dest);
}
else
fserve_set_override (mount, dest);
avl_tree_unlock (global.source_tree);
{
avl_tree_unlock (global.source_tree);
ret = fserve_set_override (mount, dest);
}
return ret;
}
@ -1127,26 +1163,37 @@ void source_set_fallback (source_t *source, const char *dest_mount)
client_t *client = source->client;
time_t connected = client->worker->current_time.tv_sec - client->connection.con_time;
if (dest_mount == NULL)
if (dest_mount == NULL || source->listeners == 0)
{
INFO1 ("Skipping fallback on %s, no listeners", source->mount);
return;
if (connected > 20)
bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02);
}
if (connected > 40)
bitrate = (int)rate_avg (source->format->in_bitrate);
//bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02);
if (bitrate == 0 && source->limit_rate)
bitrate = source->limit_rate;
source->fallback.flags = FS_FALLBACK;
source->fallback.mount = strdup (dest_mount);
source->fallback.limit = bitrate;
INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount,
source->fallback.limit, source->listeners);
if (source->listeners)
{
source->fallback.flags = FS_FALLBACK;
source->fallback.mount = strdup (dest_mount);
source->fallback.limit = bitrate;
INFO4 ("fallback set on %s to %s(%d) with %d listeners", source->mount, dest_mount,
source->fallback.limit, source->listeners);
}
else
INFO4 ("Skipping fallback to %s on %s (bitrate %d, listeners %d)", dest_mount,
source->mount, bitrate, source->listeners);
}
void source_shutdown (source_t *source, int with_fallback)
{
mount_proxy *mountinfo;
INFO1("Source \"%s\" exiting", source->mount);
source->flags &= ~SOURCE_ON_DEMAND;
source->flags &= ~(SOURCE_ON_DEMAND|SOURCE_TIMEOUT);
source->termination_count = source->listeners;
mountinfo = config_find_mount (config_get_config(), source->mount);
if (source->client->connection.con_time)
@ -1163,6 +1210,7 @@ void source_shutdown (source_t *source, int with_fallback)
if (mountinfo && with_fallback && global.running == ICE_RUNNING)
source_set_fallback (source, mountinfo->fallback_mount);
config_release_config();
source->client->timer_start = source->client->worker->current_time.tv_sec;
source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC);
}
@ -1186,13 +1234,13 @@ static void _parse_audio_info (source_t *source, const char *s)
char name[100], value[200];
int n = sscanf (start, "%99[^=]=%199[^;\r\n]", name, value);
if (n == 2 && strncmp (name, "ice-", 4) == 0)
if (n == 2 && (strncmp (name, "ice-", 4) == 0 || strncmp (name, "bitrate=", 7) == 0))
{
char *esc = util_url_unescape (value);
if (esc)
{
util_dict_set (source->audio_info, name, esc);
stats_event (source->mount, name, esc);
stats_event_flags (source->mount, name, esc, STATS_COUNTERS);
}
free (esc);
}
@ -1215,7 +1263,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
INFO2 ("Applying mount information for \"%s\" from \"%s\"",
source->mount, mountinfo->mountname);
stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
/* if a setting is available in the mount details then use it, else
* check the parser details. */
@ -1246,7 +1294,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
} while (0);
val = atoi (str);
}
stats_event_args (source->mount, "public", "%d", val);
stats_set_args (source->stats, "public", "%d", val);
if (source->yp_public != val)
{
DEBUG1 ("YP changed to %d", val);
@ -1259,7 +1307,7 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
/* stream name */
if (mountinfo && mountinfo->stream_name)
stats_event (source->mount, "server_name", mountinfo->stream_name);
stats_set (source->stats, "server_name", mountinfo->stream_name);
else
{
do {
@ -1272,12 +1320,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
str = "Unspecified name";
} while (0);
if (source->format)
stats_event_conv (source->mount, "server_name", str, source->format->charset);
stats_set_conv (source->stats, "server_name", str, source->format->charset);
}
/* stream description */
if (mountinfo && mountinfo->stream_description)
stats_event (source->mount, "server_description", mountinfo->stream_description);
stats_set (source->stats, "server_description", mountinfo->stream_description);
else
{
do {
@ -1289,12 +1337,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
if (str) break;
} while (0);
if (str && source->format)
stats_event_conv (source->mount, "server_description", str, source->format->charset);
stats_set_conv (source->stats, "server_description", str, source->format->charset);
}
/* stream URL */
if (mountinfo && mountinfo->stream_url)
stats_event (source->mount, "server_url", mountinfo->stream_url);
stats_set (source->stats, "server_url", mountinfo->stream_url);
else
{
do {
@ -1306,12 +1354,12 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
if (str) break;
} while (0);
if (str && source->format)
stats_event_conv (source->mount, "server_url", str, source->format->charset);
stats_set_conv (source->stats, "server_url", str, source->format->charset);
}
/* stream genre */
if (mountinfo && mountinfo->stream_genre)
stats_event (source->mount, "genre", mountinfo->stream_genre);
stats_set (source->stats, "genre", mountinfo->stream_genre);
else
{
do {
@ -1324,12 +1372,15 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
str = "various";
} while (0);
if (source->format)
stats_event_conv (source->mount, "genre", str, source->format->charset);
stats_set_conv (source->stats, "genre", str, source->format->charset);
}
/* stream bitrate */
if (mountinfo && mountinfo->bitrate)
{
str = mountinfo->bitrate;
stats_set (source->stats, "bitrate", str);
}
else
{
do {
@ -1339,23 +1390,24 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
if (str) break;
str = httpp_getvar (parser, "x-audiocast-bitrate");
} while (0);
if (str)
stats_set (source->stats, "bitrate", str);
}
stats_event (source->mount, "bitrate", str);
/* handle MIME-type */
if (mountinfo && mountinfo->type)
stats_event (source->mount, "server_type", mountinfo->type);
stats_set (source->stats, "server_type", mountinfo->type);
else
if (source->format && source->format->contenttype)
stats_event (source->mount, "server_type", source->format->contenttype);
stats_set (source->stats, "server_type", source->format->contenttype);
if (mountinfo && mountinfo->subtype)
stats_event (source->mount, "subtype", mountinfo->subtype);
stats_set (source->stats, "subtype", mountinfo->subtype);
if (mountinfo && mountinfo->auth)
stats_event (source->mount, "authenticator", mountinfo->auth->type);
stats_set (source->stats, "authenticator", mountinfo->auth->type);
else
stats_event (source->mount, "authenticator", NULL);
stats_set (source->stats, "authenticator", NULL);
source->limit_rate = 0;
if (mountinfo && mountinfo->limit_rate)
@ -1437,11 +1489,12 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy
source->min_queue_size = config->min_queue_size;
source->timeout = config->source_timeout;
source->default_burst_size = config->burst_size;
source->stats = stats_handle (source->mount);
len = strlen (config->hostname) + strlen(source->mount) + 16;
listen_url = alloca (len);
snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount);
stats_event_flags (source->mount, "listenurl", listen_url, STATS_COUNTERS);
stats_set_flags (source->stats, "listenurl", listen_url, STATS_COUNTERS);
source_apply_mount (source, mountinfo);
@ -1450,11 +1503,11 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy
if (source->flags & SOURCE_ON_DEMAND)
{
DEBUG0 ("on_demand set");
stats_event (source->mount, "on_demand", "1");
stats_event_args (source->mount, "listeners", "%ld", source->listeners);
stats_set (source->stats, "on_demand", "1");
stats_set_args (source->stats, "listeners", "%ld", source->listeners);
}
else
stats_event (source->mount, "on_demand", NULL);
stats_set (source->stats, "on_demand", NULL);
if (mountinfo)
{
@ -1465,23 +1518,24 @@ void source_update_settings (ice_config_t *config, source_t *source, mount_proxy
if (mountinfo->fallback_when_full)
DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full);
DEBUG1 ("max listeners to %d", mountinfo->max_listeners);
stats_event_args (source->mount, "max_listeners", "%d", mountinfo->max_listeners);
stats_event_flags (source->mount, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
stats_set_args (source->stats, "max_listeners", "%d", mountinfo->max_listeners);
stats_set_flags (source->stats, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
if (mountinfo->hidden)
{
stats_event_flags (source->mount, NULL, NULL, STATS_HIDDEN);
stats_set_flags (source->stats, NULL, NULL, STATS_HIDDEN);
DEBUG0 ("hidden from public");
}
else
stats_event_flags (source->mount, NULL, NULL, 0);
stats_set_flags (source->stats, NULL, NULL, 0);
}
else
{
DEBUG0 ("max listeners is not specified");
stats_event (source->mount, "max_listeners", "unlimited");
stats_event_flags (source->mount, "cluster_password", NULL, STATS_SLAVE);
stats_event_flags (source->mount, NULL, NULL, STATS_PUBLIC);
stats_set (source->stats, "max_listeners", "unlimited");
stats_set_flags (source->stats, "cluster_password", NULL, STATS_SLAVE);
stats_set_flags (source->stats, NULL, NULL, STATS_PUBLIC);
}
stats_release (source->stats);
DEBUG1 ("public set to %d", source->yp_public);
DEBUG1 ("queue size to %u", source->queue_size_limit);
DEBUG1 ("min queue size to %u", source->min_queue_size);
@ -1506,8 +1560,9 @@ static int source_client_callback (client_t *client)
agent = httpp_getvar (source->client->parser, "user-agent");
if (agent)
stats_event (source->mount, "user_agent", agent);
stats_event_flags (source->mount, "user_agent", agent, STATS_COUNTERS);
stats_event_inc(NULL, "source_client_connections");
client_set_queue (client, NULL);
source_init (source);
client->ops = &source_client_ops;
@ -1594,18 +1649,25 @@ void source_recheck_mounts (int update_all)
}
else if (update_all)
{
stats_event_flags (mount->mountname, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s",
long stats = stats_handle (mount->mountname);
stats_set_flags (stats, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
stats_set_args (stats, "listenurl", "http://%s:%d%s",
config->hostname, config->port, mount->mountname);
stats_event (mount->mountname, "listeners", "0");
stats_set (stats, "listeners", "0");
if (mount->max_listeners < 0)
stats_event (mount->mountname, "max_listeners", "unlimited");
stats_set (stats, "max_listeners", "unlimited");
else
stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners);
stats_set_args (stats, "max_listeners", "%d", mount->max_listeners);
stats_release (stats);
}
}
else
stats_event (mount->mountname, NULL, NULL);
{
// only delete these stats if no client is maintaining them
source = source_find_mount_raw (mount->mountname);
if (source == NULL)
stats_event (mount->mountname, NULL, NULL);
}
mount = mount->next;
}
@ -1717,7 +1779,6 @@ static int source_listener_release (source_t *source, client_t *client)
rate_reduce (source->format->out_bitrate, 1000);
stats_event_dec (NULL, "listeners");
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
/* change of listener numbers, so reduce scope of global sampling */
global_reduce_bitrate_sampling (global.out_bitrate);
@ -1740,7 +1801,7 @@ static int source_listener_release (source_t *source, client_t *client)
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
{
int loop = 10, bitrate = 0, do_process = 0;
int loop = 10, rate = 0, do_process = 0;
int within_limits;
source_t *source;
mount_proxy *minfo = mountinfo;
@ -1769,16 +1830,19 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
}
avl_tree_unlock (global.source_tree);
if (minfo && minfo->limit_rate)
bitrate = minfo->limit_rate;
rate = minfo->limit_rate;
if (minfo == NULL || minfo->fallback_mount == NULL)
{
if (bitrate)
if (rate == 0)
if (sscanf (mount, "%*[^[][%d]", &rate) == 1)
rate = rate * 1000;
if (rate)
{
fbinfo f;
f.flags = FS_FALLBACK;
f.mount = (char *)mount;
f.fallback = NULL;
f.limit = bitrate/8;
f.limit = rate / 8;
if (move_listener (client, &f) == 0)
{
/* source dead but fallback to file found */
@ -1910,6 +1974,8 @@ void source_setup_listener (source_t *source, client_t *client)
client->shared_data = source;
client->queue_pos = 0;
client->mount = source->mount;
client->flags &= ~CLIENT_IN_FSERVE;
client->timer_start = client->worker->current_time.tv_sec;
client->check_buffer = http_source_listener;
// add client to the source

View File

@ -70,6 +70,7 @@ typedef struct source_tag
unsigned long bytes_sent_since_update;
unsigned long bytes_read_since_update;
int stats_interval;
long stats;
time_t last_read;
@ -86,6 +87,7 @@ typedef struct source_tag
#define SOURCE_PAUSE_LISTENERS (1<<3)
#define SOURCE_TERMINATING (1<<4)
#define SOURCE_LISTENERS_SYNC (1<<5)
#define SOURCE_TIMEOUT (1<<6)
#define source_available(x) (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && ((x)->flags & SOURCE_LISTENERS_SYNC) == 0)
#define source_running(x) ((x)->flags & SOURCE_RUNNING)
@ -109,7 +111,7 @@ void source_setup_listener (source_t *source, client_t *client);
void source_init (source_t *source);
void source_shutdown (source_t *source, int with_fallback);
void source_set_fallback (source_t *source, const char *dest_mount);
void source_set_override (const char *mount, const char *dest);
int source_set_override (const char *mount, const char *dest);
#define SOURCE_BLOCK_SYNC 01
#define SOURCE_BLOCK_RELEASE 02

View File

@ -21,6 +21,7 @@
#include <libxml/xmlmemory.h>
#include <libxml/parser.h>
#include <libxml/parserInternals.h>
#include <libxml/tree.h>
#include "thread/thread.h"
@ -596,7 +597,12 @@ static void process_source_event (stats_event_t *event)
}
if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
{
avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
int fallback_stream = 0;
avl_tree_wlock (snode->stats_tree);
fallback_stream = _find_node (snode->stats_tree, "fallback") == NULL ? 1 : 0;
avl_tree_unlock (snode->stats_tree);
if (fallback_stream)
avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
avl_tree_unlock (_stats.source_tree);
return;
}
@ -645,7 +651,7 @@ static int stats_listeners_send (client_t *client)
if (refbuf == NULL)
{
client->schedule_ms = client->worker->time_ms + 100;
client->schedule_ms = client->worker->time_ms + 60;
break;
}
if (loop == 0 || total > 32768)
@ -667,14 +673,14 @@ static int stats_listeners_send (client_t *client)
if (listener->content_len)
WARN1 ("content length is %u", listener->content_len);
listener->recent_block = NULL;
client->schedule_ms = client->worker->time_ms + 100;
client->schedule_ms = client->worker->time_ms + 50;
break;
}
loop--;
}
else
{
client->schedule_ms = client->worker->time_ms + 250;
client->schedule_ms = client->worker->time_ms + 200;
break; /* short write, so stop for now */
}
}
@ -712,7 +718,11 @@ static void stats_listener_send (int mask, const char *fmt, ...)
while (listener)
{
if (listener->mask & mask)
int admuser = listener->mask & STATS_HIDDEN,
hidden = mask & STATS_HIDDEN,
flags = mask & ~STATS_HIDDEN;
if (admuser || (hidden == 0 && (flags & listener->mask)))
_add_stats_to_stats_client (listener->client, fmt, ap);
listener = listener->next;
}
@ -987,7 +997,6 @@ static void stats_client_release (client_t *client)
{
event_listener_t *listener, **trail;
INFO0 ("removing stats client");
thread_mutex_lock (&_stats.listeners_lock);
listener = _stats.event_listeners;
trail = &_stats.event_listeners;
@ -1034,7 +1043,7 @@ void stats_add_listener (client_t *client, int mask)
client_set_queue (client, NULL);
client->refbuf = refbuf_new (100);
snprintf (client->refbuf->data, 100,
"HTTP/1.0 200 OK\r\nCapability: streamlist\r\n\r\n");
"HTTP/1.0 200 OK\r\nCapability: streamlist stats\r\n\r\n");
client->refbuf->len = strlen (client->refbuf->data);
listener->content_len = client->refbuf->len;
listener->recent_block = client->refbuf;
@ -1263,6 +1272,8 @@ long stats_handle (const char *mount)
{
stats_source_t *src_stats;
if (mount == NULL)
return 0;
avl_tree_wlock (_stats.source_tree);
src_stats = _find_source(_stats.source_tree, mount);
if (src_stats == NULL)
@ -1284,11 +1295,14 @@ long stats_handle (const char *mount)
}
void stats_lock (long handle)
long stats_lock (long handle, const char *mount)
{
stats_source_t *src_stats = (stats_source_t *)handle;
if (src_stats)
if (src_stats == NULL)
src_stats = (stats_source_t*)stats_handle (mount);
else
avl_tree_wlock (src_stats->stats_tree);
return (long)src_stats;
}
@ -1352,29 +1366,88 @@ void stats_set_flags (long handle, const char *name, const char *value, int flag
}
void stats_set_conv(long handle, const char *name, const char *value, const char *charset)
static void stats_set_entity_decode (long handle, const char *name, const char *value)
{
const char *metadata = value;
xmlBufferPtr conv = xmlBufferCreate ();
if (charset && value)
xmlParserCtxtPtr parser = xmlNewParserCtxt();
if (parser)
{
xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
if (handle)
{
xmlBufferPtr raw = xmlBufferCreate ();
xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
if (xmlCharEncInFunc (handle, conv, raw) > 0)
metadata = (char *)xmlBufferContent (conv);
xmlBufferFree (raw);
xmlCharEncCloseFunc (handle);
}
else
WARN1 ("No charset found for \"%s\"", charset);
xmlChar *decoded = xmlStringDecodeEntities (parser,
(const xmlChar *) value, XML_SUBSTITUTE_BOTH, 0, 0, 0);
stats_set (handle, name, (void*)decoded);
xmlFreeParserCtxt (parser);
xmlFree (decoded);
return;
}
stats_set (handle, name, value);
}
void stats_set_conv (long handle, const char *name, const char *value, const char *charset)
{
if (charset)
{
xmlCharEncodingHandlerPtr encoding = xmlFindCharEncodingHandler (charset);
if (encoding)
{
xmlBufferPtr in = xmlBufferCreate ();
xmlBufferPtr conv = xmlBufferCreate ();
xmlBufferCCat (in, value);
if (xmlCharEncInFunc (encoding, conv, in) > 0)
stats_set_entity_decode (handle, name, (void*)xmlBufferContent (conv));
xmlBufferFree (in);
xmlBufferFree (conv);
xmlCharEncCloseFunc (encoding);
return;
}
WARN1 ("No charset found for \"%s\"", charset);
return;
}
stats_set_entity_decode (handle, name, value);
}
void stats_listener_to_xml (client_t *listener, xmlNodePtr parent)
{
const char *useragent;
char buf[30];
xmlNodePtr node = xmlNewChild (parent, NULL, XMLSTR("listener"), NULL);
snprintf (buf, sizeof (buf), "%lu", listener->connection.id);
xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip));
useragent = httpp_getvar (listener->parser, "user-agent");
if (useragent && xmlCheckUTF8((unsigned char *)useragent))
{
xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(useragent));
xmlNewChild (node, NULL, XMLSTR("UserAgent"), str);
xmlFree (str);
}
if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE)
{
source_t *source = listener->shared_data;
snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos);
}
else
snprintf (buf, sizeof (buf), "0");
xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
if (listener->worker)
{
snprintf (buf, sizeof (buf), "%lu",
(unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf));
}
if (listener->username)
{
xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(listener->username));
xmlNewChild (node, NULL, XMLSTR("username"), str);
xmlFree (str);
}
stats_set (handle, name, metadata);
xmlBufferFree (conv);
}

View File

@ -58,12 +58,14 @@ xmlDocPtr stats_get_xml(int flags, const char *show_mount);
char *stats_get_value(const char *source, const char *name);
long stats_handle (const char *mount);
void stats_lock (long handle);
long stats_lock (long handle, const char *mount);
void stats_release (long handle);
void stats_set (long handle, const char *name, const char *value);
void stats_set_args (long handle, const char *name, const char *format, ...);
void stats_set_flags (long handle, const char *name, const char *value, int flags);
void stats_set_conv (long handle, const char *name, const char *value, const char *charset);
void stats_listener_to_xml (client_t *listener, xmlNodePtr parent);
#endif /* __STATS_H__ */

View File

@ -17,6 +17,7 @@
#include <sys/types.h>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#ifndef _WIN32
#include <sys/time.h>
@ -40,6 +41,7 @@
#include "refbuf.h"
#include "connection.h"
#include "client.h"
#include "global.h"
#define CATMODULE "util"
@ -309,42 +311,39 @@ char *util_url_escape (const char *src)
return dst;
}
static int unescape_code (const char *src)
{
if (hex (src[0]) == -1 || hex (src[1]) == -1)
return -1;
return (hex (src[0]) << 4) + hex (src[1]);
}
char *util_url_unescape (const char *src)
{
int len = strlen(src);
char *decoded;
int i;
int i, v;
char *dst;
int done = 0;
decoded = calloc(1, len + 1);
dst = decoded;
for(i=0; i < len; i++) {
switch(src[i]) {
case '%':
if (i+2 >= len || hex(src[i+1]) == -1 || hex(src[i+2]) == -1)
{
/* no matching pattern so assume just the % */
*dst++ = '%';
}
else
{
*dst++ = hex(src[i+1]) * 16 + hex(src[i+2]);
i+= 2;
}
break;
case 0:
ERROR0("Fatal internal logic error in util_url_unescape()");
free(decoded);
return NULL;
default:
*dst++ = src[i];
break;
for(i=0; i < len; i++)
{
if (src[i] == '%' && i+2 < len)
{
v = unescape_code (src + i +1);
if (v >= 0 && isprint(v))
{
*dst++ = (char)v;
i += 2;
continue;
}
}
if(done)
break;
*dst++ = src[i];
}
*dst = 0; /* null terminator */
@ -352,6 +351,7 @@ char *util_url_unescape (const char *src)
return decoded;
}
/* Get an absolute path (from the webroot dir) from a URI. Return NULL if the
* path contains 'disallowed' sequences like foo/../ (which could be used to
* escape from the webroot) or if it cannot be URI-decoded.
@ -637,7 +637,7 @@ char *util_dict_urlencode(util_dict *dict, char delim)
return res;
}
#ifndef HAVE_LOCALTIME_R
#ifndef HAVE_DECL_LOCALTIME_R
struct tm *localtime_r (const time_t *timep, struct tm *result)
{
static mutex_t localtime_lock;

View File

@ -52,7 +52,7 @@ int util_dict_set(util_dict *dict, const char *key, const char *val);
const char *util_dict_get(util_dict *dict, const char *key);
char *util_dict_urlencode(util_dict *dict, char delim);
#ifndef HAVE_LOCALTIME_R
#ifndef HAVE_DECL_LOCALTIME_R
struct tm *localtime_r (const time_t *timep, struct tm *result);
#endif
char *util_conv_string (const char *string, const char *in_charset, const char *out_charset);

View File

@ -46,6 +46,7 @@
#include "client.h"
#include "stats.h"
#include "fserve.h"
#include "util.h"
#define CATMODULE "xslt"
@ -280,9 +281,12 @@ int xslt_transform (xmlDocPtr doc, const char *xslfilename, client_t *client)
for (i = 0; node && i < arg_count; node = avl_get_next (node))
{
http_var_t *param = (http_var_t *)node->key;
char *tmp = util_url_escape (param->value);
params[i++] = param->name;
params[i] = (char*)alloca (strlen (param->value) +3);
sprintf (params[i++], "\'%s\'", param->value);
// use alloca for now, should really url esc into a supplied buffer
params[i] = (char*)alloca (strlen (tmp) + 3);
sprintf (params[i++], "\'%s\'", tmp);
free (tmp);
}
params[i] = NULL;
}

View File

@ -14,3 +14,12 @@ EXTRA_DIST = ConfigTab.cpp ConfigTab.h Icecast2win.clw Icecast2win.cpp \
icecast2_console.dsw icecast2_console.dsp credits.bmp icecast2title.bmp \
icecastService.cpp icecastService.dsp
bin_PROGRAMS = icecast
icecast_DEPENDENCIES = ../src/libicecast.a ../src/net/libicenet.la \
../src/thread/libicethread.la ../src/httpp/libicehttpp.la ../src/log/libicelog.la \
../src/avl/libiceavl.la ../src/timing/libicetiming.la
icecast_SOURCES = icecastService.cpp
icecast_CPPFLAGS = -I../src @XIPH_CFLAGS@ @XIPH_CPPFLAGS@
icecast_LDADD = $(icecast_DEPENDENCIES) @XIPH_LDFLAGS@ @XIPH_LIBS@ @KATE_LIBS@

View File

@ -1,7 +1,4 @@
#define _WIN32_WINNT 0x0400
#include <windows.h>
#include <stdio.h>
#include <direct.h>
extern "C" {
#include <config.h>
@ -27,7 +24,6 @@ SERVICE_STATUS_HANDLE hStatus;
void ServiceMain(int argc, char** argv);
void ControlHandler(DWORD request);
extern "C" int mainService(int argc, char **argv);
void installService (const char *path)
@ -112,10 +108,26 @@ void ControlHandler(DWORD request)
SetServiceStatus (hStatus, &ServiceStatus);
}
static int run_server (int argc, char *argv[])
{
int ret;
initialize_subsystems();
ret = server_init (argc, argv);
if (ret == 0)
server_process();
shutdown_subsystems();
return ret;
}
void ServiceMain(int argc, char** argv)
{
ServiceStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
ServiceStatus.dwWin32ExitCode = 0;
ServiceStatus.dwWin32ExitCode = -1;
ServiceStatus.dwServiceSpecificExitCode = 0;
ServiceStatus.dwCheckPoint = 0;
ServiceStatus.dwWaitHint = 0;
@ -123,7 +135,7 @@ void ServiceMain(int argc, char** argv)
hStatus = RegisterServiceCtrlHandler(PACKAGE_STRING, (LPHANDLER_FUNCTION)ControlHandler);
if (hStatus == (SERVICE_STATUS_HANDLE)0) {
// Registering Control Handler failed
MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION);
MessageBox (NULL, "RegisterServiceCtrlHandler failed", NULL, MB_SERVICE_NOTIFICATION);
return;
}
@ -144,19 +156,21 @@ void ServiceMain(int argc, char** argv)
else
argv2 [2] = argv[1];
ServiceStatus.dwWin32ExitCode = mainService(argc2, (char **)argv2);
ServiceStatus.dwWin32ExitCode = run_server (argc2, argv2);
ServiceStatus.dwCurrentState = SERVICE_STOPPED;
SetServiceStatus(hStatus, &ServiceStatus);
}
void main(int argc, char *argv[])
int main (int argc, char *argv[])
{
if (argc < 2)
{
printf ("Usage: icecastservice [remove] | [install <path>]\n");
return;
printf (PACKAGE_STRING "\n\n"
"Usage: icecastService [remove] | [install <path>]\n"
" icecastService -c icecast.xml\n\n");
return -1;
}
if (!strcmp(argv[1], "install"))
{
@ -165,29 +179,33 @@ void main(int argc, char *argv[])
else
printf ("install requires a path arg as well\n");
Sleep (1000);
return;
return 0;
}
if (!strcmp(argv[1], "remove") || !strcmp(argv[1], "uninstall"))
{
removeService();
return;
return 0;
}
if (_chdir(argv[1]) < 0)
{
char buffer [256];
_snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]);
MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION);
return;
}
if (strcmp (argv[1], "-c") == 0)
return run_server (argc, argv);
if (_chdir(argv[1]) < 0)
{
char buffer [256];
snprintf (buffer, sizeof(buffer), "Unable to change to directory %s", argv[1]);
MessageBox (NULL, buffer, NULL, MB_SERVICE_NOTIFICATION);
return -1;
}
SERVICE_TABLE_ENTRY ServiceTable[2];
ServiceTable[0].lpServiceName = PACKAGE_STRING;
ServiceTable[0].lpServiceName = (char*)PACKAGE_STRING;
ServiceTable[0].lpServiceProc = (LPSERVICE_MAIN_FUNCTION)ServiceMain;
ServiceTable[1].lpServiceName = NULL;
ServiceTable[1].lpServiceProc = NULL;
// Start the control dispatcher thread for our service
if (StartServiceCtrlDispatcher (ServiceTable) == 0)
MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION);
MessageBox (NULL, "StartServiceCtrlDispatcher failed", NULL, MB_SERVICE_NOTIFICATION);
return 0;
}