From 61316a25d7dafdc568398991f5fa541bbeee830f Mon Sep 17 00:00:00 2001 From: Jack Moffitt Date: Mon, 10 Sep 2001 02:21:46 +0000 Subject: [PATCH] Initial revision svn path=/trunk/icecast/; revision=1996 --- AUTHORS | 1 + COPYING | 1 + Makefile.am | 16 + README | 1 + TODO | 18 ++ acinclude.m4 | 195 ++++++++++++ autogen.sh | 61 ++++ conf/icecast.xml | 35 +++ configure.in | 106 +++++++ src/Makefile.am | 29 ++ src/TODO | 1 + src/client.c | 46 +++ src/client.h | 28 ++ src/config.c | 296 ++++++++++++++++++ src/config.h | 55 ++++ src/configtest.c | 58 ++++ src/connection.c | 505 ++++++++++++++++++++++++++++++ src/connection.h | 23 ++ src/format.c | 31 ++ src/format.h | 40 +++ src/format_vorbis.c | 120 +++++++ src/format_vorbis.h | 11 + src/global.c | 40 +++ src/global.h | 28 ++ src/logging.c | 71 +++++ src/logging.h | 80 +++++ src/main.c | 233 ++++++++++++++ src/os.h | 16 + src/refbuf.c | 115 +++++++ src/refbuf.h | 44 +++ src/sighandler.c | 57 ++++ src/sighandler.h | 8 + src/source.c | 364 ++++++++++++++++++++++ src/source.h | 26 ++ src/stats.c | 740 ++++++++++++++++++++++++++++++++++++++++++++ src/stats.h | 75 +++++ src/util.c | 58 ++++ src/util.h | 6 + 38 files changed, 3638 insertions(+) create mode 100644 AUTHORS create mode 100644 COPYING create mode 100644 Makefile.am create mode 100644 README create mode 100644 TODO create mode 100644 acinclude.m4 create mode 100755 autogen.sh create mode 100644 conf/icecast.xml create mode 100644 configure.in create mode 100644 src/Makefile.am create mode 100644 src/TODO create mode 100644 src/client.c create mode 100644 src/client.h create mode 100644 src/config.c create mode 100644 src/config.h create mode 100644 src/configtest.c create mode 100644 src/connection.c create mode 100644 src/connection.h create mode 100644 src/format.c create mode 100644 src/format.h create mode 100644 src/format_vorbis.c create mode 100644 src/format_vorbis.h create mode 100644 src/global.c create mode 100644 src/global.h create mode 100644 src/logging.c create mode 100644 src/logging.h create mode 100644 src/main.c create mode 100644 src/os.h create mode 100644 src/refbuf.c create mode 100644 src/refbuf.h create mode 100644 src/sighandler.c create mode 100644 src/sighandler.h create mode 100644 src/source.c create mode 100644 src/source.h create mode 100644 src/stats.c create mode 100644 src/stats.h create mode 100644 src/util.c create mode 100644 src/util.h diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 00000000..f7ecf3e2 --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +Jack Moffitt diff --git a/COPYING b/COPYING new file mode 100644 index 00000000..f95c2d9d --- /dev/null +++ b/COPYING @@ -0,0 +1 @@ + diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 00000000..547160bb --- /dev/null +++ b/Makefile.am @@ -0,0 +1,16 @@ +## Process this file with automake to produce Makefile.in + +AUTOMAKE_OPTIONS = foreign dist-zip + +SUBDIRS = src + +EXTRA_DIST = README AUTHORS COPYING + +# SCCS Definitions (for BitKeeper) +GET = true + +debug: + $(MAKE) all CFLAGS="@DEBUG@" + +profile: + $(MAKE) all CFLAGS="@PROFILE@" diff --git a/README b/README new file mode 100644 index 00000000..b23682ee --- /dev/null +++ b/README @@ -0,0 +1 @@ +This is still experimental. diff --git a/TODO b/TODO new file mode 100644 index 00000000..f3e4011a --- /dev/null +++ b/TODO @@ -0,0 +1,18 @@ +BUGS +---- +- stats get off? + + + +FEATURES +-------- +- pull out vorbis comments. and send to stats. + +- directory server GUID checks + directory server does GET /GUID-asldjfasldfjalsdkfjasldkfj HTTP/1.0 + and either gets a 404 if it's wrong, or a 200 if it's correct. + +- adding new stats type, event. events don't modify the global stats tree, + ie, source /1234.ogg disconnected + +- stats.xml, a users requests this, and gets an xml dump of the current stats. diff --git a/acinclude.m4 b/acinclude.m4 new file mode 100644 index 00000000..c2f80c9b --- /dev/null +++ b/acinclude.m4 @@ -0,0 +1,195 @@ +# Configure paths for libogg +# Jack Moffitt 10-21-2000 +# Shamelessly stolen from Owen Taylor and Manish Singh + +dnl AM_PATH_OGG([ACTION-IF-FOUND [, ACTION-IF-NOT-FOUND]]) +dnl Test for libogg, and define OGG_CFLAGS and OGG_LIBS +dnl +AC_DEFUN(AM_PATH_OGG, +[dnl +dnl Get the cflags and libraries +dnl +AC_ARG_WITH(ogg-prefix,[ --with-ogg-prefix=PFX Prefix where libogg is installed (optional)], ogg_prefix="$withval", ogg_prefix="") +AC_ARG_ENABLE(oggtest, [ --disable-oggtest Do not try to compile and run a test Ogg program],, enable_oggtest=yes) + + if test "x$ogg_prefix" != "xNONE" ; then + ogg_args="$ogg_args --prefix=$ogg_prefix" + OGG_CFLAGS="-I$ogg_prefix/include" + OGG_LIBS="-L$ogg_prefix/lib" + elif test "$prefix" != ""; then + ogg_args="$ogg_args --prefix=$prefix" + OGG_CFLAGS="-I$prefix/include" + OGG_LIBS="-L$prefix/lib" + fi + + OGG_LIBS="$OGG_LIBS -logg" + + AC_MSG_CHECKING(for Ogg) + no_ogg="" + + + if test "x$enable_oggtest" = "xyes" ; then + ac_save_CFLAGS="$CFLAGS" + ac_save_LIBS="$LIBS" + CFLAGS="$CFLAGS $OGG_CFLAGS" + LIBS="$LIBS $OGG_LIBS" +dnl +dnl Now check if the installed Ogg is sufficiently new. +dnl + rm -f conf.oggtest + AC_TRY_RUN([ +#include +#include +#include +#include + +int main () +{ + system("touch conf.oggtest"); + return 0; +} + +],, no_ogg=yes,[echo $ac_n "cross compiling; assumed OK... $ac_c"]) + CFLAGS="$ac_save_CFLAGS" + LIBS="$ac_save_LIBS" + fi + + if test "x$no_ogg" = "x" ; then + AC_MSG_RESULT(yes) + ifelse([$1], , :, [$1]) + else + AC_MSG_RESULT(no) + if test -f conf.oggtest ; then + : + else + echo "*** Could not run Ogg test program, checking why..." + CFLAGS="$CFLAGS $OGG_CFLAGS" + LIBS="$LIBS $OGG_LIBS" + AC_TRY_LINK([ +#include +#include +], [ return 0; ], + [ echo "*** The test program compiled, but did not run. This usually means" + echo "*** that the run-time linker is not finding Ogg or finding the wrong" + echo "*** version of Ogg. If it is not finding Ogg, you'll need to set your" + echo "*** LD_LIBRARY_PATH environment variable, or edit /etc/ld.so.conf to point" + echo "*** to the installed location Also, make sure you have run ldconfig if that" + echo "*** is required on your system" + echo "***" + echo "*** If you have an old version installed, it is best to remove it, although" + echo "*** you may also be able to get things to work by modifying LD_LIBRARY_PATH"], + [ echo "*** The test program failed to compile or link. See the file config.log for the" + echo "*** exact error that occured. This usually means Ogg was incorrectly installed" + echo "*** or that you have moved Ogg since it was installed. In the latter case, you" + echo "*** may want to edit the ogg-config script: $OGG_CONFIG" ]) + CFLAGS="$ac_save_CFLAGS" + LIBS="$ac_save_LIBS" + fi + OGG_CFLAGS="" + OGG_LIBS="" + ifelse([$2], , :, [$2]) + fi + AC_SUBST(OGG_CFLAGS) + AC_SUBST(OGG_LIBS) + rm -f conf.oggtest +]) +# Configure paths for libvorbis +# Jack Moffitt 10-21-2000 +# Shamelessly stolen from Owen Taylor and Manish Singh + +dnl AM_PATH_VORBIS([ACTION-IF-FOUND [, ACTION-IF-NOT-FOUND]]) +dnl Test for libvorbis, and define VORBIS_CFLAGS and VORBIS_LIBS +dnl +AC_DEFUN(AM_PATH_VORBIS, +[dnl +dnl Get the cflags and libraries +dnl +AC_ARG_WITH(vorbis-prefix,[ --with-vorbis-prefix=PFX Prefix where libvorbis is installed (optional)], vorbis_prefix="$withval", vorbis_prefix="") +AC_ARG_ENABLE(vorbistest, [ --disable-vorbistest Do not try to compile and run a test Vorbis program],, enable_vorbistest=yes) + + if test "x$vorbis_prefix" != "xNONE" ; then + vorbis_args="$vorbis_args --prefix=$vorbis_prefix" + VORBIS_CFLAGS="-I$vorbis_prefix/include" + VORBIS_LIBDIR="-L$vorbis_prefix/lib" + elif test "$prefix" != ""; then + vorbis_args="$vorbis_args --prefix=$prefix" + VORBIS_CFLAGS="-I$prefix/include" + VORBIS_LIBDIR="-L$prefix/lib" + fi + + VORBIS_LIBS="$VORBIS_LIBDIR -lvorbis -lm" + VORBISFILE_LIBS="-lvorbisfile" + VORBISENC_LIBS="-lvorbisenc" + + AC_MSG_CHECKING(for Vorbis) + no_vorbis="" + + + if test "x$enable_vorbistest" = "xyes" ; then + ac_save_CFLAGS="$CFLAGS" + ac_save_LIBS="$LIBS" + CFLAGS="$CFLAGS $VORBIS_CFLAGS" + LIBS="$LIBS $VORBIS_LIBS $OGG_LIBS" +dnl +dnl Now check if the installed Vorbis is sufficiently new. +dnl + rm -f conf.vorbistest + AC_TRY_RUN([ +#include +#include +#include +#include + +int main () +{ + system("touch conf.vorbistest"); + return 0; +} + +],, no_vorbis=yes,[echo $ac_n "cross compiling; assumed OK... $ac_c"]) + CFLAGS="$ac_save_CFLAGS" + LIBS="$ac_save_LIBS" + fi + + if test "x$no_vorbis" = "x" ; then + AC_MSG_RESULT(yes) + ifelse([$1], , :, [$1]) + else + AC_MSG_RESULT(no) + if test -f conf.vorbistest ; then + : + else + echo "*** Could not run Vorbis test program, checking why..." + CFLAGS="$CFLAGS $VORBIS_CFLAGS" + LIBS="$LIBS $VORBIS_LIBS $OGG_LIBS" + AC_TRY_LINK([ +#include +#include +], [ return 0; ], + [ echo "*** The test program compiled, but did not run. This usually means" + echo "*** that the run-time linker is not finding Vorbis or finding the wrong" + echo "*** version of Vorbis. If it is not finding Vorbis, you'll need to set your" + echo "*** LD_LIBRARY_PATH environment variable, or edit /etc/ld.so.conf to point" + echo "*** to the installed location Also, make sure you have run ldconfig if that" + echo "*** is required on your system" + echo "***" + echo "*** If you have an old version installed, it is best to remove it, although" + echo "*** you may also be able to get things to work by modifying LD_LIBRARY_PATH"], + [ echo "*** The test program failed to compile or link. See the file config.log for the" + echo "*** exact error that occured. This usually means Vorbis was incorrectly installed" + echo "*** or that you have moved Vorbis since it was installed." ]) + CFLAGS="$ac_save_CFLAGS" + LIBS="$ac_save_LIBS" + fi + VORBIS_CFLAGS="" + VORBIS_LIBS="" + VORBISFILE_LIBS="" + VORBISENC_LIBS="" + ifelse([$2], , :, [$2]) + fi + AC_SUBST(VORBIS_CFLAGS) + AC_SUBST(VORBIS_LIBS) + AC_SUBST(VORBISFILE_LIBS) + AC_SUBST(VORBISENC_LIBS) + rm -f conf.vorbistest +]) diff --git a/autogen.sh b/autogen.sh new file mode 100755 index 00000000..74ad6f3f --- /dev/null +++ b/autogen.sh @@ -0,0 +1,61 @@ +#!/bin/sh +# Run this to set up the build system: configure, makefiles, etc. +# (based on the version in enlightenment's cvs) + +package="icecast" + +srcdir=`dirname $0` +test -z "$srcdir" && srcdir=. + +cd "$srcdir" +DIE=0 + +(autoconf --version) < /dev/null > /dev/null 2>&1 || { + echo + echo "You must have autoconf installed to compile $package." + echo "Download the appropriate package for your distribution," + echo "or get the source tarball at ftp://ftp.gnu.org/pub/gnu/" + DIE=1 +} + +(automake --version) < /dev/null > /dev/null 2>&1 || { + echo + echo "You must have automake installed to compile $package." + echo "Download the appropriate package for your system, + echo "or get the source from one of the GNU ftp sites" + echo "listed in http://www.gnu.org/order/ftp.html" + DIE=1 +} + +(libtool --version) < /dev/null > /dev/null 2>&1 || { + echo + echo "You must have libtool installed to compile $package." + echo "Download the appropriate package for your system, + echo "or get the source from one of the GNU ftp sites" + echo "listed in http://www.gnu.org/order/ftp.html" + DIE=1 +} + +if test "$DIE" -eq 1; then + exit 1 +fi + +if test -z "$*"; then + echo "I am going to run ./configure with no arguments - if you wish " + echo "to pass any to it, please specify them on the $0 command line." +fi + +echo "Generating configuration files for $package, please wait...." + +echo " aclocal $ACLOCAL_FLAGS" +aclocal $ACLOCAL_FLAGS +#echo " autoheader" +#autoheader +echo " libtoolize --automake" +libtoolize --automake +echo " automake --add-missing" +automake --add-missing +echo " autoconf" +autoconf + +$srcdir/configure "$@" && echo diff --git a/conf/icecast.xml b/conf/icecast.xml new file mode 100644 index 00000000..136fabaa --- /dev/null +++ b/conf/icecast.xml @@ -0,0 +1,35 @@ + + Jack's House + jack@icecast.org + + + 100 + 2 + 5 + 15 + + + hackme + + + 5 + + yp.icecast.org + 15 + + + + i.cantcode.com + 8000 + + + + /usr/local/icecast + /tmp + + + + access.log + error.log + + diff --git a/configure.in b/configure.in new file mode 100644 index 00000000..e869e17c --- /dev/null +++ b/configure.in @@ -0,0 +1,106 @@ +dnl Process this file with autoconf to produce a configure script. +AC_INIT(src/main.c) + +AM_INIT_AUTOMAKE(icecast,2.0) + +AC_PROG_CC +AC_CANONICAL_HOST +AM_PROG_LIBTOOL + +dnl Set some options based on environment + +if test -z "$GCC"; then + case $host in + *-*-irix*) + DEBUG="-g -signed" + CFLAGS="-O2 -w -signed" + PROFILE="-p -g3 -O2 -signed" + ;; + sparc-sun-solaris*) + DEBUG="-v -g" + CFLAGS="-xO4 -fast -w -fsimple -native -xcg92" + PROFILE="-v -xpg -g -xO4 -fast -native -fsimple -xcg92 -Dsuncc" + ;; + *) + DEBUG="-g" + CFLAGS="-O" + PROFILE="-g -p" + ;; + esac +else + case $host in + *-*-linux*) + DEBUG="-g -Wall -fsigned-char -D_REENTRANT -D_GNU_SOURCE" + CFLAGS="-O20 -ffast-math -fsigned-char -D_REENTRANT -D_GNU_SOURCE" + PROFILE="-Wall -W -pg -g -O20 -ffast-math -fsigned-char -D_REENTRANT -D_GNU_SOURCE" + ;; + sparc-sun-*) + DEBUG="-g -Wall -fsigned-char -mv8" + CFLAGS="-O20 -ffast-math -fsigned-char -mv8" + PROFILE="-pg -g -O20 -fsigned-char -mv8" + ;; + *) + DEBUG="-g -Wall -fsigned-char" + CFLAGS="-O20 -fsigned-char" + PROFILE="-O20 -g -pg -fsigned-char" + ;; + esac +fi + +dnl Checks for programs. + +dnl Checks for libraries. + +dnl IPV6 +AC_SEARCH_LIBS(inet_pton, socket, AC_DEFINE(HAVE_IPV6, 1, [Define if you have IPV6 support])) +AC_SEARCH_LIBS(getipnodebyname, nsl, + AC_DEFINE(HAVE_GETIPNODEBYNAME, 1, + [Define if you have the getipnodebyname function]) +) + +dnl Checks for header files. +AC_HEADER_STDC + +dnl Checks for typedefs, structures, and compiler characteristics. +AC_C_CONST + +dnl Check for types + +dnl Checks for library functions. + +dnl -- configure options -- + +AC_ARG_WITH(xml-config, + [ --with-xml-config=PATH use xml-config in PATH to find libxml ], + [if ! test -x "$with_xml_config" + then + AC_MSG_ERROR([$with_xml_config cannot be executed]) + fi + XMLCONFIG="$with_xml_config"] +) +if test -z "$XMLCONFIG" +then + AC_CHECK_PROGS(XMLCONFIG, [xml2-config xml-config]) +fi +if test -n "$XMLCONFIG" +then + LIBS="$LIBS `$XMLCONFIG --libs`" + CPPFLAGS="$CPPFLAGS `$XMLCONFIG --cflags`" + AC_CHECK_FUNC(xmlParseFile,, [AC_MSG_ERROR([There was a problem linking with libxml])]) +else + AC_MSG_ERROR([xml-config could not be found]) +fi + +AM_PATH_OGG(LIBS="$LIBS $OGG_LIBS", AC_MSG_ERROR(must have Ogg installed!)) +AM_PATH_VORBIS(LIBS="$LIBS $VORBIS_LIBS", AC_MSG_ERROR(must have Vorbis installed!)) + +dnl Make substitutions + +AC_SUBST(LIBTOOL_DEPS) +AC_SUBST(OPT) +AC_SUBST(LIBS) +AC_SUBST(DEBUG) +AC_SUBST(CFLAGS) +AC_SUBST(PROFILE) + +AC_OUTPUT(Makefile src/Makefile src/avl/Makefile src/httpp/Makefile src/thread/Makefile src/log/Makefile src/net/Makefile src/timing/Makefile) diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 00000000..5fbddb7f --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,29 @@ +## Process this with automake to create Makefile.in + +AUTOMAKE_OPTIONS = foreign + +SUBDIRS = avl thread httpp net log timing + +bin_PROGRAMS = icecast + +noinst_HEADERS = config.h os.h logging.h sighandler.h connection.h global.h\ + util.h source.h stats.h refbuf.h client.h format.h format_vorbis.h +icecast_SOURCES = config.c main.c logging.c sighandler.c connection.c global.c\ + util.c source.c stats.c refbuf.c client.c format.c format_vorbis.c + +icecast_LDADD = net/libicenet.la thread/libicethread.la httpp/libicehttpp.la\ + log/libicelog.la avl/libiceavl.la timing/libicetiming.la +LIBS = -lpthread + +INCLUDES = -I$(srcdir)/net -I$(srcdir)/thread -I$(srcdir)/avl -I$(srcdir)/httpp \ + -I$(srcdir)/log -I$(srcdir)/timing + +# SCCS stuff (for BitKeeper) +GET = true + +debug: + $(MAKE) all CFLAGS="@DEBUG@" + +profile: + $(MAKE) all CFLAGS="@PROFILE@" + diff --git a/src/TODO b/src/TODO new file mode 100644 index 00000000..bf0c587d --- /dev/null +++ b/src/TODO @@ -0,0 +1 @@ +need a shutdown function in case anything else in the code needs to have icecast gracefully shutdown. \ No newline at end of file diff --git a/src/client.c b/src/client.c new file mode 100644 index 00000000..21f332ba --- /dev/null +++ b/src/client.c @@ -0,0 +1,46 @@ +/* client.c +** +** client interface implementation +** +*/ + +#include +#include + +#include "thread.h" +#include "avl.h" +#include "httpp.h" + +#include "connection.h" +#include "refbuf.h" + +#include "client.h" +#include "logging.h" + +client_t *client_create(connection_t *con, http_parser_t *parser) +{ + client_t *client = (client_t *)malloc(sizeof(client_t)); + + client->con = con; + client->parser = parser; + client->queue = NULL; + client->pos = 0; + + return client; +} + +void client_destroy(client_t *client) +{ + refbuf_t *refbuf; + + /* write log entry */ + logging_access(client); + + connection_close(client->con); + httpp_destroy(client->parser); + + while ((refbuf = refbuf_queue_remove(&client->queue))) + refbuf_release(refbuf); + + free(client); +} diff --git a/src/client.h b/src/client.h new file mode 100644 index 00000000..3c4c4749 --- /dev/null +++ b/src/client.h @@ -0,0 +1,28 @@ +/* client.h +** +** client data structions and function definitions +** +*/ +#ifndef __CLIENT_H__ +#define __CLIENT_H__ + +typedef struct _client_tag +{ + /* the clients connection */ + connection_t *con; + /* the clients http headers */ + http_parser_t *parser; + + /* http response code for this client */ + int respcode; + + /* buffer queue */ + refbuf_queue_t *queue; + /* position in first buffer */ + unsigned long pos; +} client_t; + +client_t *client_create(connection_t *con, http_parser_t *parser); +void client_destroy(client_t *client); + +#endif /* __CLIENT_H__ */ diff --git a/src/config.c b/src/config.c new file mode 100644 index 00000000..5f2b2bb8 --- /dev/null +++ b/src/config.c @@ -0,0 +1,296 @@ +#include +#include +#include +#include +#include +#include "config.h" + +#define CONFIG_DEFAULT_LOCATION "Earth" +#define CONFIG_DEFAULT_ADMIN "icemaster@localhost" +#define CONFIG_DEFAULT_CLIENT_LIMIT 256 +#define CONFIG_DEFAULT_SOURCE_LIMIT 16 +#define CONFIG_DEFAULT_THREADPOOL_SIZE 4 +#define CONFIG_DEFAULT_CLIENT_TIMEOUT 30 +#define CONFIG_DEFAULT_HEADER_TIMEOUT 15 +#define CONFIG_DEFAULT_SOURCE_PASSWORD "changeme" +#define CONFIG_DEFAULT_TOUCH_FREQ 5 +#define CONFIG_DEFAULT_HOSTNAME "localhost" +#define CONFIG_DEFAULT_PORT 8888 +#define CONFIG_DEFAULT_BASE_DIR "/usr/local/icecast" +#define CONFIG_DEFAULT_LOG_DIR "/usr/local/icecast/logs" +#define CONFIG_DEFAULT_ACCESS_LOG "access.log" +#define CONFIG_DEFAULT_ERROR_LOG "error.log" + +ice_config_t _configuration; +char *_config_filename; + +static void _set_defaults(void); +static void _parse_root(xmlDocPtr doc, xmlNodePtr node); +static void _parse_limits(xmlDocPtr doc, xmlNodePtr node); +static void _parse_directory(xmlDocPtr doc, xmlNodePtr node); +static void _parse_paths(xmlDocPtr doc, xmlNodePtr node); +static void _parse_logging(xmlDocPtr doc, xmlNodePtr node); +static void _add_server(xmlDocPtr doc, xmlNodePtr node); + +void config_initialize(void) +{ + memset(&_configuration, 0, sizeof(ice_config_t)); + _set_defaults(); + _config_filename = NULL; +} + +void config_shutdown(void) +{ + if (_config_filename) free(_config_filename); + + if (_configuration.location) free(_configuration.location); + if (_configuration.admin) free(_configuration.admin); + if (_configuration.source_password) free(_configuration.source_password); + if (_configuration.hostname) free(_configuration.hostname); + if (_configuration.base_dir) free(_configuration.base_dir); + if (_configuration.log_dir) free(_configuration.log_dir); + if (_configuration.access_log) free(_configuration.access_log); + if (_configuration.error_log) free(_configuration.error_log); + + memset(&_configuration, 0, sizeof(ice_config_t)); +} + +int config_parse_file(const char *filename) +{ + xmlDocPtr doc; + xmlNodePtr node; + + if (filename == NULL || strcmp(filename, "") == 0) return CONFIG_EINSANE; + + _config_filename = (char *)strdup(filename); + + doc = xmlParseFile(_config_filename); + if (doc == NULL) { + return -1; + } + + node = xmlDocGetRootElement(doc); + if (node == NULL) { + xmlFreeDoc(doc); + return CONFIG_ENOROOT; + } + + if (strcmp(node->name, "icecast") != 0) { + xmlFreeDoc(doc); + return CONFIG_EBADROOT; + } + + xmlDocDump(stdout, doc); + + _parse_root(doc, node->xmlChildrenNode); + + xmlFreeDoc(doc); + + return 0; +} + +int config_parse_cmdline(int arg, char **argv) +{ + return 0; +} + +int config_rehash(void) +{ + return 0; +} + +ice_config_t *config_get_config(void) +{ + return &_configuration; +} + +static void _set_defaults(void) +{ + _configuration.location = (char *)strdup(CONFIG_DEFAULT_LOCATION); + _configuration.admin = (char *)strdup(CONFIG_DEFAULT_ADMIN); + _configuration.client_limit = CONFIG_DEFAULT_CLIENT_LIMIT; + _configuration.source_limit = CONFIG_DEFAULT_SOURCE_LIMIT; + _configuration.threadpool_size = CONFIG_DEFAULT_THREADPOOL_SIZE; + _configuration.client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT; + _configuration.header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT; + _configuration.source_password = (char *)strdup(CONFIG_DEFAULT_SOURCE_PASSWORD); + _configuration.touch_freq = CONFIG_DEFAULT_TOUCH_FREQ; + _configuration.dir_list = NULL; + _configuration.hostname = (char *)strdup(CONFIG_DEFAULT_HOSTNAME); + _configuration.port = CONFIG_DEFAULT_PORT; + _configuration.bind_address = NULL; + _configuration.base_dir = (char *)strdup(CONFIG_DEFAULT_BASE_DIR); + _configuration.log_dir = (char *)strdup(CONFIG_DEFAULT_LOG_DIR); + _configuration.access_log = (char *)strdup(CONFIG_DEFAULT_ACCESS_LOG); + _configuration.error_log = (char *)strdup(CONFIG_DEFAULT_ERROR_LOG); +} + +static void _parse_root(xmlDocPtr doc, xmlNodePtr node) +{ + char *tmp; + + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "location") == 0) { + if (_configuration.location) free(_configuration.location); + _configuration.location = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "admin") == 0) { + if (_configuration.admin) free(_configuration.admin); + _configuration.admin = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "source-password") == 0) { + if (_configuration.source_password) free(_configuration.source_password); + _configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "hostname") == 0) { + if (_configuration.hostname) free(_configuration.hostname); + _configuration.hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "port") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.port = atoi(tmp); + if (tmp) free(tmp); + } else if (strcmp(node->name, "bind-address") == 0) { + if (_configuration.bind_address) free(_configuration.bind_address); + _configuration.bind_address = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "limits") == 0) { + _parse_limits(doc, node->xmlChildrenNode); + } else if (strcmp(node->name, "directory") == 0) { + _parse_directory(doc, node->xmlChildrenNode); + } else if (strcmp(node->name, "paths") == 0) { + _parse_paths(doc, node->xmlChildrenNode); + } else if (strcmp(node->name, "logging") == 0) { + _parse_logging(doc, node->xmlChildrenNode); + } + } while ((node = node->next)); +} + +static void _parse_limits(xmlDocPtr doc, xmlNodePtr node) +{ + char *tmp; + + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "clients") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.client_limit = atoi(tmp); + if (tmp) free(tmp); + } else if (strcmp(node->name, "sources") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.source_limit = atoi(tmp); + if (tmp) free(tmp); + } else if (strcmp(node->name, "threadpool") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.threadpool_size = atoi(tmp); + if (tmp) free(tmp); + } else if (strcmp(node->name, "client-timeout") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.client_timeout = atoi(tmp); + if (tmp) free(tmp); + } else if (strcmp(node->name, "header-timeout") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.header_timeout = atoi(tmp); + if (tmp) free(tmp); + } + } while ((node = node->next)); +} + +static void _parse_directory(xmlDocPtr doc, xmlNodePtr node) +{ + char *tmp; + + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "server") == 0) { + _add_server(doc, node->xmlChildrenNode); + } else if (strcmp(node->name, "touch-freq") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.touch_freq = atoi(tmp); + if (tmp) free(tmp); + } + } while ((node = node->next)); +} + +static void _parse_paths(xmlDocPtr doc, xmlNodePtr node) +{ + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "basedir") == 0) { + if (_configuration.base_dir) free(_configuration.base_dir); + _configuration.base_dir = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "logdir") == 0) { + if (_configuration.log_dir) free(_configuration.log_dir); + _configuration.log_dir = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } + } while ((node = node->next)); +} + +static void _parse_logging(xmlDocPtr doc, xmlNodePtr node) +{ + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "accesslog") == 0) { + if (_configuration.access_log) free(_configuration.access_log); + _configuration.access_log = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "errorlog") == 0) { + if (_configuration.error_log) free(_configuration.error_log); + _configuration.error_log = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } + } while ((node = node->next)); +} + +static void _add_server(xmlDocPtr doc, xmlNodePtr node) +{ + ice_config_dir_t *dirnode, *server; + int addnode; + char *tmp; + + server = (ice_config_dir_t *)malloc(sizeof(ice_config_dir_t)); + server->touch_freq = _configuration.touch_freq; + server->host = NULL; + addnode = 0; + + do { + if (node == NULL) break; + if (xmlIsBlankNode(node)) continue; + + if (strcmp(node->name, "host") == 0) { + server->host = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + addnode = 1; + } else if (strcmp(node->name, "touch-freq") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + server->touch_freq = atoi(tmp); + if (tmp) free(tmp); + } + server->next = NULL; + } while ((node = node->next)); + + if (addnode) { + dirnode = _configuration.dir_list; + if (dirnode == NULL) { + _configuration.dir_list = server; + } else { + while (dirnode->next) dirnode = dirnode->next; + + dirnode->next = server; + } + + server = NULL; + addnode = 0; + } + + if (server) { + if (server->host) free(server->host); + free(server); + server = NULL; + } +} + + diff --git a/src/config.h b/src/config.h new file mode 100644 index 00000000..8fc04256 --- /dev/null +++ b/src/config.h @@ -0,0 +1,55 @@ +#ifndef __CONFIG_H__ +#define __CONFIG_H__ + +#define CONFIG_EINSANE -1 +#define CONFIG_ENOROOT -2 +#define CONFIG_EBADROOT -3 + +typedef struct ice_config_dir_tag +{ + char *host; + int touch_freq; + struct ice_config_dir_tag *next; +} ice_config_dir_t; + +typedef struct ice_config_tag +{ + char *location; + char *admin; + + int client_limit; + int source_limit; + int threadpool_size; + int client_timeout; + int header_timeout; + + char *source_password; + + int touch_freq; + ice_config_dir_t *dir_list; + + char *hostname; + int port; + char *bind_address; + + char *base_dir; + char *log_dir; + + char *access_log; + char *error_log; +} ice_config_t; + +void config_initialize(void); +void config_shutdown(void); + +int config_parse_file(const char *filename); +int config_parse_cmdline(int arg, char **argv); + +int config_rehash(void); + +ice_config_t *config_get_config(void); + +#endif /* __CONFIG_H__ */ + + + diff --git a/src/configtest.c b/src/configtest.c new file mode 100644 index 00000000..8d1776a6 --- /dev/null +++ b/src/configtest.c @@ -0,0 +1,58 @@ +#include +#include "config.h" + +void _dump_config(ice_config_t *config); + +int main(void) +{ + ice_config_t *config; + + config_initialize(); + + config_parse_file("icecast.xml"); + + config = config_get_config(); + + _dump_config(config); + + config_shutdown(); + + return 0; +} + +void _dump_config(ice_config_t *config) +{ + ice_config_dir_t *node; + + printf("-----\n"); + printf("location = %s\n", config->location); + printf("admin = %s\n", config->admin); + printf("client_limit = %d\n", config->client_limit); + printf("source_limit = %d\n", config->source_limit); + printf("threadpool_size = %d\n", config->threadpool_size); + printf("client_timeout = %d\n", config->client_timeout); + printf("source_password = %s\n", config->source_password); + printf("touch_freq = %d\n", config->touch_freq); + + node = config->dir_list; + while (node) { + printf("directory.touch_freq = %d\n", node->touch_freq); + printf("directory.host = %s\n", node->host); + + node = node->next; + } + + printf("hostname = %s\n", config->hostname); + printf("port = %d\n", config->port); + printf("bind_address = %s\n", config->bind_address); + printf("base_dir = %s\n", config->base_dir); + printf("log_dir = %s\n", config->log_dir); + printf("access_log = %s\n", config->access_log); + printf("error_log = %s\n", config->error_log); + printf("-----\n"); +} + + + + + diff --git a/src/connection.c b/src/connection.c new file mode 100644 index 00000000..84dca72f --- /dev/null +++ b/src/connection.c @@ -0,0 +1,505 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "os.h" + +#include "thread.h" +#include "avl.h" +#include "sock.h" +#include "log.h" +#include "httpp.h" + +#include "config.h" +#include "global.h" +#include "util.h" +#include "connection.h" +#include "refbuf.h" +#include "client.h" +#include "stats.h" +#include "format.h" +#include "logging.h" + +#include "source.h" + +#define CATMODULE "connection" + +typedef struct con_queue_tag { + connection_t *con; + struct con_queue_tag *next; +} con_queue_t; + +typedef struct _thread_queue_tag { + long thread_id; + struct _thread_queue_tag *next; +} thread_queue_t; + +static mutex_t _connection_mutex; +static unsigned long _current_id = 0; +static int _initialized = 0; +static cond_t _pool_cond; + +static con_queue_t *_queue = NULL; +static mutex_t _queue_mutex; + +static thread_queue_t *_conhands = NULL; + +static rwlock_t _source_shutdown_rwlock; + +static void *_handle_connection(void *arg); + +void connection_initialize(void) +{ + if (_initialized) return; + + thread_mutex_create(&_connection_mutex); + thread_mutex_create(&_queue_mutex); + thread_rwlock_create(&_source_shutdown_rwlock); + thread_cond_create(&_pool_cond); + + _initialized = 1; +} + +void connection_shutdown(void) +{ + if (!_initialized) return; + + thread_cond_destroy(&_pool_cond); + thread_rwlock_destroy(&_source_shutdown_rwlock); + thread_mutex_destroy(&_queue_mutex); + thread_mutex_destroy(&_connection_mutex); + + _initialized = 0; +} + +static connection_t *_create_connection(void) +{ + connection_t *con; + + con = (connection_t *)malloc(sizeof(connection_t)); + memset(con, 0, sizeof(connection_t)); + + return con; +} + +static unsigned long _next_connection_id(void) +{ + unsigned long id; + + thread_mutex_lock(&_connection_mutex); + id = _current_id++; + thread_mutex_unlock(&_connection_mutex); + + return id; +} + +static connection_t *_accept_connection(void) +{ + int sock; + fd_set rfds; + connection_t *con; + struct timeval tv; + char *ip; + + FD_ZERO(&rfds); + FD_SET(global.serversock, &rfds); + + tv.tv_sec = 0; + tv.tv_usec = 30000; + + if (select(global.serversock + 1, &rfds, NULL, NULL, &tv) <= 0) { + return NULL; + } + + /* malloc enough room for 123.123.123.123\0 */ + ip = (char *)malloc(16); + + sock = sock_accept(global.serversock, ip, 16); + if (sock >= 0) { + con = _create_connection(); + + con->sock = sock; + con->con_time = time(NULL); + con->id = _next_connection_id(); + con->ip = ip; + + return con; + } + + if (!sock_recoverable(sock_error())) + WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error())); + + free(ip); + + return NULL; +} + +static void _add_connection(connection_t *con) +{ + con_queue_t *node; + + node = (con_queue_t *)malloc(sizeof(con_queue_t)); + + thread_mutex_lock(&_queue_mutex); + node->con = con; + node->next = _queue; + _queue = node; + thread_mutex_unlock(&_queue_mutex); + + printf("connection added....\n"); +} + +static void _signal_pool(void) +{ + thread_cond_signal(&_pool_cond); +} + +static void _push_thread(thread_queue_t **queue, long thread_id) +{ + /* create item */ + thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t)); + item->thread_id = thread_id; + item->next = NULL; + + + thread_mutex_lock(&_queue_mutex); + if (*queue == NULL) { + *queue = item; + } else { + item->next = *queue; + *queue = item; + } + thread_mutex_unlock(&_queue_mutex); +} + +static long _pop_thread(thread_queue_t **queue) +{ + long id; + thread_queue_t *item; + + thread_mutex_lock(&_queue_mutex); + + item = *queue; + if (item == NULL) { + thread_mutex_unlock(&_queue_mutex); + return -1; + } + + *queue = item->next; + item->next = NULL; + id = item->thread_id; + free(item); + + thread_mutex_unlock(&_queue_mutex); + + return id; +} + +static void _build_pool(void) +{ + ice_config_t *config; + int i, tid; + char buff[64]; + + config = config_get_config(); + + for (i = 0; i < config->threadpool_size; i++) { + snprintf(buff, 64, "Connection Thread #%d", i); + tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED); + _push_thread(&_conhands, tid); + } +} + +static void _destroy_pool(void) +{ + long id; + int i; + + i = 0; + + thread_cond_broadcast(&_pool_cond); + id = _pop_thread(&_conhands); + while (id != -1) { + thread_join(id); + _signal_pool(); + id = _pop_thread(&_conhands); + } +} + +void connection_accept_loop(void) +{ + connection_t *con; + + _build_pool(); + + while (global.running == ICE_RUNNING) { + con = _accept_connection(); + + if (con) { + _add_connection(con); + _signal_pool(); + } + } + + _destroy_pool(); + + /* wait for all the sources to shutdown */ + thread_rwlock_wlock(&_source_shutdown_rwlock); + thread_rwlock_unlock(&_source_shutdown_rwlock); +} + +static connection_t *_get_connection(void) +{ + con_queue_t *node = NULL; + con_queue_t *oldnode = NULL; + connection_t *con = NULL; + + thread_mutex_lock(&_queue_mutex); + if (_queue) { + node = _queue; + while (node->next) { + oldnode = node; + node = node->next; + } + + /* node is now the last node + ** and oldnode is the previous one, or NULL + */ + if (oldnode) oldnode->next = NULL; + else (_queue) = NULL; + } + thread_mutex_unlock(&_queue_mutex); + + if (node) { + con = node->con; + free(node); + } + + return con; +} + +static void *_handle_connection(void *arg) +{ + char header[4096]; + connection_t *con; + http_parser_t *parser; + source_t *source; + stats_connection_t *stats; + avl_node *node; + http_var_t *var; + client_t *client; + int bytes; + + while (global.running == ICE_RUNNING) { + memset(header, 0, 4096); + + thread_cond_wait(&_pool_cond); + if (global.running != ICE_RUNNING) break; + + /* grab a connection and set the socket to blocking */ + con = _get_connection(); + + stats_event_inc(NULL, "connections"); + + sock_set_blocking(con->sock, SOCK_BLOCK); + + /* fill header with the http header */ + if (util_read_header(con->sock, header, 4096) == 0) { + /* either we didn't get a complete header, or we timed out */ + connection_close(con); + continue; + } + + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + if (httpp_parse(parser, header, strlen(header))) { + /* handle the connection or something */ + + if (strcmp("ice", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) != 0 && strcmp("http", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) != 0) { + printf("DEBUG: bad protocol\n"); + connection_close(con); + httpp_destroy(parser); + continue; + } + + if (parser->req_type == httpp_req_source) { + printf("DEBUG: source logging in\n"); + stats_event_inc(NULL, "source_connections"); + + if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) { + printf("DEBUG: bad password\n"); + INFO1("Source (%s) attempted to login with bad password", httpp_getvar(parser, HTTPP_VAR_URI)); + connection_close(con); + httpp_destroy(parser); + continue; + } + + global_lock(); + if (global.sources >= config_get_config()->source_limit) { + printf("TOO MANY SOURCE, KICKING THIS ONE\n"); + INFO1("Source (%d) logged in, but there are too many sources", httpp_getvar(parser, HTTPP_VAR_URI)); + connection_close(con); + httpp_destroy(parser); + global_unlock(); + continue; + } + global.sources++; + global_unlock(); + + stats_event_inc(NULL, "sources"); + + source = source_create(con, parser, httpp_getvar(parser, HTTPP_VAR_URI), FORMAT_TYPE_VORBIS); + source->shutdown_rwlock = &_source_shutdown_rwlock; + + sock_set_blocking(con->sock, SOCK_NONBLOCK); + + thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED); + + continue; + } else if (parser->req_type == httpp_req_stats) { + printf("DEBUG: stats connection...\n"); + stats_event_inc(NULL, "stats_connections"); + + if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) { + printf("DEBUG: bad password\n"); + connection_close(con); + httpp_destroy(parser); + continue; + } + + stats_event_inc(NULL, "stats"); + + /* create stats connection and create stats handler thread */ + stats = (stats_connection_t *)malloc(sizeof(stats_connection_t)); + stats->parser = parser; + stats->con = con; + + thread_create("Stats Connection", stats_connection, (void *)stats, THREAD_DETACHED); + + continue; + } else if (parser->req_type == httpp_req_play || parser->req_type == httpp_req_get) { + printf("DEBUG: client coming in...\n"); + + /* make a client */ + client = client_create(con, parser); + stats_event_inc(NULL, "client_connections"); + + /* there are several types of HTTP GET clients + ** media clients, which are looking for a source (eg, URI = /stream.ogg) + ** stats clients, which are looking for /stats.xml + ** and director server authorizers, which are looking for /GUID-xxxxxxxx (where xxxxxx is the GUID in question + ** we need to handle the latter two before the former, as the latter two + ** aren't subject to the limits. + */ + // TODO: add GUID-xxxxxx + if (strcmp(httpp_getvar(parser, HTTPP_VAR_URI), "/stats.xml") == 0) { + printf("sending stats.xml\n"); + stats_sendxml(client); + continue; + } + + global_lock(); + if (global.clients >= config_get_config()->client_limit) { + if (parser->req_type == httpp_req_get) { + client->respcode = 504; + bytes = sock_write(client->con->sock, "HTTP/1.0 504 Server Full\r\nContent-Type: text/html\r\n\r\n"\ + "The server is already full. Try again later.\r\n"); + if (bytes > 0) client->con->sent_bytes = bytes; + } + client_destroy(client); + global_unlock(); + continue; + } + global_unlock(); + + avl_tree_rlock(global.source_tree); + source = source_find_mount(httpp_getvar(parser, HTTPP_VAR_URI)); + if (source) { + printf("DEBUG: source found for client\n"); + + global_lock(); + if (global.clients >= config_get_config()->client_limit) { + if (parser->req_type == httpp_req_get) { + client->respcode = 504; + bytes = sock_write(client->con->sock, "HTTP/1.0 504 Server Full\r\nContent-Type: text/html\r\n\r\n"\ + "The server is already full. Try again later.\r\n"); + if (bytes > 0) client->con->sent_bytes = bytes; + } + client_destroy(client); + global_unlock(); + continue; + } + global.clients++; + global_unlock(); + + if (parser->req_type == httpp_req_get) { + client->respcode = 200; + sock_write(client->con->sock, "HTTP/1.0 200 OK\r\nContent-Type: application/x-ogg\r\n"); + /* iterate through source http headers and send to client */ + avl_tree_rlock(source->parser->vars); + node = avl_get_first(source->parser->vars); + while (node) { + var = (http_var_t *)node->key; + if (strcasecmp(var->name, "ice-password") && !strncasecmp("ice-", var->name, 4)) { + printf("DEBUG: sending %s: %s\n", var->name, var->value); + sock_write(client->con->sock, "%s: %s\r\n", var->name, var->value); + } + node = avl_get_next(node); + } + avl_tree_unlock(source->parser->vars); + + sock_write(client->con->sock, "\r\n"); + + sock_set_blocking(client->con->sock, SOCK_NONBLOCK); + } + + avl_tree_wlock(source->pending_tree); + avl_insert(source->pending_tree, (void *)client); + avl_tree_unlock(source->pending_tree); + } + + avl_tree_unlock(global.source_tree); + + if (!source) { + printf("DEBUG: source not found for client\n"); + if (parser->req_type == httpp_req_get) { + client->respcode = 404; + bytes = sock_write(client->con->sock, "HTTP/1.0 404 Source Not Found\r\nContent-Type: text/html\r\n\r\n"\ + "The source you requested could not be found.\r\n"); + if (bytes > 0) client->con->sent_bytes = bytes; + } + client_destroy(client); + } + + continue; + } else { + printf("DEBUG: wrong request type\n"); + connection_close(con); + httpp_destroy(parser); + continue; + } + } else { + printf("DEBUG: parsing failed\n"); + connection_close(con); + httpp_destroy(parser); + continue; + } + } + + thread_exit(0); + + return NULL; +} + +void connection_close(connection_t *con) +{ + sock_close(con->sock); + if (con->ip) free(con->ip); + if (con->host) free(con->host); + free(con); +} diff --git a/src/connection.h b/src/connection.h new file mode 100644 index 00000000..c39e9d0a --- /dev/null +++ b/src/connection.h @@ -0,0 +1,23 @@ +#ifndef __CONNECTION_H__ +#define __CONNECTION_H__ + +typedef struct connection_tag +{ + unsigned long id; + + time_t con_time; + long long sent_bytes; + + int sock; + int error; + + char *ip; + char *host; +} connection_t; + +void connection_initialize(void); +void connection_shutdown(void); +void connection_accept_loop(void); +void connection_close(connection_t *con); + +#endif /* __CONNECTION_H__ */ diff --git a/src/format.c b/src/format.c new file mode 100644 index 00000000..159d7aa1 --- /dev/null +++ b/src/format.c @@ -0,0 +1,31 @@ +/* format.c +** +** format plugin implementation +** +*/ + +#include +#include + +#include "connection.h" +#include "refbuf.h" + +#include "format.h" + +#include "format_vorbis.h" + +format_plugin_t *format_get_plugin(format_type_t type) +{ + format_plugin_t *plugin; + + switch (type) { + case FORMAT_TYPE_VORBIS: + plugin = format_vorbis_get_plugin(); + break; + default: + plugin = NULL; + break; + } + + return plugin; +} diff --git a/src/format.h b/src/format.h new file mode 100644 index 00000000..6e7d596d --- /dev/null +++ b/src/format.h @@ -0,0 +1,40 @@ +/* format.h +** +** format plugin header +** +*/ +#ifndef __FORMAT_H__ +#define __FORMAT_H__ + +typedef enum _format_type_tag +{ + FORMAT_TYPE_VORBIS, + FORMAT_TYPE_MP3 +} format_type_t; + +typedef struct _format_plugin_tag +{ + format_type_t type; + + /* set this is the data format has a header that + ** we must send before regular data + */ + int has_predata; + + refbuf_t *(*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long len); + refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self); + + /* for internal state management */ + void *_state; +} format_plugin_t; + +format_plugin_t *format_get_plugin(format_type_t type); + +#endif /* __FORMAT_H__ */ + + + + + + + diff --git a/src/format_vorbis.c b/src/format_vorbis.c new file mode 100644 index 00000000..2578ab6e --- /dev/null +++ b/src/format_vorbis.c @@ -0,0 +1,120 @@ +/* format_vorbis.c +** +** format plugin for vorbis +** +*/ + +#include +#include +#include + +#include +#include + +#include "refbuf.h" + +#include "format.h" + +typedef struct _vstate_tag +{ + ogg_sync_state oy; + ogg_page og; + unsigned long serialno; + int header; + refbuf_t *headbuf[10]; +} vstate_t; + +refbuf_t *format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len); +refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self); + +format_plugin_t *format_vorbis_get_plugin(void) +{ + format_plugin_t *plugin; + vstate_t *state; + + plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t)); + plugin->type = FORMAT_TYPE_VORBIS; + plugin->has_predata = 1; + plugin->get_buffer = format_vorbis_get_buffer; + plugin->get_predata = format_vorbis_get_predata; + + state = (vstate_t *)calloc(1, sizeof(vstate_t)); + ogg_sync_init(&state->oy); + + plugin->_state = (void *)state; + + return plugin; +} + +refbuf_t *format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len) +{ + char *buffer; + refbuf_t *refbuf; + int i; + vstate_t *state = (vstate_t *)self->_state; + + if (data) { + /* write the data to the buffer */ + buffer = ogg_sync_buffer(&state->oy, len); + memcpy(buffer, data, len); + ogg_sync_wrote(&state->oy, len); + } + + refbuf = NULL; + if (ogg_sync_pageout(&state->oy, &state->og) == 1) { + refbuf = refbuf_new(state->og.header_len + state->og.body_len); + memcpy(refbuf->data, state->og.header, state->og.header_len); + memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len); + + if (state->serialno != ogg_page_serialno(&state->og)) { + /* this is a new logical bitstream */ + state->header = 0; + for (i = 0; i < 10; i++) { + if (state->headbuf[i]) { + refbuf_release(state->headbuf[i]); + state->headbuf[i] = NULL; + } + } + + state->serialno = ogg_page_serialno(&state->og); + } + + if (state->header >= 0) { + if (ogg_page_granulepos(&state->og) == 0) { + state->header++; + } else { + state->header = 0; + } + } + + /* cache first three pages */ + if (state->header) { + refbuf_addref(refbuf); + state->headbuf[state->header - 1] = refbuf; + } + } + + return refbuf; +} + +refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self) +{ + refbuf_queue_t *queue; + int i; + vstate_t *state = (vstate_t *)self->_state; + + queue = NULL; + for (i = 0; i < 10; i++) { + if (state->headbuf[i]) { + refbuf_addref(state->headbuf[i]); + refbuf_queue_add(&queue, state->headbuf[i]); + } else { + break; + } + } + + return queue; +} + + + diff --git a/src/format_vorbis.h b/src/format_vorbis.h new file mode 100644 index 00000000..a8692dac --- /dev/null +++ b/src/format_vorbis.h @@ -0,0 +1,11 @@ +/* format_vorbis.h +** +** vorbis format plugin header +** +*/ +#ifndef __FORMAT_VORBIS_H__ +#define __FORMAT_VORBIS_H__ + +format_plugin_t *format_vorbis_get_plugin(void); + +#endif /* __FORMAT_VORBIS_H__ */ diff --git a/src/global.c b/src/global.c new file mode 100644 index 00000000..1c919fdb --- /dev/null +++ b/src/global.c @@ -0,0 +1,40 @@ +#include "thread.h" +#include "avl.h" + +#include "httpp.h" +#include "connection.h" +#include "refbuf.h" +#include "format.h" +#include "source.h" + +#include "global.h" + +ice_global_t global; + +static mutex_t _global_mutex; + +void global_initialize(void) +{ + global.serversock = 0; + global.running = 0; + global.clients = 0; + global.sources = 0; + global.source_tree = avl_tree_new(source_compare_sources, NULL); + thread_mutex_create(&_global_mutex); +} + +void global_shutdown(void) +{ + thread_mutex_destroy(&_global_mutex); + avl_tree_free(global.source_tree, source_free_source); +} + +void global_lock(void) +{ + thread_mutex_lock(&_global_mutex); +} + +void global_unlock(void) +{ + thread_mutex_unlock(&_global_mutex); +} diff --git a/src/global.h b/src/global.h new file mode 100644 index 00000000..e15ca8c4 --- /dev/null +++ b/src/global.h @@ -0,0 +1,28 @@ +#ifndef __GLOBAL_H__ +#define __GLOBAL_H__ + +#define ICE_LISTEN_QUEUE 5 + +#define ICE_RUNNING 1 +#define ICE_HALTING 2 + +typedef struct ice_global_tag +{ + int serversock; + + int running; + + int sources; + int clients; + + avl_tree *source_tree; +} ice_global_t; + +extern ice_global_t global; + +void global_initialize(void); +void global_shutdown(void); +void global_lock(void); +void global_unlock(void); + +#endif /* __GLOBAL_H__ */ diff --git a/src/logging.c b/src/logging.c new file mode 100644 index 00000000..c9596af1 --- /dev/null +++ b/src/logging.c @@ -0,0 +1,71 @@ +#include +#include + +#include "thread.h" +#include "httpp.h" +#include "log.h" + +#include "connection.h" +#include "refbuf.h" +#include "client.h" + +#include "logging.h" + +/* the global log descriptors */ +int errorlog; +int accesslog; + +/* +** ADDR USER AUTH DATE REQUEST CODE BYTES REFERER AGENT [TIME] +** +** ADDR = client->con->ip +** USER = - +** we should do this for real once we support authentication +** AUTH = - +** DATE = _make_date(client->con->con_time) +** REQUEST = build from client->parser +** CODE = client->respcode +** BYTES = client->con->sent_bytes +** REFERER = get from client->parser +** AGENT = get from client->parser +** TIME = timing_get_time() - client->con->con_time +*/ +void logging_access(client_t *client) +{ + char datebuf[128]; + char reqbuf[1024]; + struct tm *thetime; + time_t now; + time_t stayed; + + now = time(NULL); + + /* build the data */ + /* TODO: localtime is not threadsafe on all platforms + ** we should probably use localtime_r if it's available + */ + PROTECT_CODE(thetime = localtime(&now); strftime(datebuf, 128, LOGGING_FORMAT_CLF, thetime)) + + /* build the request */ + snprintf(reqbuf, 1024, "%s %s %s/%s", httpp_getvar(client->parser, HTTPP_VAR_REQ_TYPE), httpp_getvar(client->parser, HTTPP_VAR_URI), + httpp_getvar(client->parser, HTTPP_VAR_PROTOCOL), httpp_getvar(client->parser, HTTPP_VAR_VERSION)); + + stayed = now - client->con->con_time; + + log_write_direct(accesslog, "%s - - [%s] \"%s\" %d %lld \"%s\" \"%s\" %d", + client->con->ip, + datebuf, + reqbuf, + client->respcode, + client->con->sent_bytes, + (httpp_getvar(client->parser, "referer") != NULL) ? httpp_getvar(client->parser, "referer") : "-", + (httpp_getvar(client->parser, "user-agent") != NULL) ? httpp_getvar(client->parser, "user-agent") : "-", + (int)stayed); +} + + + + + + + diff --git a/src/logging.h b/src/logging.h new file mode 100644 index 00000000..4e6e0149 --- /dev/null +++ b/src/logging.h @@ -0,0 +1,80 @@ +#ifndef __LOGGING_H__ +#define __LOGGING_H__ + +/* declare the global log descriptors */ + +extern int errorlog; +extern int accesslog; + +/* these are all ERRORx and WARNx where _x_ is the number of parameters +** it takes. it turns out most other copmilers don't have support for +** varargs macros. that totally sucks, but this is still pretty easy. +** +** feel free to add more here if needed. +*/ + +#define ERROR0(y) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y) +#define ERROR1(y, a) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a) +#define ERROR2(y, a, b) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a, b) +#define ERROR3(y, a, b, c) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a, b, c) + +#define WARN0(y) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y) +#define WARN1(y, a) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a) +#define WARN2(y, a, b) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a, b) +#define WARN3(y, a, b, c) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a, b, c) + +#define INFO0(y) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y) +#define INFO1(y, a) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a) +#define INFO2(y, a, b) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a, b) +#define INFO3(y, a, b, c) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a, b, c) + +#define DEBUG0(y) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y) +#define DEBUG1(y, a) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a) +#define DEBUG2(y, a, b) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a, b) +#define DEBUG3(y, a, b, c) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a, b, c) + +/* CATMODULE is the category or module that logging messages come from. +** we set one here in cause someone forgets in the .c file. +*/ +/*#define CATMODULE "unknown" + */ + +/* this is the logging call to write entries to the access_log +** the combined log format is: +** ADDR USER AUTH DATE REQUEST CODE BYTES REFERER AGENT [TIME] +** ADDR = ip address of client +** USER = username if authenticated +** AUTH = auth type, not used, and set to "-" +** DATE = date in "[30/Apr/2001:01:25:34 -0700]" format +** REQUEST = request, ie "GET /live.ogg HTTP/1.0" +** CODE = response code, ie, 200 or 404 +** BYTES = total bytes of data sent (other than headers) +** REFERER = the refering URL +** AGENT = the user agent +** +** for icecast, we add on extra field at the end, which will be +** ignored by normal log parsers +** +** TIME = seconds that the connection lasted +** +** this allows you to get bitrates (BYTES / TIME) +** and figure out exact times of connections +** +** it should be noted also that events are sent on client disconnect, +** so the DATE is the timestamp of disconnection. DATE - TIME is the +** time of connection. +*/ + +#define LOGGING_FORMAT_CLF "%d/%b/%Y:%H:%M:%S %z" + +void logging_access(client_t *client); + +#endif /* __LOGGING_H__ */ + + + + + + + + diff --git a/src/main.c b/src/main.c new file mode 100644 index 00000000..632c1f6f --- /dev/null +++ b/src/main.c @@ -0,0 +1,233 @@ +#include +#include + +#include "thread.h" +#include "avl.h" +#include "log.h" +#include "sock.h" +#include "resolver.h" +#include "httpp.h" + + +#include "config.h" +#include "sighandler.h" + +#include "global.h" +#include "os.h" +#include "connection.h" +#include "refbuf.h" +#include "client.h" +#include "stats.h" +#include "logging.h" + +#undef CATMODULE +#define CATMODULE "main" + +void _print_usage() +{ + printf("icecast 2.0 usage:\n"); + printf("\t-c \t\tSpecify configuration file\n"); + printf("\n"); +} + +void _initialize_subsystems(void) +{ + log_initialize(); + thread_initialize(); + sock_initialize(); + resolver_initialize(); + config_initialize(); + connection_initialize(); + global_initialize(); + stats_initialize(); + refbuf_initialize(); +} + +void _shutdown_subsystems(void) +{ + refbuf_shutdown(); + stats_shutdown(); + global_shutdown(); + connection_shutdown(); + config_shutdown(); + resolver_shutdown(); + sock_shutdown(); + thread_shutdown(); + log_shutdown(); +} + +int _parse_config_file(int argc, char **argv, char *filename, int size) +{ + int i = 1; + + if (argc < 3) return 0; + + while (i < argc) { + if (strcmp(argv[i], "-c") == 0) { + if (i + 1 < argc) { + strncpy(filename, argv[i + 1], size); + return 1; + } else { + return -1; + } + } + i++; + } + + return 0; +} + +int _start_logging(void) +{ + char fn_error[FILENAME_MAX]; + char fn_access[FILENAME_MAX]; + ice_config_t *config = config_get_config(); + + snprintf(fn_error, FILENAME_MAX, "%s%s%s", config->log_dir, PATH_SEPARATOR, config->error_log); + snprintf(fn_access, FILENAME_MAX, "%s%s%s", config->log_dir, PATH_SEPARATOR, config->access_log); + + errorlog = log_open(fn_error); + accesslog = log_open(fn_access); + + log_set_level(errorlog, 4); + log_set_level(accesslog, 4); + + if (errorlog < 0) + fprintf(stderr, "FATAL: could not open %s for error logging\n", fn_error); + if (accesslog < 0) + fprintf(stderr, "FATAL: could not open %s for access logging\n", fn_access); + + if (errorlog >= 0 && accesslog >= 0) return 1; + + return 0; +} + +void _stop_logging(void) +{ + log_close(errorlog); + log_close(accesslog); +} + +int _setup_socket(void) +{ + ice_config_t *config; + + config = config_get_config(); + + global.serversock = sock_get_server_socket(config->port, config->bind_address); + if (global.serversock == SOCK_ERROR) + return 0; + + return 1; +} + +int _start_listening(void) +{ + if (sock_listen(global.serversock, ICE_LISTEN_QUEUE) == SOCK_ERROR) + return 0; + + sock_set_blocking(global.serversock, SOCK_NONBLOCK); + + return 1; +} + +/* this is the heart of the beast */ +void _server_proc(void) +{ + if (!_setup_socket()) { + ERROR1("Could not create listener socket on port %d", config_get_config()->port); + return; + } + + if (!_start_listening()) { + ERROR0("Failed trying to listen on server socket"); + return; + } + + connection_accept_loop(); + + sock_close(global.serversock); +} + +int main(int argc, char **argv) +{ + int res, ret; + char filename[256]; + + /* startup all the modules */ + _initialize_subsystems(); + + /* setup the default signal handlers */ + sighandler_initialize(); + + /* parse the '-c icecast.xml' option + ** only, so that we can read a configfile + */ + res = _parse_config_file(argc, argv, filename, 256); + if (res == 1) { + /* parse the config file */ + ret = config_parse_file(filename); + if (ret < 0) { + fprintf(stderr, "FATAL: error parsing config file:"); + switch (ret) { + case CONFIG_EINSANE: + fprintf(stderr, "filename was null or blank\n"); + break; + case CONFIG_ENOROOT: + fprintf(stderr, "no root element found\n"); + break; + case CONFIG_EBADROOT: + fprintf(stderr, "root element is not \n"); + break; + default: + fprintf(stderr, "parse error\n"); + break; + } + } + } else if (res == -1) { + fprintf(stderr, "FATAL: -c option must have a filename\n"); + _print_usage(); + _shutdown_subsystems(); + return 1; + } + + /* override config file options with commandline options */ + config_parse_cmdline(argc, argv); + + if (!_start_logging()) { + fprintf(stderr, "FATAL: Could not start logging\n"); + _shutdown_subsystems(); + return 1; + } + + INFO0("icecast server started"); + + /* REM 3D Graphics */ + + /* let her rip */ + global.running = ICE_RUNNING; + _server_proc(); + + INFO0("Shutting down"); + + _stop_logging(); + + _shutdown_subsystems(); + + return 0; +} + + + + + + + + + + + + + + + diff --git a/src/os.h b/src/os.h new file mode 100644 index 00000000..ff9c2064 --- /dev/null +++ b/src/os.h @@ -0,0 +1,16 @@ +#ifndef __OS_H__ +#define __OS_H__ + +#ifdef _WIN32 +#include +#else +#include +#endif + +#ifdef _WIN32 +#define PATH_DEPARATOR "\\" +#else +#define PATH_SEPARATOR "/" +#endif + +#endif /* __GLOBALS_H__ */ diff --git a/src/refbuf.c b/src/refbuf.c new file mode 100644 index 00000000..39d9c2d0 --- /dev/null +++ b/src/refbuf.c @@ -0,0 +1,115 @@ +/* refbuf.c +** +** reference counting buffer implementation +** +*/ + +#include +#include + +#include "thread.h" + +#include "refbuf.h" + +mutex_t _refbuf_mutex; + +void refbuf_initialize(void) +{ + thread_mutex_create(&_refbuf_mutex); +} + +void refbuf_shutdown(void) +{ + thread_mutex_destroy(&_refbuf_mutex); +} + +refbuf_t *refbuf_new(unsigned long size) +{ + refbuf_t *refbuf; + + refbuf = (refbuf_t *)malloc(sizeof(refbuf_t)); + refbuf->data = (void *)malloc(size); + refbuf->len = size; + refbuf->_count = 1; + + return refbuf; +} + +void refbuf_addref(refbuf_t *self) +{ + thread_mutex_lock(&_refbuf_mutex); + self->_count++; + thread_mutex_unlock(&_refbuf_mutex); +} + +void refbuf_release(refbuf_t *self) +{ + thread_mutex_lock(&_refbuf_mutex); + self->_count--; + if (self->_count == 0) { + free(self->data); + free(self); + } + thread_mutex_unlock(&_refbuf_mutex); +} + +void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf) +{ + refbuf_queue_t *node; + refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t)); + + item->refbuf = refbuf; + item->next = NULL; + + if (*queue == NULL) { + *queue = item; + } else { + node = *queue; + while (node->next) node = node->next; + node->next = item; + } +} + +refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue) +{ + refbuf_queue_t *item; + refbuf_t *refbuf; + + if (*queue == NULL) return NULL; + + item = *queue; + *queue = item->next; + item->next = NULL; + + refbuf = item->refbuf; + item->refbuf = NULL; + + free(item); + + return refbuf; +} + +void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf) +{ + refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t)); + + item->refbuf = refbuf; + item->next = *queue; + *queue = item; +} + +int refbuf_queue_size(refbuf_queue_t **queue) +{ + refbuf_queue_t *node = *queue; + int size = 0; + + while (node) { + node = node->next; + size++; + } + + return size; +} + + + diff --git a/src/refbuf.h b/src/refbuf.h new file mode 100644 index 00000000..c1d1c1f4 --- /dev/null +++ b/src/refbuf.h @@ -0,0 +1,44 @@ +/* refbuf.h +** +** reference counting data buffer +** +*/ +#ifndef __REFBUF_H__ +#define __REFBUF_H__ + +typedef struct _refbuf_tag +{ + char *data; + long len; + + unsigned long _count; +} refbuf_t; + +typedef struct _refbuf_queue_tag +{ + refbuf_t *refbuf; + + struct _refbuf_queue_tag *next; +} refbuf_queue_t; + +void refbuf_initialize(void); +void refbuf_shutdown(void); + +refbuf_t *refbuf_new(unsigned long size); +void refbuf_addref(refbuf_t *self); +void refbuf_release(refbuf_t *self); + +void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf); +refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue); +void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf); +int refbuf_queue_size(refbuf_queue_t **queue); + +#endif /* __REFBUF_H__ */ + + + + + + + + diff --git a/src/sighandler.c b/src/sighandler.c new file mode 100644 index 00000000..56b28416 --- /dev/null +++ b/src/sighandler.c @@ -0,0 +1,57 @@ +#include + +#include "thread.h" +#include "avl.h" +#include "log.h" +#include "httpp.h" + +#include "global.h" +#include "connection.h" +#include "refbuf.h" +#include "client.h" +#include "logging.h" + +#include "sighandler.h" + +#define CATMODULE "sighandler" + +#ifndef _WIN32 +void _sig_hup(int signo); +void _sig_die(int signo); +#endif + +void sighandler_initialize(void) +{ +#ifndef _WIN32 + signal(SIGHUP, _sig_hup); + signal(SIGINT, _sig_die); + signal(SIGTERM, _sig_die); + signal(SIGPIPE, SIG_IGN); +#endif +} + +#ifndef _WIN32 + +void _sig_hup(int signo) +{ + INFO1("Caught signal %d, rehashing config and reopening logfiles...", signo); + + /* reread config file */ + + /* reopen logfiles */ + +#ifdef __linux__ + /* linux requires us to reattach the signal handler */ + signal(SIGHUP, _sig_hup); +#endif +} + +void _sig_die(int signo) +{ + INFO1("Caught signal %d, shutting down...", signo); + + /* inform the server to start shutting down */ + global.running = ICE_HALTING; +} + +#endif diff --git a/src/sighandler.h b/src/sighandler.h new file mode 100644 index 00000000..e9dd6073 --- /dev/null +++ b/src/sighandler.h @@ -0,0 +1,8 @@ +#ifndef __SIGHANDLER_H__ +#define __SIGHANDLER_H__ + + +void sighandler_initialize(void); + + +#endif /* __SIGHANDLER_H__ */ diff --git a/src/source.c b/src/source.c new file mode 100644 index 00000000..1b310825 --- /dev/null +++ b/src/source.c @@ -0,0 +1,364 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "thread.h" +#include "avl.h" +#include "httpp.h" +#include "sock.h" + +#include "connection.h" +#include "global.h" +#include "refbuf.h" +#include "client.h" +#include "stats.h" +#include "format.h" + +#include "source.h" + +/* avl tree helper */ +static int _compare_clients(void *compare_arg, void *a, void *b); +static int _remove_client(void *key); +static int _free_client(void *key); + +source_t *source_create(connection_t *con, http_parser_t *parser, const char *mount, format_type_t type) +{ + source_t *src; + + src = (source_t *)malloc(sizeof(source_t)); + src->format = format_get_plugin(type); + src->con = con; + src->parser = parser; + src->mount = (char *)strdup(mount); + src->client_tree = avl_tree_new(_compare_clients, NULL); + src->pending_tree = avl_tree_new(_compare_clients, NULL); + + return src; +} + +/* you must already have a read lock on the global source tree +** to call this function +*/ +source_t *source_find_mount(const char *mount) +{ + source_t *source; + avl_node *node; + int cmp; + + /* get the root node */ + node = global.source_tree->root->right; + + while (node) { + source = (source_t *)node->key; + cmp = strcmp(mount, source->mount); + if (cmp < 0) + node = node->left; + else if (cmp > 0) + node = node->right; + else + return source; + } + + /* didn't find it */ + return NULL; +} + +int source_compare_sources(void *arg, void *a, void *b) +{ + source_t *srca = (source_t *)a; + source_t *srcb = (source_t *)b; + + return strcmp(srca->mount, srcb->mount); +} + +int source_free_source(void *key) +{ + source_t *source = (source_t *)key; + + free(source->mount); + connection_close(source->con); + httpp_destroy(source->parser); + avl_tree_free(source->pending_tree, _free_client); + avl_tree_free(source->client_tree, _free_client); + free(source); + + return 1; +} + + +void *source_main(void *arg) +{ + source_t *source = (source_t *)arg; + char buffer[4096]; + long bytes, sbytes; + client_t *client; + avl_node *client_node; + + refbuf_t *refbuf, *abuf; + int data_done; + + fd_set rfds; + struct timeval tv; + + int listeners = 0; + + /* grab a read lock, to make sure we get a chance to cleanup */ + thread_rwlock_rlock(source->shutdown_rwlock); + + /* get a write lock on the global source tree */ + avl_tree_wlock(global.source_tree); + /* insert source onto source tree */ + avl_insert(global.source_tree, (void *)source); + /* release write lock on global source tree */ + avl_tree_unlock(global.source_tree); + + + /* start off the statistics */ + stats_event(source->mount, "listeners", "0"); + + while (global.running == ICE_RUNNING) { + refbuf = source->format->get_buffer(source->format, NULL, 0); + while (refbuf == NULL) { + bytes = 0; + while (bytes <= 0) { + FD_ZERO(&rfds); + FD_SET(source->con->sock, &rfds); + tv.tv_sec = 0; + tv.tv_usec = 30000; + + select(source->con->sock + 1, &rfds, NULL, NULL, &tv); + + bytes = sock_read_bytes(source->con->sock, buffer, 4096); + if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) break; + } + if (bytes <= 0) break; + refbuf = source->format->get_buffer(source->format, buffer, bytes); + } + + if (bytes <= 0) { + printf("DEBUG: got 0 bytes reading data, the source must have disconnected...\n"); + break; + } + + /* we have a refbuf buffer, which a data block to be sent to + ** all clients. if a client is not able to send the buffer + ** immediately, it should store it on it's queue for the next + ** go around. + ** + ** instead of sending the current block, a client should send + ** all data in the cue, plus the current block, until either + ** it runs out of data, or it hits a recoverable error like + ** EAGAIN. this will allow a client that got slightly lagged + ** to catch back up if it can + */ + + /* acquire read lock on client_tree */ + avl_tree_rlock(source->client_tree); + + client_node = avl_get_first(source->client_tree); + while (client_node) { + /* acquire read lock on node */ + avl_node_wlock(client_node); + + client = (client_t *)client_node->key; + + data_done = 0; + + /* do we have any old buffers? */ + abuf = refbuf_queue_remove(&client->queue); + while (abuf) { + if (client->pos > 0) + bytes = abuf->len - client->pos; + else + bytes = abuf->len; + + sbytes = sock_write_bytes(client->con->sock, &abuf->data[client->pos], bytes); + if (sbytes > 0) client->con->sent_bytes += sbytes; + if (sbytes < bytes) { + if (!sock_recoverable(sock_error())) { + printf("SOURCE: Client had unrecoverable error catching up (%ld/%ld)\n", sbytes, bytes); + client->con->error = 1; + } else { + printf("SOURCE: client had recoverable error...\n"); + client->pos += sbytes; + /* put the refbuf back on top of the queue, since we didn't finish with it */ + refbuf_queue_insert(&client->queue, abuf); + } + + data_done = 1; + break; + } + + /* we're done with that refbuf, release it and reset the pos */ + refbuf_release(abuf); + client->pos = 0; + + abuf = refbuf_queue_remove(&client->queue); + } + + /* now send or queue the new data */ + if (data_done) { + refbuf_addref(refbuf); + refbuf_queue_add(&client->queue, refbuf); + } else { + sbytes = sock_write_bytes(client->con->sock, refbuf->data, refbuf->len); + if (sbytes > 0) client->con->sent_bytes += sbytes; + if (sbytes < refbuf->len) { + bytes = sock_error(); + if (!sock_recoverable(bytes)) { + printf("SOURCE: client had unrecoverable error %ld with new data (%ld/%ld)\n", bytes, sbytes, refbuf->len); + client->con->error = 1; + } else { + printf("SOURCE: recoverable error %ld\n", bytes); + client->pos = sbytes; + refbuf_addref(refbuf); + refbuf_queue_insert(&client->queue, refbuf); + } + } + } + + /* if the client is too slow, its queue will slowly build up. + ** we need to make sure the client is keeping up with the + ** data, so we'll kick any client who's queue gets to large. + ** the queue_limit might need to be tuned, but should work fine. + ** TODO: put queue_limit in a config file + */ + if (refbuf_queue_size(&client->queue) > 25) { + printf("SOURCE: client is too lagged... kicking\n"); + client->con->error = 1; + } + + /* release read lock on node */ + avl_node_unlock(client_node); + + /* get the next node */ + client_node = avl_get_next(client_node); + } + /* release read lock on client_tree */ + avl_tree_unlock(source->client_tree); + + refbuf_release(refbuf); + + /* acquire write lock on client_tree */ + avl_tree_wlock(source->client_tree); + + /** delete bad clients **/ + client_node = avl_get_first(source->client_tree); + while (client_node) { + client = (client_t *)client_node->key; + if (client->con->error) { + client_node = avl_get_next(client_node); + avl_delete(source->client_tree, (void *)client, _free_client); + listeners--; + global_lock(); + global.clients--; + global_unlock(); + stats_event_dec(NULL, "clients"); + stats_event_args(source->mount, "listeners", "%d", listeners); + printf("DEBUG: Client dropped...\n"); + continue; + } + client_node = avl_get_next(client_node); + } + + /* acquire write lock on pending_tree */ + avl_tree_wlock(source->pending_tree); + + /** add pending clients **/ + client_node = avl_get_first(source->pending_tree); + while (client_node) { + avl_insert(source->client_tree, client_node->key); + listeners++; + printf("Client added\n"); + stats_event_inc(NULL, "clients"); + stats_event_inc(source->mount, "connections"); + stats_event_args(source->mount, "listeners", "%d", listeners); + + /* we have to send cached headers for some data formats + ** this is where we queue up the buffers to send + */ + if (source->format->has_predata) { + client = (client_t *)client_node->key; + client->queue = source->format->get_predata(source->format); + } + + client_node = avl_get_next(client_node); + } + + /** clear pending tree **/ + while (avl_get_first(source->pending_tree)) { + avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _remove_client); + } + + /* release write lock on pending_tree */ + avl_tree_unlock(source->pending_tree); + + /* release write lock on client_tree */ + avl_tree_unlock(source->client_tree); + } + + printf("DEBUG: we're going down...\n"); + + /* we need to empty the client and pending trees */ + avl_tree_wlock(source->pending_tree); + while (avl_get_first(source->pending_tree)) { + avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _free_client); + } + avl_tree_unlock(source->pending_tree); + avl_tree_wlock(source->client_tree); + while (avl_get_first(source->client_tree)) { + avl_delete(source->client_tree, avl_get_first(source->client_tree)->key, _free_client); + } + avl_tree_unlock(source->client_tree); + + /* delete this sources stats */ + stats_event_dec(NULL, "sources"); + stats_event(source->mount, "listeners", NULL); + + printf("DEBUG: souce_main() is now exiting...\n"); + + global_lock(); + global.sources--; + global_unlock(); + + /* release our hold on the lock so the main thread can continue cleaning up */ + thread_rwlock_unlock(source->shutdown_rwlock); + + avl_tree_wlock(global.source_tree); + avl_delete(global.source_tree, source, source_free_source); + avl_tree_unlock(global.source_tree); + + thread_exit(0); + + return NULL; +} + +static int _compare_clients(void *compare_arg, void *a, void *b) +{ + connection_t *cona = (connection_t *)a; + connection_t *conb = (connection_t *)b; + + if (cona->id < conb->id) return -1; + if (cona->id > conb->id) return 1; + + return 0; +} + +static int _remove_client(void *key) +{ + return 1; +} + +static int _free_client(void *key) +{ + client_t *client = (client_t *)key; + + client_destroy(client); + + return 1; +} diff --git a/src/source.h b/src/source.h new file mode 100644 index 00000000..fff84dc5 --- /dev/null +++ b/src/source.h @@ -0,0 +1,26 @@ +#ifndef __SOURCE_H__ +#define __SOURCE_H__ + +typedef struct source_tag +{ + connection_t *con; + http_parser_t *parser; + + char *mount; + format_plugin_t *format; + + avl_tree *client_tree; + avl_tree *pending_tree; + + rwlock_t *shutdown_rwlock; +} source_t; + +source_t *source_create(connection_t *con, http_parser_t *parser, const char *mount, format_type_t type); +source_t *source_find_mount(const char *mount); +int source_compare_sources(void *arg, void *a, void *b); +int source_free_source(void *key); +void *source_main(void *arg); + +#endif + + diff --git a/src/stats.c b/src/stats.c new file mode 100644 index 00000000..4bb2ef22 --- /dev/null +++ b/src/stats.c @@ -0,0 +1,740 @@ +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "connection.h" + +#include "global.h" +#include "refbuf.h" +#include "client.h" +#include "stats.h" + + + +typedef struct _event_listener_tag +{ + stats_event_t **queue; + mutex_t *mutex; + + struct _event_listener_tag *next; +} event_listener_t; + +int _stats_running = 1; +long _stats_thread_id; + +stats_t _stats; +mutex_t _stats_mutex; + +stats_event_t *_global_event_queue; +mutex_t _global_event_mutex; + +cond_t _event_signal_cond; + +event_listener_t *_event_listeners; + + + +static void *_stats_thread(void *arg); +static int _compare_stats(void *a, void *b, void *arg); +static int _compare_source_stats(void *a, void *b, void *arg); +static int _free_stats(void *key); +static int _free_source_stats(void *key); +static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue); +static stats_node_t *_find_node(avl_tree *tree, char *name); +static stats_source_t *_find_source(avl_tree *tree, char *source); +static void _free_event(stats_event_t *event); + +void stats_initialize() +{ + /* set up global struct */ + _stats.global_tree = avl_tree_new(_compare_stats, NULL); + _stats.source_tree = avl_tree_new(_compare_source_stats, NULL); + + /* set up global mutex */ + thread_mutex_create(&_stats_mutex); + + /* set up event signaler */ + thread_cond_create(&_event_signal_cond); + + /* set up stats queues */ + _global_event_queue = NULL; + thread_mutex_create(&_global_event_mutex); + + /* fire off the stats thread */ + _stats_running = 1; + _stats_thread_id = thread_create("Stats Thread", _stats_thread, NULL, THREAD_ATTACHED); +} + +void stats_shutdown() +{ + + /* wait for thread to exit */ + _stats_running = 0; + thread_join(_stats_thread_id); + + /* free the queues */ + + /* destroy the queue mutexes */ + thread_mutex_destroy(&_global_event_mutex); + + /* tear it all down */ + thread_cond_destroy(&_event_signal_cond); + thread_mutex_destroy(&_stats_mutex); + avl_tree_free(_stats.source_tree, _free_source_stats); + avl_tree_free(_stats.global_tree, _free_stats); +} + +stats_t *stats_get_stats() +{ + // lock global stats + + // copy stats + + // unlock global stats + + // return copied stats + + return NULL; +} + +void stats_event(char *source, char *name, char *value) +{ + stats_event_t *node; + stats_event_t *event; + + if (name == NULL || strcmp(name, "") == 0) return; + + /* build event */ + event = (stats_event_t *)malloc(sizeof(stats_event_t)); + event->source = NULL; + if (source != NULL) event->source = (char *)strdup(source); + event->name = (char *)strdup(name); + event->value = NULL; + event->next = NULL; + if (value != NULL) event->value = (char *)strdup(value); + + /* queue event */ + thread_mutex_lock(&_global_event_mutex); + if (_global_event_queue == NULL) { + _global_event_queue = event; + } else { + node = _global_event_queue; + while (node->next) node = node->next; + node->next = event; + } + thread_mutex_unlock(&_global_event_mutex); +} + +void stats_event_args(char *source, char *name, char *format, ...) +{ + char buf[1024]; + va_list val; + + va_start(val, format); + vsnprintf(buf, 1024, format, val); + va_end(val); + + stats_event(source, name, buf); +} + +static char *_get_stats(char *source, char *name) +{ + stats_node_t *stats = NULL; + stats_source_t *src = NULL; + char *value = NULL; + + thread_mutex_lock(&_stats_mutex); + + if (source == NULL) { + stats = _find_node(_stats.global_tree, name); + } else { + src = _find_source(_stats.source_tree, source); + if (src) { + stats = _find_node(src->stats_tree, name); + } + } + + if (stats) value = (char *)strdup(stats->value); + + thread_mutex_unlock(&_stats_mutex); + + return value; +} + +void stats_event_inc(char *source, char *name) +{ + char *old_value; + int new_value; + + old_value = _get_stats(source, name); + if (old_value != NULL) { + new_value = atoi(old_value); + free(old_value); + new_value++; + } else { + new_value = 1; + } + + stats_event_args(source, name, "%d", new_value); +} + +void stats_event_add(char *source, char *name, unsigned long value) +{ + char *old_value; + unsigned long new_value; + + old_value = _get_stats(source, name); + if (old_value != NULL) { + new_value = atol(old_value); + free(old_value); + new_value += value; + } else { + new_value = value; + } + + stats_event_args(source, name, "%ld", new_value); +} + +void stats_event_dec(char *source, char *name) +{ + char *old_value; + int new_value; + + old_value = _get_stats(source, name); + if (old_value != NULL) { + new_value = atoi(old_value); + free(old_value); + new_value--; + if (new_value < 0) new_value = 0; + } else { + new_value = 0; + } + + stats_event_args(source, name, "%d", new_value); +} + +/* note: you must call this function only when you have exclusive access +** to the avl_tree +*/ +static stats_node_t *_find_node(avl_tree *stats_tree, char *name) +{ + stats_node_t *stats; + avl_node *node; + int cmp; + + /* get the root node */ + node = stats_tree->root->right; + + while (node) { + stats = (stats_node_t *)node->key; + cmp = strcmp(name, stats->name); + if (cmp < 0) + node = node->left; + else if (cmp > 0) + node = node->right; + else + return stats; + } + + /* didn't find it */ + return NULL; +} + +/* note: you must call this function only when you have exclusive access +** to the avl_tree +*/ +static stats_source_t *_find_source(avl_tree *source_tree, char *source) +{ + stats_source_t *stats; + avl_node *node; + int cmp; + + /* get the root node */ + node = source_tree->root->right; + while (node) { + stats = (stats_source_t *)node->key; + cmp = strcmp(source, stats->source); + if (cmp < 0) + node = node->left; + else if (cmp > 0) + node = node->right; + else + return stats; + } + + /* didn't find it */ + return NULL; +} + +static stats_event_t *_copy_event(stats_event_t *event) +{ + stats_event_t *copy = (stats_event_t *)malloc(sizeof(stats_event_t)); + if (event->source) + copy->source = (char *)strdup(event->source); + else + copy->source = NULL; + copy->name = (char *)strdup(event->name); + copy->value = (char *)strdup(event->value); + copy->next = NULL; + + return copy; +} + +static void *_stats_thread(void *arg) +{ + stats_event_t *event; + stats_event_t *copy; + stats_node_t *node; + stats_node_t *anode; + stats_source_t *snode; + stats_source_t *asnode; + event_listener_t *listener; + avl_node *avlnode; + + while (_stats_running) { + thread_mutex_lock(&_global_event_mutex); + if (_global_event_queue != NULL) { + /* grab the next event from the queue */ + event = _global_event_queue; + _global_event_queue = event->next; + event->next = NULL; + thread_mutex_unlock(&_global_event_mutex); + + thread_mutex_lock(&_stats_mutex); + if (event->source == NULL) { + /* we have a global event */ + if (event->value != NULL) { + /* adding/updating */ + node = _find_node(_stats.global_tree, event->name); + if (node == NULL) { + /* add node */ + anode = (stats_node_t *)malloc(sizeof(stats_node_t)); + anode->name = (char *)strdup(event->name); + anode->value = (char *)strdup(event->value); + + avl_insert(_stats.global_tree, (void *)anode); + } else { + /* update node */ + free(node->value); + node->value = (char *)strdup(event->value); + } + + } else { + /* we're deleting */ + node = _find_node(_stats.global_tree, event->name); + if (node != NULL) + avl_delete(_stats.global_tree, (void *)node, _free_stats); + } + } else { + /* we have a source event */ + + snode = _find_source(_stats.source_tree, event->source); + if (snode != NULL) { + /* this is a source we already have a tree for */ + if (event->value != NULL) { + /* we're add/updating */ + node = _find_node(snode->stats_tree, event->name); + if (node == NULL) { + /* adding node */ + anode = (stats_node_t *)malloc(sizeof(stats_node_t)); + anode->name = (char *)strdup(event->name); + anode->value = (char *)strdup(event->value); + + avl_insert(snode->stats_tree, (void *)anode); + } else { + /* updating node */ + free(node->value); + node->value = (char *)strdup(event->value); + } + } else { + /* we're deleting */ + node = _find_node(snode->stats_tree, event->name); + if (node != NULL) { + avl_delete(snode->stats_tree, (void *)node, _free_stats); + + avlnode = avl_get_first(snode->stats_tree); + if (avlnode == NULL) { + avl_delete(_stats.source_tree, (void *)snode, _free_source_stats); + } + } + } + } else { + /* this is a new source */ + asnode = (stats_source_t *)malloc(sizeof(stats_source_t)); + asnode->source = (char *)strdup(event->source); + asnode->stats_tree = avl_tree_new(_compare_stats, NULL); + + anode = (stats_node_t *)malloc(sizeof(stats_node_t)); + anode->name = (char *)strdup(event->name); + anode->value = (char *)strdup(event->value); + + avl_insert(asnode->stats_tree, (void *)anode); + + avl_insert(_stats.source_tree, (void *)asnode); + } + } + + /* now we have an event that's been processed into the running stats */ + /* this event should get copied to event listeners' queues */ + listener = _event_listeners; + while (listener) { + copy = _copy_event(event); + thread_mutex_lock(listener->mutex); + _add_event_to_queue(copy, listener->queue); + thread_mutex_unlock(listener->mutex); + + listener = listener->next; + } + thread_cond_broadcast(&_event_signal_cond); + + /* now we need to destroy the event */ + _free_event(event); + + thread_mutex_unlock(&_stats_mutex); + } else { + thread_mutex_unlock(&_global_event_mutex); + } + + thread_sleep(300000); + } + + thread_exit(0); + + return NULL; +} + +/* you must have the _stats_mutex locked here */ +static void _register_listener(stats_event_t **queue, mutex_t *mutex) +{ + event_listener_t *node; + event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t)); + + evli->queue = queue; + evli->mutex = mutex; + evli->next = NULL; + + if (_event_listeners == NULL) { + _event_listeners = evli; + } else { + node = _event_listeners; + while (node->next) node = node->next; + node->next = evli; + } +} + +static stats_event_t *_make_event_from_node(stats_node_t *node, char *source) +{ + stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t)); + + if (source != NULL) + event->source = (char *)strdup(source); + else + event->source = NULL; + event->name = (char *)strdup(node->name); + event->value = (char *)strdup(node->value); + event->next = NULL; + + return event; +} + +static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue) +{ + stats_event_t *node; + + if (*queue == NULL) { + *queue = event; + } else { + node = *queue; + while (node->next) node = node->next; + node->next = event; + } +} + +static stats_event_t *_get_event_from_queue(stats_event_t **queue) +{ + stats_event_t *event; + + if (*queue == NULL) return NULL; + + event = *queue; + *queue = (*queue)->next; + event->next = NULL; + + return event; +} + +static int _send_event_to_client(stats_event_t *event, connection_t *con) +{ + int ret; + + /* send data to the client!!!! */ + ret = sock_write(con->sock, "EVENT %s %s %s\n", (event->source != NULL) ? event->source : "global", event->name, event->value); + + return (ret == -1) ? 0 : 1; +} + +void _dump_stats_to_queue(stats_event_t **queue) +{ + avl_node *node; + avl_node *node2; + stats_event_t *event; + stats_source_t *source; + + thread_mutex_lock(&_stats_mutex); + /* first we fill our queue with the current stats */ + /* start with the global stats */ + node = avl_get_first(_stats.global_tree); + while (node) { + event = _make_event_from_node((stats_node_t *)node->key, NULL); + _add_event_to_queue(event, queue); + + node = avl_get_next(node); + } + + /* now the stats for each source */ + node = avl_get_first(_stats.source_tree); + while (node) { + source = (stats_source_t *)node->key; + node2 = avl_get_first(source->stats_tree); + while (node2) { + event = _make_event_from_node((stats_node_t *)node2->key, source->source); + _add_event_to_queue(event, queue); + + node2 = avl_get_next(node2); + } + + node = avl_get_next(node); + } + thread_mutex_unlock(&_stats_mutex); +} + +void *stats_connection(void *arg) +{ + stats_connection_t *statcon = (stats_connection_t *)arg; + stats_event_t *local_event_queue = NULL; + mutex_t local_event_mutex; + + avl_node *node; + avl_node *node2; + stats_event_t *event; + stats_source_t *source; + + + thread_mutex_create(&local_event_mutex); + + /* we must get the current stats and register for events atomicly. + ** we don't want to miss any and have inaccurate stats. + */ + thread_mutex_lock(&_stats_mutex); + + /* first we fill our queue with the current stats */ + /* start with the global stats */ + node = avl_get_first(_stats.global_tree); + while (node) { + event = _make_event_from_node((stats_node_t *)node->key, NULL); + _add_event_to_queue(event, &local_event_queue); + + node = avl_get_next(node); + } + + /* now the stats for each source */ + node = avl_get_first(_stats.source_tree); + while (node) { + source = (stats_source_t *)node->key; + node2 = avl_get_first(source->stats_tree); + while (node2) { + event = _make_event_from_node((stats_node_t *)node2->key, source->source); + _add_event_to_queue(event, &local_event_queue); + + node2 = avl_get_next(node2); + } + + node = avl_get_next(node); + } + + /* now we register to receive future event notices */ + _register_listener(&local_event_queue, &local_event_mutex); + + thread_mutex_unlock(&_stats_mutex); + + while (global.running == ICE_RUNNING) { + thread_mutex_lock(&local_event_mutex); + event = _get_event_from_queue(&local_event_queue); + if (event != NULL) { + if (!_send_event_to_client(event, statcon->con)) { + _free_event(event); + thread_mutex_unlock(&local_event_mutex); + break; + } + _free_event(event); + } else { + thread_mutex_unlock(&local_event_mutex); + thread_cond_wait(&_event_signal_cond); + continue; + } + + thread_mutex_unlock(&local_event_mutex); + } + + thread_mutex_destroy(&local_event_mutex); + + thread_exit(0); + + return NULL; +} + +typedef struct _source_xml_tag { + char *mount; + xmlNodePtr node; + + struct _source_xml_tag *next; +} source_xml_t; + +static xmlNodePtr _find_xml_node(char *mount, source_xml_t **list, xmlNodePtr root) +{ + source_xml_t *node, *node2; + int found = 0; + + /* search for existing node */ + node = *list; + while (node) { + if (strcmp(node->mount, mount) == 0) { + found = 1; + break; + } + node = node->next; + } + + if (found) return node->node; + + /* if we didn't find it, we must build it and add it to the list */ + + /* build node */ + node = (source_xml_t *)malloc(sizeof(source_xml_t)); + node->mount = strdup(mount); + node->node = xmlNewChild(root, NULL, "source", NULL); + xmlSetProp(node->node, "mount", mount); + node->next = NULL; + + /* add node */ + if (*list == NULL) { + *list = node; + } else { + node2 = *list; + while (node2->next) node2 = node2->next; + node2->next = node; + } + + return node->node; +} + +void stats_sendxml(client_t *client) +{ + int bytes; + stats_event_t *event; + stats_event_t *queue; + xmlDocPtr doc; + xmlNodePtr node, srcnode; + int len; + char *buff = NULL; + source_xml_t *snd; + source_xml_t *src_nodes = NULL; + + queue = NULL; + _dump_stats_to_queue(&queue); + + doc = xmlNewDoc("1.0"); + node = xmlNewDocNode(doc, NULL, "icestats", NULL); + xmlDocSetRootElement(doc, node); + + + event = _get_event_from_queue(&queue); + while (event) { + if (event->source == NULL) { + xmlNewChild(node, NULL, event->name, event->value); + } else { + srcnode = _find_xml_node(event->source, &src_nodes, node); + xmlNewChild(srcnode, NULL, event->name, event->value); + } + + _free_event(event); + event = _get_event_from_queue(&queue); + } + + xmlDocDumpMemory(doc, (xmlChar **)&buff, &len); + xmlFreeDoc(doc); + + client->respcode = 200; + bytes = sock_write(client->con->sock, "HTTP/1.0 200 OK\r\n" + "Content-Length: %d\r\n" + "Content-Type: text/xml\r\n" + "\r\n", len); + if (bytes > 0) client->con->sent_bytes += bytes; + else goto send_error; + + bytes = sock_write_bytes(client->con->sock, buff, len); + if (bytes > 0) client->con->sent_bytes += bytes; + + send_error: + while (src_nodes) { + snd = src_nodes->next; + free(src_nodes->mount); + free(src_nodes); + src_nodes = snd; + } + if (buff) free(buff); + client_destroy(client); +} + +static int _compare_stats(void *arg, void *a, void *b) +{ + stats_node_t *nodea = (stats_node_t *)a; + stats_node_t *nodeb = (stats_node_t *)b; + + return strcmp(nodea->name, nodeb->name); +} + +static int _compare_source_stats(void *arg, void *a, void *b) +{ + stats_source_t *nodea = (stats_source_t *)a; + stats_source_t *nodeb = (stats_source_t *)b; + + return strcmp(nodea->source, nodeb->source); +} + +static int _free_stats(void *key) +{ + stats_node_t *node = (stats_node_t *)key; + free(node->value); + free(node->name); + free(node); + + return 1; +} + +static int _free_source_stats(void *key) +{ + stats_source_t *node = (stats_source_t *)key; + avl_tree_free(node->stats_tree, _free_stats); + free(node->source); + + return 1; +} + +static void _free_event(stats_event_t *event) +{ + if (event->source) free(event->source); + if (event->name) free(event->name); + if (event->value) free(event->value); + free(event); +} diff --git a/src/stats.h b/src/stats.h new file mode 100644 index 00000000..17a9a7e1 --- /dev/null +++ b/src/stats.h @@ -0,0 +1,75 @@ +#ifndef __STATS_H__ +#define __STATS_H__ + +typedef struct _stats_connection_tag +{ + connection_t *con; + http_parser_t *parser; +} stats_connection_t; + +typedef struct _stats_node_tag +{ + char *name; + char *value; +} stats_node_t; + +typedef struct _stats_event_tag +{ + char *source; + char *name; + char *value; + + struct _stats_event_tag *next; +} stats_event_t; + +typedef struct _stats_source_tag +{ + char *source; + avl_tree *stats_tree; +} stats_source_t; + +typedef struct _stats_tag +{ + avl_tree *global_tree; + + /* global stats + start_time + total_users + max_users + total_sources + max_sources + total_user_connections + total_source_connections + */ + + avl_tree *source_tree; + + /* stats by source, and for stats + start_time + total_users + max_users + */ + +} stats_t; + +void stats_initialize(); +void stats_shutdown(); + +stats_t *stats_get_stats(); + +void stats_event(char *source, char *name, char *value); +void stats_event_args(char *source, char *name, char *format, ...); +void stats_event_inc(char *source, char *name); +void stats_event_add(char *source, char *name, unsigned long value); +void stats_event_dec(char *source, char *name); + +void *stats_connection(void *arg); + +void stats_sendxml(client_t *client); + +#endif /* __STATS_H__ */ + + + + + diff --git a/src/util.c b/src/util.c new file mode 100644 index 00000000..cc72bf24 --- /dev/null +++ b/src/util.c @@ -0,0 +1,58 @@ +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "sock.h" + +#include "config.h" +#include "util.h" + +int util_read_header(int sock, char *buff, unsigned long len) +{ + fd_set rfds; + int read_bytes, ret; + unsigned long pos; + char c; + struct timeval tv; + ice_config_t *config; + + config = config_get_config(); + + read_bytes = 1; + pos = 0; + ret = 0; + + while ((read_bytes == 1) && (pos < (len - 1))) { + read_bytes = 0; + + FD_ZERO(&rfds); + FD_SET(sock, &rfds); + + tv.tv_sec = config->header_timeout; + tv.tv_usec = 0; + + if (select(sock + 1, &rfds, NULL, NULL, &tv) > 0) { + if ((read_bytes = recv(sock, &c, 1, 0))) { + if (c != '\r') buff[pos++] = c; + if ((pos > 1) && (buff[pos - 1] == '\n' && buff[pos - 2] == '\n')) { + ret = 1; + break; + } + } + } else { + break; + } + } + + if (ret) buff[pos] = '\0'; + + return ret; +} + + + + diff --git a/src/util.h b/src/util.h new file mode 100644 index 00000000..f0f7eb4b --- /dev/null +++ b/src/util.h @@ -0,0 +1,6 @@ +#ifndef __UTIL_H__ +#define __UTIL_H__ + +int util_read_header(int sock, char *buff, unsigned long len); + +#endif /* __UTIL_H__ */