1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-16 06:15:24 +00:00

Initial revision

svn path=/trunk/icecast/; revision=1996
This commit is contained in:
Jack Moffitt 2001-09-10 02:21:46 +00:00
commit 61316a25d7
38 changed files with 3638 additions and 0 deletions

1
AUTHORS Normal file
View File

@ -0,0 +1 @@
Jack Moffitt <jack@icecast.org>

1
COPYING Normal file
View File

@ -0,0 +1 @@
<insert GPL here>

16
Makefile.am Normal file
View File

@ -0,0 +1,16 @@
## Process this file with automake to produce Makefile.in
AUTOMAKE_OPTIONS = foreign dist-zip
SUBDIRS = src
EXTRA_DIST = README AUTHORS COPYING
# SCCS Definitions (for BitKeeper)
GET = true
debug:
$(MAKE) all CFLAGS="@DEBUG@"
profile:
$(MAKE) all CFLAGS="@PROFILE@"

1
README Normal file
View File

@ -0,0 +1 @@
This is still experimental.

18
TODO Normal file
View File

@ -0,0 +1,18 @@
BUGS
----
- stats get off?
FEATURES
--------
- pull out vorbis comments. and send to stats.
- directory server GUID checks
directory server does GET /GUID-asldjfasldfjalsdkfjasldkfj HTTP/1.0
and either gets a 404 if it's wrong, or a 200 if it's correct.
- adding new stats type, event. events don't modify the global stats tree,
ie, source /1234.ogg disconnected
- stats.xml, a users requests this, and gets an xml dump of the current stats.

195
acinclude.m4 Normal file
View File

@ -0,0 +1,195 @@
# Configure paths for libogg
# Jack Moffitt <jack@icecast.org> 10-21-2000
# Shamelessly stolen from Owen Taylor and Manish Singh
dnl AM_PATH_OGG([ACTION-IF-FOUND [, ACTION-IF-NOT-FOUND]])
dnl Test for libogg, and define OGG_CFLAGS and OGG_LIBS
dnl
AC_DEFUN(AM_PATH_OGG,
[dnl
dnl Get the cflags and libraries
dnl
AC_ARG_WITH(ogg-prefix,[ --with-ogg-prefix=PFX Prefix where libogg is installed (optional)], ogg_prefix="$withval", ogg_prefix="")
AC_ARG_ENABLE(oggtest, [ --disable-oggtest Do not try to compile and run a test Ogg program],, enable_oggtest=yes)
if test "x$ogg_prefix" != "xNONE" ; then
ogg_args="$ogg_args --prefix=$ogg_prefix"
OGG_CFLAGS="-I$ogg_prefix/include"
OGG_LIBS="-L$ogg_prefix/lib"
elif test "$prefix" != ""; then
ogg_args="$ogg_args --prefix=$prefix"
OGG_CFLAGS="-I$prefix/include"
OGG_LIBS="-L$prefix/lib"
fi
OGG_LIBS="$OGG_LIBS -logg"
AC_MSG_CHECKING(for Ogg)
no_ogg=""
if test "x$enable_oggtest" = "xyes" ; then
ac_save_CFLAGS="$CFLAGS"
ac_save_LIBS="$LIBS"
CFLAGS="$CFLAGS $OGG_CFLAGS"
LIBS="$LIBS $OGG_LIBS"
dnl
dnl Now check if the installed Ogg is sufficiently new.
dnl
rm -f conf.oggtest
AC_TRY_RUN([
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ogg/ogg.h>
int main ()
{
system("touch conf.oggtest");
return 0;
}
],, no_ogg=yes,[echo $ac_n "cross compiling; assumed OK... $ac_c"])
CFLAGS="$ac_save_CFLAGS"
LIBS="$ac_save_LIBS"
fi
if test "x$no_ogg" = "x" ; then
AC_MSG_RESULT(yes)
ifelse([$1], , :, [$1])
else
AC_MSG_RESULT(no)
if test -f conf.oggtest ; then
:
else
echo "*** Could not run Ogg test program, checking why..."
CFLAGS="$CFLAGS $OGG_CFLAGS"
LIBS="$LIBS $OGG_LIBS"
AC_TRY_LINK([
#include <stdio.h>
#include <ogg/ogg.h>
], [ return 0; ],
[ echo "*** The test program compiled, but did not run. This usually means"
echo "*** that the run-time linker is not finding Ogg or finding the wrong"
echo "*** version of Ogg. If it is not finding Ogg, you'll need to set your"
echo "*** LD_LIBRARY_PATH environment variable, or edit /etc/ld.so.conf to point"
echo "*** to the installed location Also, make sure you have run ldconfig if that"
echo "*** is required on your system"
echo "***"
echo "*** If you have an old version installed, it is best to remove it, although"
echo "*** you may also be able to get things to work by modifying LD_LIBRARY_PATH"],
[ echo "*** The test program failed to compile or link. See the file config.log for the"
echo "*** exact error that occured. This usually means Ogg was incorrectly installed"
echo "*** or that you have moved Ogg since it was installed. In the latter case, you"
echo "*** may want to edit the ogg-config script: $OGG_CONFIG" ])
CFLAGS="$ac_save_CFLAGS"
LIBS="$ac_save_LIBS"
fi
OGG_CFLAGS=""
OGG_LIBS=""
ifelse([$2], , :, [$2])
fi
AC_SUBST(OGG_CFLAGS)
AC_SUBST(OGG_LIBS)
rm -f conf.oggtest
])
# Configure paths for libvorbis
# Jack Moffitt <jack@icecast.org> 10-21-2000
# Shamelessly stolen from Owen Taylor and Manish Singh
dnl AM_PATH_VORBIS([ACTION-IF-FOUND [, ACTION-IF-NOT-FOUND]])
dnl Test for libvorbis, and define VORBIS_CFLAGS and VORBIS_LIBS
dnl
AC_DEFUN(AM_PATH_VORBIS,
[dnl
dnl Get the cflags and libraries
dnl
AC_ARG_WITH(vorbis-prefix,[ --with-vorbis-prefix=PFX Prefix where libvorbis is installed (optional)], vorbis_prefix="$withval", vorbis_prefix="")
AC_ARG_ENABLE(vorbistest, [ --disable-vorbistest Do not try to compile and run a test Vorbis program],, enable_vorbistest=yes)
if test "x$vorbis_prefix" != "xNONE" ; then
vorbis_args="$vorbis_args --prefix=$vorbis_prefix"
VORBIS_CFLAGS="-I$vorbis_prefix/include"
VORBIS_LIBDIR="-L$vorbis_prefix/lib"
elif test "$prefix" != ""; then
vorbis_args="$vorbis_args --prefix=$prefix"
VORBIS_CFLAGS="-I$prefix/include"
VORBIS_LIBDIR="-L$prefix/lib"
fi
VORBIS_LIBS="$VORBIS_LIBDIR -lvorbis -lm"
VORBISFILE_LIBS="-lvorbisfile"
VORBISENC_LIBS="-lvorbisenc"
AC_MSG_CHECKING(for Vorbis)
no_vorbis=""
if test "x$enable_vorbistest" = "xyes" ; then
ac_save_CFLAGS="$CFLAGS"
ac_save_LIBS="$LIBS"
CFLAGS="$CFLAGS $VORBIS_CFLAGS"
LIBS="$LIBS $VORBIS_LIBS $OGG_LIBS"
dnl
dnl Now check if the installed Vorbis is sufficiently new.
dnl
rm -f conf.vorbistest
AC_TRY_RUN([
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <vorbis/codec.h>
int main ()
{
system("touch conf.vorbistest");
return 0;
}
],, no_vorbis=yes,[echo $ac_n "cross compiling; assumed OK... $ac_c"])
CFLAGS="$ac_save_CFLAGS"
LIBS="$ac_save_LIBS"
fi
if test "x$no_vorbis" = "x" ; then
AC_MSG_RESULT(yes)
ifelse([$1], , :, [$1])
else
AC_MSG_RESULT(no)
if test -f conf.vorbistest ; then
:
else
echo "*** Could not run Vorbis test program, checking why..."
CFLAGS="$CFLAGS $VORBIS_CFLAGS"
LIBS="$LIBS $VORBIS_LIBS $OGG_LIBS"
AC_TRY_LINK([
#include <stdio.h>
#include <vorbis/codec.h>
], [ return 0; ],
[ echo "*** The test program compiled, but did not run. This usually means"
echo "*** that the run-time linker is not finding Vorbis or finding the wrong"
echo "*** version of Vorbis. If it is not finding Vorbis, you'll need to set your"
echo "*** LD_LIBRARY_PATH environment variable, or edit /etc/ld.so.conf to point"
echo "*** to the installed location Also, make sure you have run ldconfig if that"
echo "*** is required on your system"
echo "***"
echo "*** If you have an old version installed, it is best to remove it, although"
echo "*** you may also be able to get things to work by modifying LD_LIBRARY_PATH"],
[ echo "*** The test program failed to compile or link. See the file config.log for the"
echo "*** exact error that occured. This usually means Vorbis was incorrectly installed"
echo "*** or that you have moved Vorbis since it was installed." ])
CFLAGS="$ac_save_CFLAGS"
LIBS="$ac_save_LIBS"
fi
VORBIS_CFLAGS=""
VORBIS_LIBS=""
VORBISFILE_LIBS=""
VORBISENC_LIBS=""
ifelse([$2], , :, [$2])
fi
AC_SUBST(VORBIS_CFLAGS)
AC_SUBST(VORBIS_LIBS)
AC_SUBST(VORBISFILE_LIBS)
AC_SUBST(VORBISENC_LIBS)
rm -f conf.vorbistest
])

61
autogen.sh Executable file
View File

@ -0,0 +1,61 @@
#!/bin/sh
# Run this to set up the build system: configure, makefiles, etc.
# (based on the version in enlightenment's cvs)
package="icecast"
srcdir=`dirname $0`
test -z "$srcdir" && srcdir=.
cd "$srcdir"
DIE=0
(autoconf --version) < /dev/null > /dev/null 2>&1 || {
echo
echo "You must have autoconf installed to compile $package."
echo "Download the appropriate package for your distribution,"
echo "or get the source tarball at ftp://ftp.gnu.org/pub/gnu/"
DIE=1
}
(automake --version) < /dev/null > /dev/null 2>&1 || {
echo
echo "You must have automake installed to compile $package."
echo "Download the appropriate package for your system,
echo "or get the source from one of the GNU ftp sites"
echo "listed in http://www.gnu.org/order/ftp.html"
DIE=1
}
(libtool --version) < /dev/null > /dev/null 2>&1 || {
echo
echo "You must have libtool installed to compile $package."
echo "Download the appropriate package for your system,
echo "or get the source from one of the GNU ftp sites"
echo "listed in http://www.gnu.org/order/ftp.html"
DIE=1
}
if test "$DIE" -eq 1; then
exit 1
fi
if test -z "$*"; then
echo "I am going to run ./configure with no arguments - if you wish "
echo "to pass any to it, please specify them on the $0 command line."
fi
echo "Generating configuration files for $package, please wait...."
echo " aclocal $ACLOCAL_FLAGS"
aclocal $ACLOCAL_FLAGS
#echo " autoheader"
#autoheader
echo " libtoolize --automake"
libtoolize --automake
echo " automake --add-missing"
automake --add-missing
echo " autoconf"
autoconf
$srcdir/configure "$@" && echo

35
conf/icecast.xml Normal file
View File

@ -0,0 +1,35 @@
<icecast>
<location>Jack's House</location>
<admin>jack@icecast.org</admin>
<limits>
<clients>100</clients>
<sources>2</sources>
<threadpool>5</threadpool>
<client-timeout>15</client-timeout>
</limits>
<source-password>hackme</source-password>
<directory>
<touch-freq>5</touch-freq>
<server>
<host>yp.icecast.org</host>
<touch-freq>15</touch-freq>
</server>
</directory>
<hostname>i.cantcode.com</hostname>
<port>8000</port>
<!--<bind-address>127.0.0.1</bind-address>-->
<paths>
<basedir>/usr/local/icecast</basedir>
<logdir>/tmp</logdir>
</paths>
<logging>
<accesslog>access.log</accesslog>
<errorlog>error.log</errorlog>
</logging>
</icecast>

106
configure.in Normal file
View File

@ -0,0 +1,106 @@
dnl Process this file with autoconf to produce a configure script.
AC_INIT(src/main.c)
AM_INIT_AUTOMAKE(icecast,2.0)
AC_PROG_CC
AC_CANONICAL_HOST
AM_PROG_LIBTOOL
dnl Set some options based on environment
if test -z "$GCC"; then
case $host in
*-*-irix*)
DEBUG="-g -signed"
CFLAGS="-O2 -w -signed"
PROFILE="-p -g3 -O2 -signed"
;;
sparc-sun-solaris*)
DEBUG="-v -g"
CFLAGS="-xO4 -fast -w -fsimple -native -xcg92"
PROFILE="-v -xpg -g -xO4 -fast -native -fsimple -xcg92 -Dsuncc"
;;
*)
DEBUG="-g"
CFLAGS="-O"
PROFILE="-g -p"
;;
esac
else
case $host in
*-*-linux*)
DEBUG="-g -Wall -fsigned-char -D_REENTRANT -D_GNU_SOURCE"
CFLAGS="-O20 -ffast-math -fsigned-char -D_REENTRANT -D_GNU_SOURCE"
PROFILE="-Wall -W -pg -g -O20 -ffast-math -fsigned-char -D_REENTRANT -D_GNU_SOURCE"
;;
sparc-sun-*)
DEBUG="-g -Wall -fsigned-char -mv8"
CFLAGS="-O20 -ffast-math -fsigned-char -mv8"
PROFILE="-pg -g -O20 -fsigned-char -mv8"
;;
*)
DEBUG="-g -Wall -fsigned-char"
CFLAGS="-O20 -fsigned-char"
PROFILE="-O20 -g -pg -fsigned-char"
;;
esac
fi
dnl Checks for programs.
dnl Checks for libraries.
dnl IPV6
AC_SEARCH_LIBS(inet_pton, socket, AC_DEFINE(HAVE_IPV6, 1, [Define if you have IPV6 support]))
AC_SEARCH_LIBS(getipnodebyname, nsl,
AC_DEFINE(HAVE_GETIPNODEBYNAME, 1,
[Define if you have the getipnodebyname function])
)
dnl Checks for header files.
AC_HEADER_STDC
dnl Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
dnl Check for types
dnl Checks for library functions.
dnl -- configure options --
AC_ARG_WITH(xml-config,
[ --with-xml-config=PATH use xml-config in PATH to find libxml ],
[if ! test -x "$with_xml_config"
then
AC_MSG_ERROR([$with_xml_config cannot be executed])
fi
XMLCONFIG="$with_xml_config"]
)
if test -z "$XMLCONFIG"
then
AC_CHECK_PROGS(XMLCONFIG, [xml2-config xml-config])
fi
if test -n "$XMLCONFIG"
then
LIBS="$LIBS `$XMLCONFIG --libs`"
CPPFLAGS="$CPPFLAGS `$XMLCONFIG --cflags`"
AC_CHECK_FUNC(xmlParseFile,, [AC_MSG_ERROR([There was a problem linking with libxml])])
else
AC_MSG_ERROR([xml-config could not be found])
fi
AM_PATH_OGG(LIBS="$LIBS $OGG_LIBS", AC_MSG_ERROR(must have Ogg installed!))
AM_PATH_VORBIS(LIBS="$LIBS $VORBIS_LIBS", AC_MSG_ERROR(must have Vorbis installed!))
dnl Make substitutions
AC_SUBST(LIBTOOL_DEPS)
AC_SUBST(OPT)
AC_SUBST(LIBS)
AC_SUBST(DEBUG)
AC_SUBST(CFLAGS)
AC_SUBST(PROFILE)
AC_OUTPUT(Makefile src/Makefile src/avl/Makefile src/httpp/Makefile src/thread/Makefile src/log/Makefile src/net/Makefile src/timing/Makefile)

29
src/Makefile.am Normal file
View File

@ -0,0 +1,29 @@
## Process this with automake to create Makefile.in
AUTOMAKE_OPTIONS = foreign
SUBDIRS = avl thread httpp net log timing
bin_PROGRAMS = icecast
noinst_HEADERS = config.h os.h logging.h sighandler.h connection.h global.h\
util.h source.h stats.h refbuf.h client.h format.h format_vorbis.h
icecast_SOURCES = config.c main.c logging.c sighandler.c connection.c global.c\
util.c source.c stats.c refbuf.c client.c format.c format_vorbis.c
icecast_LDADD = net/libicenet.la thread/libicethread.la httpp/libicehttpp.la\
log/libicelog.la avl/libiceavl.la timing/libicetiming.la
LIBS = -lpthread
INCLUDES = -I$(srcdir)/net -I$(srcdir)/thread -I$(srcdir)/avl -I$(srcdir)/httpp \
-I$(srcdir)/log -I$(srcdir)/timing
# SCCS stuff (for BitKeeper)
GET = true
debug:
$(MAKE) all CFLAGS="@DEBUG@"
profile:
$(MAKE) all CFLAGS="@PROFILE@"

1
src/TODO Normal file
View File

@ -0,0 +1 @@
need a shutdown function in case anything else in the code needs to have icecast gracefully shutdown.

46
src/client.c Normal file
View File

@ -0,0 +1,46 @@
/* client.c
**
** client interface implementation
**
*/
#include <stdlib.h>
#include <string.h>
#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
client_t *client_create(connection_t *con, http_parser_t *parser)
{
client_t *client = (client_t *)malloc(sizeof(client_t));
client->con = con;
client->parser = parser;
client->queue = NULL;
client->pos = 0;
return client;
}
void client_destroy(client_t *client)
{
refbuf_t *refbuf;
/* write log entry */
logging_access(client);
connection_close(client->con);
httpp_destroy(client->parser);
while ((refbuf = refbuf_queue_remove(&client->queue)))
refbuf_release(refbuf);
free(client);
}

28
src/client.h Normal file
View File

@ -0,0 +1,28 @@
/* client.h
**
** client data structions and function definitions
**
*/
#ifndef __CLIENT_H__
#define __CLIENT_H__
typedef struct _client_tag
{
/* the clients connection */
connection_t *con;
/* the clients http headers */
http_parser_t *parser;
/* http response code for this client */
int respcode;
/* buffer queue */
refbuf_queue_t *queue;
/* position in first buffer */
unsigned long pos;
} client_t;
client_t *client_create(connection_t *con, http_parser_t *parser);
void client_destroy(client_t *client);
#endif /* __CLIENT_H__ */

296
src/config.c Normal file
View File

@ -0,0 +1,296 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <xmlmemory.h>
#include <parser.h>
#include "config.h"
#define CONFIG_DEFAULT_LOCATION "Earth"
#define CONFIG_DEFAULT_ADMIN "icemaster@localhost"
#define CONFIG_DEFAULT_CLIENT_LIMIT 256
#define CONFIG_DEFAULT_SOURCE_LIMIT 16
#define CONFIG_DEFAULT_THREADPOOL_SIZE 4
#define CONFIG_DEFAULT_CLIENT_TIMEOUT 30
#define CONFIG_DEFAULT_HEADER_TIMEOUT 15
#define CONFIG_DEFAULT_SOURCE_PASSWORD "changeme"
#define CONFIG_DEFAULT_TOUCH_FREQ 5
#define CONFIG_DEFAULT_HOSTNAME "localhost"
#define CONFIG_DEFAULT_PORT 8888
#define CONFIG_DEFAULT_BASE_DIR "/usr/local/icecast"
#define CONFIG_DEFAULT_LOG_DIR "/usr/local/icecast/logs"
#define CONFIG_DEFAULT_ACCESS_LOG "access.log"
#define CONFIG_DEFAULT_ERROR_LOG "error.log"
ice_config_t _configuration;
char *_config_filename;
static void _set_defaults(void);
static void _parse_root(xmlDocPtr doc, xmlNodePtr node);
static void _parse_limits(xmlDocPtr doc, xmlNodePtr node);
static void _parse_directory(xmlDocPtr doc, xmlNodePtr node);
static void _parse_paths(xmlDocPtr doc, xmlNodePtr node);
static void _parse_logging(xmlDocPtr doc, xmlNodePtr node);
static void _add_server(xmlDocPtr doc, xmlNodePtr node);
void config_initialize(void)
{
memset(&_configuration, 0, sizeof(ice_config_t));
_set_defaults();
_config_filename = NULL;
}
void config_shutdown(void)
{
if (_config_filename) free(_config_filename);
if (_configuration.location) free(_configuration.location);
if (_configuration.admin) free(_configuration.admin);
if (_configuration.source_password) free(_configuration.source_password);
if (_configuration.hostname) free(_configuration.hostname);
if (_configuration.base_dir) free(_configuration.base_dir);
if (_configuration.log_dir) free(_configuration.log_dir);
if (_configuration.access_log) free(_configuration.access_log);
if (_configuration.error_log) free(_configuration.error_log);
memset(&_configuration, 0, sizeof(ice_config_t));
}
int config_parse_file(const char *filename)
{
xmlDocPtr doc;
xmlNodePtr node;
if (filename == NULL || strcmp(filename, "") == 0) return CONFIG_EINSANE;
_config_filename = (char *)strdup(filename);
doc = xmlParseFile(_config_filename);
if (doc == NULL) {
return -1;
}
node = xmlDocGetRootElement(doc);
if (node == NULL) {
xmlFreeDoc(doc);
return CONFIG_ENOROOT;
}
if (strcmp(node->name, "icecast") != 0) {
xmlFreeDoc(doc);
return CONFIG_EBADROOT;
}
xmlDocDump(stdout, doc);
_parse_root(doc, node->xmlChildrenNode);
xmlFreeDoc(doc);
return 0;
}
int config_parse_cmdline(int arg, char **argv)
{
return 0;
}
int config_rehash(void)
{
return 0;
}
ice_config_t *config_get_config(void)
{
return &_configuration;
}
static void _set_defaults(void)
{
_configuration.location = (char *)strdup(CONFIG_DEFAULT_LOCATION);
_configuration.admin = (char *)strdup(CONFIG_DEFAULT_ADMIN);
_configuration.client_limit = CONFIG_DEFAULT_CLIENT_LIMIT;
_configuration.source_limit = CONFIG_DEFAULT_SOURCE_LIMIT;
_configuration.threadpool_size = CONFIG_DEFAULT_THREADPOOL_SIZE;
_configuration.client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT;
_configuration.header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT;
_configuration.source_password = (char *)strdup(CONFIG_DEFAULT_SOURCE_PASSWORD);
_configuration.touch_freq = CONFIG_DEFAULT_TOUCH_FREQ;
_configuration.dir_list = NULL;
_configuration.hostname = (char *)strdup(CONFIG_DEFAULT_HOSTNAME);
_configuration.port = CONFIG_DEFAULT_PORT;
_configuration.bind_address = NULL;
_configuration.base_dir = (char *)strdup(CONFIG_DEFAULT_BASE_DIR);
_configuration.log_dir = (char *)strdup(CONFIG_DEFAULT_LOG_DIR);
_configuration.access_log = (char *)strdup(CONFIG_DEFAULT_ACCESS_LOG);
_configuration.error_log = (char *)strdup(CONFIG_DEFAULT_ERROR_LOG);
}
static void _parse_root(xmlDocPtr doc, xmlNodePtr node)
{
char *tmp;
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "location") == 0) {
if (_configuration.location) free(_configuration.location);
_configuration.location = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "admin") == 0) {
if (_configuration.admin) free(_configuration.admin);
_configuration.admin = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "source-password") == 0) {
if (_configuration.source_password) free(_configuration.source_password);
_configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "hostname") == 0) {
if (_configuration.hostname) free(_configuration.hostname);
_configuration.hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "port") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.port = atoi(tmp);
if (tmp) free(tmp);
} else if (strcmp(node->name, "bind-address") == 0) {
if (_configuration.bind_address) free(_configuration.bind_address);
_configuration.bind_address = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "limits") == 0) {
_parse_limits(doc, node->xmlChildrenNode);
} else if (strcmp(node->name, "directory") == 0) {
_parse_directory(doc, node->xmlChildrenNode);
} else if (strcmp(node->name, "paths") == 0) {
_parse_paths(doc, node->xmlChildrenNode);
} else if (strcmp(node->name, "logging") == 0) {
_parse_logging(doc, node->xmlChildrenNode);
}
} while ((node = node->next));
}
static void _parse_limits(xmlDocPtr doc, xmlNodePtr node)
{
char *tmp;
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "clients") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.client_limit = atoi(tmp);
if (tmp) free(tmp);
} else if (strcmp(node->name, "sources") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.source_limit = atoi(tmp);
if (tmp) free(tmp);
} else if (strcmp(node->name, "threadpool") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.threadpool_size = atoi(tmp);
if (tmp) free(tmp);
} else if (strcmp(node->name, "client-timeout") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.client_timeout = atoi(tmp);
if (tmp) free(tmp);
} else if (strcmp(node->name, "header-timeout") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.header_timeout = atoi(tmp);
if (tmp) free(tmp);
}
} while ((node = node->next));
}
static void _parse_directory(xmlDocPtr doc, xmlNodePtr node)
{
char *tmp;
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "server") == 0) {
_add_server(doc, node->xmlChildrenNode);
} else if (strcmp(node->name, "touch-freq") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.touch_freq = atoi(tmp);
if (tmp) free(tmp);
}
} while ((node = node->next));
}
static void _parse_paths(xmlDocPtr doc, xmlNodePtr node)
{
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "basedir") == 0) {
if (_configuration.base_dir) free(_configuration.base_dir);
_configuration.base_dir = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "logdir") == 0) {
if (_configuration.log_dir) free(_configuration.log_dir);
_configuration.log_dir = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
}
} while ((node = node->next));
}
static void _parse_logging(xmlDocPtr doc, xmlNodePtr node)
{
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "accesslog") == 0) {
if (_configuration.access_log) free(_configuration.access_log);
_configuration.access_log = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "errorlog") == 0) {
if (_configuration.error_log) free(_configuration.error_log);
_configuration.error_log = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
}
} while ((node = node->next));
}
static void _add_server(xmlDocPtr doc, xmlNodePtr node)
{
ice_config_dir_t *dirnode, *server;
int addnode;
char *tmp;
server = (ice_config_dir_t *)malloc(sizeof(ice_config_dir_t));
server->touch_freq = _configuration.touch_freq;
server->host = NULL;
addnode = 0;
do {
if (node == NULL) break;
if (xmlIsBlankNode(node)) continue;
if (strcmp(node->name, "host") == 0) {
server->host = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
addnode = 1;
} else if (strcmp(node->name, "touch-freq") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
server->touch_freq = atoi(tmp);
if (tmp) free(tmp);
}
server->next = NULL;
} while ((node = node->next));
if (addnode) {
dirnode = _configuration.dir_list;
if (dirnode == NULL) {
_configuration.dir_list = server;
} else {
while (dirnode->next) dirnode = dirnode->next;
dirnode->next = server;
}
server = NULL;
addnode = 0;
}
if (server) {
if (server->host) free(server->host);
free(server);
server = NULL;
}
}

55
src/config.h Normal file
View File

@ -0,0 +1,55 @@
#ifndef __CONFIG_H__
#define __CONFIG_H__
#define CONFIG_EINSANE -1
#define CONFIG_ENOROOT -2
#define CONFIG_EBADROOT -3
typedef struct ice_config_dir_tag
{
char *host;
int touch_freq;
struct ice_config_dir_tag *next;
} ice_config_dir_t;
typedef struct ice_config_tag
{
char *location;
char *admin;
int client_limit;
int source_limit;
int threadpool_size;
int client_timeout;
int header_timeout;
char *source_password;
int touch_freq;
ice_config_dir_t *dir_list;
char *hostname;
int port;
char *bind_address;
char *base_dir;
char *log_dir;
char *access_log;
char *error_log;
} ice_config_t;
void config_initialize(void);
void config_shutdown(void);
int config_parse_file(const char *filename);
int config_parse_cmdline(int arg, char **argv);
int config_rehash(void);
ice_config_t *config_get_config(void);
#endif /* __CONFIG_H__ */

58
src/configtest.c Normal file
View File

@ -0,0 +1,58 @@
#include <stdio.h>
#include "config.h"
void _dump_config(ice_config_t *config);
int main(void)
{
ice_config_t *config;
config_initialize();
config_parse_file("icecast.xml");
config = config_get_config();
_dump_config(config);
config_shutdown();
return 0;
}
void _dump_config(ice_config_t *config)
{
ice_config_dir_t *node;
printf("-----\n");
printf("location = %s\n", config->location);
printf("admin = %s\n", config->admin);
printf("client_limit = %d\n", config->client_limit);
printf("source_limit = %d\n", config->source_limit);
printf("threadpool_size = %d\n", config->threadpool_size);
printf("client_timeout = %d\n", config->client_timeout);
printf("source_password = %s\n", config->source_password);
printf("touch_freq = %d\n", config->touch_freq);
node = config->dir_list;
while (node) {
printf("directory.touch_freq = %d\n", node->touch_freq);
printf("directory.host = %s\n", node->host);
node = node->next;
}
printf("hostname = %s\n", config->hostname);
printf("port = %d\n", config->port);
printf("bind_address = %s\n", config->bind_address);
printf("base_dir = %s\n", config->base_dir);
printf("log_dir = %s\n", config->log_dir);
printf("access_log = %s\n", config->access_log);
printf("error_log = %s\n", config->error_log);
printf("-----\n");
}

505
src/connection.c Normal file
View File

@ -0,0 +1,505 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <time.h>
#include "os.h"
#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"
#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "logging.h"
#include "source.h"
#define CATMODULE "connection"
typedef struct con_queue_tag {
connection_t *con;
struct con_queue_tag *next;
} con_queue_t;
typedef struct _thread_queue_tag {
long thread_id;
struct _thread_queue_tag *next;
} thread_queue_t;
static mutex_t _connection_mutex;
static unsigned long _current_id = 0;
static int _initialized = 0;
static cond_t _pool_cond;
static con_queue_t *_queue = NULL;
static mutex_t _queue_mutex;
static thread_queue_t *_conhands = NULL;
static rwlock_t _source_shutdown_rwlock;
static void *_handle_connection(void *arg);
void connection_initialize(void)
{
if (_initialized) return;
thread_mutex_create(&_connection_mutex);
thread_mutex_create(&_queue_mutex);
thread_rwlock_create(&_source_shutdown_rwlock);
thread_cond_create(&_pool_cond);
_initialized = 1;
}
void connection_shutdown(void)
{
if (!_initialized) return;
thread_cond_destroy(&_pool_cond);
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_mutex_destroy(&_queue_mutex);
thread_mutex_destroy(&_connection_mutex);
_initialized = 0;
}
static connection_t *_create_connection(void)
{
connection_t *con;
con = (connection_t *)malloc(sizeof(connection_t));
memset(con, 0, sizeof(connection_t));
return con;
}
static unsigned long _next_connection_id(void)
{
unsigned long id;
thread_mutex_lock(&_connection_mutex);
id = _current_id++;
thread_mutex_unlock(&_connection_mutex);
return id;
}
static connection_t *_accept_connection(void)
{
int sock;
fd_set rfds;
connection_t *con;
struct timeval tv;
char *ip;
FD_ZERO(&rfds);
FD_SET(global.serversock, &rfds);
tv.tv_sec = 0;
tv.tv_usec = 30000;
if (select(global.serversock + 1, &rfds, NULL, NULL, &tv) <= 0) {
return NULL;
}
/* malloc enough room for 123.123.123.123\0 */
ip = (char *)malloc(16);
sock = sock_accept(global.serversock, ip, 16);
if (sock >= 0) {
con = _create_connection();
con->sock = sock;
con->con_time = time(NULL);
con->id = _next_connection_id();
con->ip = ip;
return con;
}
if (!sock_recoverable(sock_error()))
WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
free(ip);
return NULL;
}
static void _add_connection(connection_t *con)
{
con_queue_t *node;
node = (con_queue_t *)malloc(sizeof(con_queue_t));
thread_mutex_lock(&_queue_mutex);
node->con = con;
node->next = _queue;
_queue = node;
thread_mutex_unlock(&_queue_mutex);
printf("connection added....\n");
}
static void _signal_pool(void)
{
thread_cond_signal(&_pool_cond);
}
static void _push_thread(thread_queue_t **queue, long thread_id)
{
/* create item */
thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t));
item->thread_id = thread_id;
item->next = NULL;
thread_mutex_lock(&_queue_mutex);
if (*queue == NULL) {
*queue = item;
} else {
item->next = *queue;
*queue = item;
}
thread_mutex_unlock(&_queue_mutex);
}
static long _pop_thread(thread_queue_t **queue)
{
long id;
thread_queue_t *item;
thread_mutex_lock(&_queue_mutex);
item = *queue;
if (item == NULL) {
thread_mutex_unlock(&_queue_mutex);
return -1;
}
*queue = item->next;
item->next = NULL;
id = item->thread_id;
free(item);
thread_mutex_unlock(&_queue_mutex);
return id;
}
static void _build_pool(void)
{
ice_config_t *config;
int i, tid;
char buff[64];
config = config_get_config();
for (i = 0; i < config->threadpool_size; i++) {
snprintf(buff, 64, "Connection Thread #%d", i);
tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED);
_push_thread(&_conhands, tid);
}
}
static void _destroy_pool(void)
{
long id;
int i;
i = 0;
thread_cond_broadcast(&_pool_cond);
id = _pop_thread(&_conhands);
while (id != -1) {
thread_join(id);
_signal_pool();
id = _pop_thread(&_conhands);
}
}
void connection_accept_loop(void)
{
connection_t *con;
_build_pool();
while (global.running == ICE_RUNNING) {
con = _accept_connection();
if (con) {
_add_connection(con);
_signal_pool();
}
}
_destroy_pool();
/* wait for all the sources to shutdown */
thread_rwlock_wlock(&_source_shutdown_rwlock);
thread_rwlock_unlock(&_source_shutdown_rwlock);
}
static connection_t *_get_connection(void)
{
con_queue_t *node = NULL;
con_queue_t *oldnode = NULL;
connection_t *con = NULL;
thread_mutex_lock(&_queue_mutex);
if (_queue) {
node = _queue;
while (node->next) {
oldnode = node;
node = node->next;
}
/* node is now the last node
** and oldnode is the previous one, or NULL
*/
if (oldnode) oldnode->next = NULL;
else (_queue) = NULL;
}
thread_mutex_unlock(&_queue_mutex);
if (node) {
con = node->con;
free(node);
}
return con;
}
static void *_handle_connection(void *arg)
{
char header[4096];
connection_t *con;
http_parser_t *parser;
source_t *source;
stats_connection_t *stats;
avl_node *node;
http_var_t *var;
client_t *client;
int bytes;
while (global.running == ICE_RUNNING) {
memset(header, 0, 4096);
thread_cond_wait(&_pool_cond);
if (global.running != ICE_RUNNING) break;
/* grab a connection and set the socket to blocking */
con = _get_connection();
stats_event_inc(NULL, "connections");
sock_set_blocking(con->sock, SOCK_BLOCK);
/* fill header with the http header */
if (util_read_header(con->sock, header, 4096) == 0) {
/* either we didn't get a complete header, or we timed out */
connection_close(con);
continue;
}
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
if (httpp_parse(parser, header, strlen(header))) {
/* handle the connection or something */
if (strcmp("ice", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) != 0 && strcmp("http", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) != 0) {
printf("DEBUG: bad protocol\n");
connection_close(con);
httpp_destroy(parser);
continue;
}
if (parser->req_type == httpp_req_source) {
printf("DEBUG: source logging in\n");
stats_event_inc(NULL, "source_connections");
if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) {
printf("DEBUG: bad password\n");
INFO1("Source (%s) attempted to login with bad password", httpp_getvar(parser, HTTPP_VAR_URI));
connection_close(con);
httpp_destroy(parser);
continue;
}
global_lock();
if (global.sources >= config_get_config()->source_limit) {
printf("TOO MANY SOURCE, KICKING THIS ONE\n");
INFO1("Source (%d) logged in, but there are too many sources", httpp_getvar(parser, HTTPP_VAR_URI));
connection_close(con);
httpp_destroy(parser);
global_unlock();
continue;
}
global.sources++;
global_unlock();
stats_event_inc(NULL, "sources");
source = source_create(con, parser, httpp_getvar(parser, HTTPP_VAR_URI), FORMAT_TYPE_VORBIS);
source->shutdown_rwlock = &_source_shutdown_rwlock;
sock_set_blocking(con->sock, SOCK_NONBLOCK);
thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED);
continue;
} else if (parser->req_type == httpp_req_stats) {
printf("DEBUG: stats connection...\n");
stats_event_inc(NULL, "stats_connections");
if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) {
printf("DEBUG: bad password\n");
connection_close(con);
httpp_destroy(parser);
continue;
}
stats_event_inc(NULL, "stats");
/* create stats connection and create stats handler thread */
stats = (stats_connection_t *)malloc(sizeof(stats_connection_t));
stats->parser = parser;
stats->con = con;
thread_create("Stats Connection", stats_connection, (void *)stats, THREAD_DETACHED);
continue;
} else if (parser->req_type == httpp_req_play || parser->req_type == httpp_req_get) {
printf("DEBUG: client coming in...\n");
/* make a client */
client = client_create(con, parser);
stats_event_inc(NULL, "client_connections");
/* there are several types of HTTP GET clients
** media clients, which are looking for a source (eg, URI = /stream.ogg)
** stats clients, which are looking for /stats.xml
** and director server authorizers, which are looking for /GUID-xxxxxxxx (where xxxxxx is the GUID in question
** we need to handle the latter two before the former, as the latter two
** aren't subject to the limits.
*/
// TODO: add GUID-xxxxxx
if (strcmp(httpp_getvar(parser, HTTPP_VAR_URI), "/stats.xml") == 0) {
printf("sending stats.xml\n");
stats_sendxml(client);
continue;
}
global_lock();
if (global.clients >= config_get_config()->client_limit) {
if (parser->req_type == httpp_req_get) {
client->respcode = 504;
bytes = sock_write(client->con->sock, "HTTP/1.0 504 Server Full\r\nContent-Type: text/html\r\n\r\n"\
"<b>The server is already full. Try again later.</b>\r\n");
if (bytes > 0) client->con->sent_bytes = bytes;
}
client_destroy(client);
global_unlock();
continue;
}
global_unlock();
avl_tree_rlock(global.source_tree);
source = source_find_mount(httpp_getvar(parser, HTTPP_VAR_URI));
if (source) {
printf("DEBUG: source found for client\n");
global_lock();
if (global.clients >= config_get_config()->client_limit) {
if (parser->req_type == httpp_req_get) {
client->respcode = 504;
bytes = sock_write(client->con->sock, "HTTP/1.0 504 Server Full\r\nContent-Type: text/html\r\n\r\n"\
"<b>The server is already full. Try again later.</b>\r\n");
if (bytes > 0) client->con->sent_bytes = bytes;
}
client_destroy(client);
global_unlock();
continue;
}
global.clients++;
global_unlock();
if (parser->req_type == httpp_req_get) {
client->respcode = 200;
sock_write(client->con->sock, "HTTP/1.0 200 OK\r\nContent-Type: application/x-ogg\r\n");
/* iterate through source http headers and send to client */
avl_tree_rlock(source->parser->vars);
node = avl_get_first(source->parser->vars);
while (node) {
var = (http_var_t *)node->key;
if (strcasecmp(var->name, "ice-password") && !strncasecmp("ice-", var->name, 4)) {
printf("DEBUG: sending %s: %s\n", var->name, var->value);
sock_write(client->con->sock, "%s: %s\r\n", var->name, var->value);
}
node = avl_get_next(node);
}
avl_tree_unlock(source->parser->vars);
sock_write(client->con->sock, "\r\n");
sock_set_blocking(client->con->sock, SOCK_NONBLOCK);
}
avl_tree_wlock(source->pending_tree);
avl_insert(source->pending_tree, (void *)client);
avl_tree_unlock(source->pending_tree);
}
avl_tree_unlock(global.source_tree);
if (!source) {
printf("DEBUG: source not found for client\n");
if (parser->req_type == httpp_req_get) {
client->respcode = 404;
bytes = sock_write(client->con->sock, "HTTP/1.0 404 Source Not Found\r\nContent-Type: text/html\r\n\r\n"\
"<b>The source you requested could not be found.</b>\r\n");
if (bytes > 0) client->con->sent_bytes = bytes;
}
client_destroy(client);
}
continue;
} else {
printf("DEBUG: wrong request type\n");
connection_close(con);
httpp_destroy(parser);
continue;
}
} else {
printf("DEBUG: parsing failed\n");
connection_close(con);
httpp_destroy(parser);
continue;
}
}
thread_exit(0);
return NULL;
}
void connection_close(connection_t *con)
{
sock_close(con->sock);
if (con->ip) free(con->ip);
if (con->host) free(con->host);
free(con);
}

23
src/connection.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef __CONNECTION_H__
#define __CONNECTION_H__
typedef struct connection_tag
{
unsigned long id;
time_t con_time;
long long sent_bytes;
int sock;
int error;
char *ip;
char *host;
} connection_t;
void connection_initialize(void);
void connection_shutdown(void);
void connection_accept_loop(void);
void connection_close(connection_t *con);
#endif /* __CONNECTION_H__ */

31
src/format.c Normal file
View File

@ -0,0 +1,31 @@
/* format.c
**
** format plugin implementation
**
*/
#include <stdlib.h>
#include <string.h>
#include "connection.h"
#include "refbuf.h"
#include "format.h"
#include "format_vorbis.h"
format_plugin_t *format_get_plugin(format_type_t type)
{
format_plugin_t *plugin;
switch (type) {
case FORMAT_TYPE_VORBIS:
plugin = format_vorbis_get_plugin();
break;
default:
plugin = NULL;
break;
}
return plugin;
}

40
src/format.h Normal file
View File

@ -0,0 +1,40 @@
/* format.h
**
** format plugin header
**
*/
#ifndef __FORMAT_H__
#define __FORMAT_H__
typedef enum _format_type_tag
{
FORMAT_TYPE_VORBIS,
FORMAT_TYPE_MP3
} format_type_t;
typedef struct _format_plugin_tag
{
format_type_t type;
/* set this is the data format has a header that
** we must send before regular data
*/
int has_predata;
refbuf_t *(*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long len);
refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self);
/* for internal state management */
void *_state;
} format_plugin_t;
format_plugin_t *format_get_plugin(format_type_t type);
#endif /* __FORMAT_H__ */

120
src/format_vorbis.c Normal file
View File

@ -0,0 +1,120 @@
/* format_vorbis.c
**
** format plugin for vorbis
**
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ogg/ogg.h>
#include <vorbis/codec.h>
#include "refbuf.h"
#include "format.h"
typedef struct _vstate_tag
{
ogg_sync_state oy;
ogg_page og;
unsigned long serialno;
int header;
refbuf_t *headbuf[10];
} vstate_t;
refbuf_t *format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len);
refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self);
format_plugin_t *format_vorbis_get_plugin(void)
{
format_plugin_t *plugin;
vstate_t *state;
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));
plugin->type = FORMAT_TYPE_VORBIS;
plugin->has_predata = 1;
plugin->get_buffer = format_vorbis_get_buffer;
plugin->get_predata = format_vorbis_get_predata;
state = (vstate_t *)calloc(1, sizeof(vstate_t));
ogg_sync_init(&state->oy);
plugin->_state = (void *)state;
return plugin;
}
refbuf_t *format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len)
{
char *buffer;
refbuf_t *refbuf;
int i;
vstate_t *state = (vstate_t *)self->_state;
if (data) {
/* write the data to the buffer */
buffer = ogg_sync_buffer(&state->oy, len);
memcpy(buffer, data, len);
ogg_sync_wrote(&state->oy, len);
}
refbuf = NULL;
if (ogg_sync_pageout(&state->oy, &state->og) == 1) {
refbuf = refbuf_new(state->og.header_len + state->og.body_len);
memcpy(refbuf->data, state->og.header, state->og.header_len);
memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len);
if (state->serialno != ogg_page_serialno(&state->og)) {
/* this is a new logical bitstream */
state->header = 0;
for (i = 0; i < 10; i++) {
if (state->headbuf[i]) {
refbuf_release(state->headbuf[i]);
state->headbuf[i] = NULL;
}
}
state->serialno = ogg_page_serialno(&state->og);
}
if (state->header >= 0) {
if (ogg_page_granulepos(&state->og) == 0) {
state->header++;
} else {
state->header = 0;
}
}
/* cache first three pages */
if (state->header) {
refbuf_addref(refbuf);
state->headbuf[state->header - 1] = refbuf;
}
}
return refbuf;
}
refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self)
{
refbuf_queue_t *queue;
int i;
vstate_t *state = (vstate_t *)self->_state;
queue = NULL;
for (i = 0; i < 10; i++) {
if (state->headbuf[i]) {
refbuf_addref(state->headbuf[i]);
refbuf_queue_add(&queue, state->headbuf[i]);
} else {
break;
}
}
return queue;
}

11
src/format_vorbis.h Normal file
View File

@ -0,0 +1,11 @@
/* format_vorbis.h
**
** vorbis format plugin header
**
*/
#ifndef __FORMAT_VORBIS_H__
#define __FORMAT_VORBIS_H__
format_plugin_t *format_vorbis_get_plugin(void);
#endif /* __FORMAT_VORBIS_H__ */

40
src/global.c Normal file
View File

@ -0,0 +1,40 @@
#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "connection.h"
#include "refbuf.h"
#include "format.h"
#include "source.h"
#include "global.h"
ice_global_t global;
static mutex_t _global_mutex;
void global_initialize(void)
{
global.serversock = 0;
global.running = 0;
global.clients = 0;
global.sources = 0;
global.source_tree = avl_tree_new(source_compare_sources, NULL);
thread_mutex_create(&_global_mutex);
}
void global_shutdown(void)
{
thread_mutex_destroy(&_global_mutex);
avl_tree_free(global.source_tree, source_free_source);
}
void global_lock(void)
{
thread_mutex_lock(&_global_mutex);
}
void global_unlock(void)
{
thread_mutex_unlock(&_global_mutex);
}

28
src/global.h Normal file
View File

@ -0,0 +1,28 @@
#ifndef __GLOBAL_H__
#define __GLOBAL_H__
#define ICE_LISTEN_QUEUE 5
#define ICE_RUNNING 1
#define ICE_HALTING 2
typedef struct ice_global_tag
{
int serversock;
int running;
int sources;
int clients;
avl_tree *source_tree;
} ice_global_t;
extern ice_global_t global;
void global_initialize(void);
void global_shutdown(void);
void global_lock(void);
void global_unlock(void);
#endif /* __GLOBAL_H__ */

71
src/logging.c Normal file
View File

@ -0,0 +1,71 @@
#include <stdio.h>
#include <time.h>
#include "thread.h"
#include "httpp.h"
#include "log.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
/* the global log descriptors */
int errorlog;
int accesslog;
/*
** ADDR USER AUTH DATE REQUEST CODE BYTES REFERER AGENT [TIME]
**
** ADDR = client->con->ip
** USER = -
** we should do this for real once we support authentication
** AUTH = -
** DATE = _make_date(client->con->con_time)
** REQUEST = build from client->parser
** CODE = client->respcode
** BYTES = client->con->sent_bytes
** REFERER = get from client->parser
** AGENT = get from client->parser
** TIME = timing_get_time() - client->con->con_time
*/
void logging_access(client_t *client)
{
char datebuf[128];
char reqbuf[1024];
struct tm *thetime;
time_t now;
time_t stayed;
now = time(NULL);
/* build the data */
/* TODO: localtime is not threadsafe on all platforms
** we should probably use localtime_r if it's available
*/
PROTECT_CODE(thetime = localtime(&now); strftime(datebuf, 128, LOGGING_FORMAT_CLF, thetime))
/* build the request */
snprintf(reqbuf, 1024, "%s %s %s/%s", httpp_getvar(client->parser, HTTPP_VAR_REQ_TYPE), httpp_getvar(client->parser, HTTPP_VAR_URI),
httpp_getvar(client->parser, HTTPP_VAR_PROTOCOL), httpp_getvar(client->parser, HTTPP_VAR_VERSION));
stayed = now - client->con->con_time;
log_write_direct(accesslog, "%s - - [%s] \"%s\" %d %lld \"%s\" \"%s\" %d",
client->con->ip,
datebuf,
reqbuf,
client->respcode,
client->con->sent_bytes,
(httpp_getvar(client->parser, "referer") != NULL) ? httpp_getvar(client->parser, "referer") : "-",
(httpp_getvar(client->parser, "user-agent") != NULL) ? httpp_getvar(client->parser, "user-agent") : "-",
(int)stayed);
}

80
src/logging.h Normal file
View File

@ -0,0 +1,80 @@
#ifndef __LOGGING_H__
#define __LOGGING_H__
/* declare the global log descriptors */
extern int errorlog;
extern int accesslog;
/* these are all ERRORx and WARNx where _x_ is the number of parameters
** it takes. it turns out most other copmilers don't have support for
** varargs macros. that totally sucks, but this is still pretty easy.
**
** feel free to add more here if needed.
*/
#define ERROR0(y) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y)
#define ERROR1(y, a) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a)
#define ERROR2(y, a, b) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a, b)
#define ERROR3(y, a, b, c) log_write(errorlog, 1, CATMODULE "/" __FUNCTION__, y, a, b, c)
#define WARN0(y) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y)
#define WARN1(y, a) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a)
#define WARN2(y, a, b) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a, b)
#define WARN3(y, a, b, c) log_write(errorlog, 2, CATMODULE "/" __FUNCTION__, y, a, b, c)
#define INFO0(y) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y)
#define INFO1(y, a) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a)
#define INFO2(y, a, b) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a, b)
#define INFO3(y, a, b, c) log_write(errorlog, 3, CATMODULE "/" __FUNCTION__, y, a, b, c)
#define DEBUG0(y) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y)
#define DEBUG1(y, a) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a)
#define DEBUG2(y, a, b) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a, b)
#define DEBUG3(y, a, b, c) log_write(errorlog, 4, CATMODULE "/" __FUNCTION__, y, a, b, c)
/* CATMODULE is the category or module that logging messages come from.
** we set one here in cause someone forgets in the .c file.
*/
/*#define CATMODULE "unknown"
*/
/* this is the logging call to write entries to the access_log
** the combined log format is:
** ADDR USER AUTH DATE REQUEST CODE BYTES REFERER AGENT [TIME]
** ADDR = ip address of client
** USER = username if authenticated
** AUTH = auth type, not used, and set to "-"
** DATE = date in "[30/Apr/2001:01:25:34 -0700]" format
** REQUEST = request, ie "GET /live.ogg HTTP/1.0"
** CODE = response code, ie, 200 or 404
** BYTES = total bytes of data sent (other than headers)
** REFERER = the refering URL
** AGENT = the user agent
**
** for icecast, we add on extra field at the end, which will be
** ignored by normal log parsers
**
** TIME = seconds that the connection lasted
**
** this allows you to get bitrates (BYTES / TIME)
** and figure out exact times of connections
**
** it should be noted also that events are sent on client disconnect,
** so the DATE is the timestamp of disconnection. DATE - TIME is the
** time of connection.
*/
#define LOGGING_FORMAT_CLF "%d/%b/%Y:%H:%M:%S %z"
void logging_access(client_t *client);
#endif /* __LOGGING_H__ */

233
src/main.c Normal file
View File

@ -0,0 +1,233 @@
#include <stdio.h>
#include <string.h>
#include "thread.h"
#include "avl.h"
#include "log.h"
#include "sock.h"
#include "resolver.h"
#include "httpp.h"
#include "config.h"
#include "sighandler.h"
#include "global.h"
#include "os.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "logging.h"
#undef CATMODULE
#define CATMODULE "main"
void _print_usage()
{
printf("icecast 2.0 usage:\n");
printf("\t-c <file>\t\tSpecify configuration file\n");
printf("\n");
}
void _initialize_subsystems(void)
{
log_initialize();
thread_initialize();
sock_initialize();
resolver_initialize();
config_initialize();
connection_initialize();
global_initialize();
stats_initialize();
refbuf_initialize();
}
void _shutdown_subsystems(void)
{
refbuf_shutdown();
stats_shutdown();
global_shutdown();
connection_shutdown();
config_shutdown();
resolver_shutdown();
sock_shutdown();
thread_shutdown();
log_shutdown();
}
int _parse_config_file(int argc, char **argv, char *filename, int size)
{
int i = 1;
if (argc < 3) return 0;
while (i < argc) {
if (strcmp(argv[i], "-c") == 0) {
if (i + 1 < argc) {
strncpy(filename, argv[i + 1], size);
return 1;
} else {
return -1;
}
}
i++;
}
return 0;
}
int _start_logging(void)
{
char fn_error[FILENAME_MAX];
char fn_access[FILENAME_MAX];
ice_config_t *config = config_get_config();
snprintf(fn_error, FILENAME_MAX, "%s%s%s", config->log_dir, PATH_SEPARATOR, config->error_log);
snprintf(fn_access, FILENAME_MAX, "%s%s%s", config->log_dir, PATH_SEPARATOR, config->access_log);
errorlog = log_open(fn_error);
accesslog = log_open(fn_access);
log_set_level(errorlog, 4);
log_set_level(accesslog, 4);
if (errorlog < 0)
fprintf(stderr, "FATAL: could not open %s for error logging\n", fn_error);
if (accesslog < 0)
fprintf(stderr, "FATAL: could not open %s for access logging\n", fn_access);
if (errorlog >= 0 && accesslog >= 0) return 1;
return 0;
}
void _stop_logging(void)
{
log_close(errorlog);
log_close(accesslog);
}
int _setup_socket(void)
{
ice_config_t *config;
config = config_get_config();
global.serversock = sock_get_server_socket(config->port, config->bind_address);
if (global.serversock == SOCK_ERROR)
return 0;
return 1;
}
int _start_listening(void)
{
if (sock_listen(global.serversock, ICE_LISTEN_QUEUE) == SOCK_ERROR)
return 0;
sock_set_blocking(global.serversock, SOCK_NONBLOCK);
return 1;
}
/* this is the heart of the beast */
void _server_proc(void)
{
if (!_setup_socket()) {
ERROR1("Could not create listener socket on port %d", config_get_config()->port);
return;
}
if (!_start_listening()) {
ERROR0("Failed trying to listen on server socket");
return;
}
connection_accept_loop();
sock_close(global.serversock);
}
int main(int argc, char **argv)
{
int res, ret;
char filename[256];
/* startup all the modules */
_initialize_subsystems();
/* setup the default signal handlers */
sighandler_initialize();
/* parse the '-c icecast.xml' option
** only, so that we can read a configfile
*/
res = _parse_config_file(argc, argv, filename, 256);
if (res == 1) {
/* parse the config file */
ret = config_parse_file(filename);
if (ret < 0) {
fprintf(stderr, "FATAL: error parsing config file:");
switch (ret) {
case CONFIG_EINSANE:
fprintf(stderr, "filename was null or blank\n");
break;
case CONFIG_ENOROOT:
fprintf(stderr, "no root element found\n");
break;
case CONFIG_EBADROOT:
fprintf(stderr, "root element is not <icecast>\n");
break;
default:
fprintf(stderr, "parse error\n");
break;
}
}
} else if (res == -1) {
fprintf(stderr, "FATAL: -c option must have a filename\n");
_print_usage();
_shutdown_subsystems();
return 1;
}
/* override config file options with commandline options */
config_parse_cmdline(argc, argv);
if (!_start_logging()) {
fprintf(stderr, "FATAL: Could not start logging\n");
_shutdown_subsystems();
return 1;
}
INFO0("icecast server started");
/* REM 3D Graphics */
/* let her rip */
global.running = ICE_RUNNING;
_server_proc();
INFO0("Shutting down");
_stop_logging();
_shutdown_subsystems();
return 0;
}

16
src/os.h Normal file
View File

@ -0,0 +1,16 @@
#ifndef __OS_H__
#define __OS_H__
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
#ifdef _WIN32
#define PATH_DEPARATOR "\\"
#else
#define PATH_SEPARATOR "/"
#endif
#endif /* __GLOBALS_H__ */

115
src/refbuf.c Normal file
View File

@ -0,0 +1,115 @@
/* refbuf.c
**
** reference counting buffer implementation
**
*/
#include <stdlib.h>
#include <string.h>
#include "thread.h"
#include "refbuf.h"
mutex_t _refbuf_mutex;
void refbuf_initialize(void)
{
thread_mutex_create(&_refbuf_mutex);
}
void refbuf_shutdown(void)
{
thread_mutex_destroy(&_refbuf_mutex);
}
refbuf_t *refbuf_new(unsigned long size)
{
refbuf_t *refbuf;
refbuf = (refbuf_t *)malloc(sizeof(refbuf_t));
refbuf->data = (void *)malloc(size);
refbuf->len = size;
refbuf->_count = 1;
return refbuf;
}
void refbuf_addref(refbuf_t *self)
{
thread_mutex_lock(&_refbuf_mutex);
self->_count++;
thread_mutex_unlock(&_refbuf_mutex);
}
void refbuf_release(refbuf_t *self)
{
thread_mutex_lock(&_refbuf_mutex);
self->_count--;
if (self->_count == 0) {
free(self->data);
free(self);
}
thread_mutex_unlock(&_refbuf_mutex);
}
void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf)
{
refbuf_queue_t *node;
refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t));
item->refbuf = refbuf;
item->next = NULL;
if (*queue == NULL) {
*queue = item;
} else {
node = *queue;
while (node->next) node = node->next;
node->next = item;
}
}
refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue)
{
refbuf_queue_t *item;
refbuf_t *refbuf;
if (*queue == NULL) return NULL;
item = *queue;
*queue = item->next;
item->next = NULL;
refbuf = item->refbuf;
item->refbuf = NULL;
free(item);
return refbuf;
}
void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf)
{
refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t));
item->refbuf = refbuf;
item->next = *queue;
*queue = item;
}
int refbuf_queue_size(refbuf_queue_t **queue)
{
refbuf_queue_t *node = *queue;
int size = 0;
while (node) {
node = node->next;
size++;
}
return size;
}

44
src/refbuf.h Normal file
View File

@ -0,0 +1,44 @@
/* refbuf.h
**
** reference counting data buffer
**
*/
#ifndef __REFBUF_H__
#define __REFBUF_H__
typedef struct _refbuf_tag
{
char *data;
long len;
unsigned long _count;
} refbuf_t;
typedef struct _refbuf_queue_tag
{
refbuf_t *refbuf;
struct _refbuf_queue_tag *next;
} refbuf_queue_t;
void refbuf_initialize(void);
void refbuf_shutdown(void);
refbuf_t *refbuf_new(unsigned long size);
void refbuf_addref(refbuf_t *self);
void refbuf_release(refbuf_t *self);
void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf);
refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue);
void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf);
int refbuf_queue_size(refbuf_queue_t **queue);
#endif /* __REFBUF_H__ */

57
src/sighandler.c Normal file
View File

@ -0,0 +1,57 @@
#include <signal.h>
#include "thread.h"
#include "avl.h"
#include "log.h"
#include "httpp.h"
#include "global.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "sighandler.h"
#define CATMODULE "sighandler"
#ifndef _WIN32
void _sig_hup(int signo);
void _sig_die(int signo);
#endif
void sighandler_initialize(void)
{
#ifndef _WIN32
signal(SIGHUP, _sig_hup);
signal(SIGINT, _sig_die);
signal(SIGTERM, _sig_die);
signal(SIGPIPE, SIG_IGN);
#endif
}
#ifndef _WIN32
void _sig_hup(int signo)
{
INFO1("Caught signal %d, rehashing config and reopening logfiles...", signo);
/* reread config file */
/* reopen logfiles */
#ifdef __linux__
/* linux requires us to reattach the signal handler */
signal(SIGHUP, _sig_hup);
#endif
}
void _sig_die(int signo)
{
INFO1("Caught signal %d, shutting down...", signo);
/* inform the server to start shutting down */
global.running = ICE_HALTING;
}
#endif

8
src/sighandler.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef __SIGHANDLER_H__
#define __SIGHANDLER_H__
void sighandler_initialize(void);
#endif /* __SIGHANDLER_H__ */

364
src/source.c Normal file
View File

@ -0,0 +1,364 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <ogg/ogg.h>
#include "thread.h"
#include "avl.h"
#include "httpp.h"
#include "sock.h"
#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "source.h"
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _remove_client(void *key);
static int _free_client(void *key);
source_t *source_create(connection_t *con, http_parser_t *parser, const char *mount, format_type_t type)
{
source_t *src;
src = (source_t *)malloc(sizeof(source_t));
src->format = format_get_plugin(type);
src->con = con;
src->parser = parser;
src->mount = (char *)strdup(mount);
src->client_tree = avl_tree_new(_compare_clients, NULL);
src->pending_tree = avl_tree_new(_compare_clients, NULL);
return src;
}
/* you must already have a read lock on the global source tree
** to call this function
*/
source_t *source_find_mount(const char *mount)
{
source_t *source;
avl_node *node;
int cmp;
/* get the root node */
node = global.source_tree->root->right;
while (node) {
source = (source_t *)node->key;
cmp = strcmp(mount, source->mount);
if (cmp < 0)
node = node->left;
else if (cmp > 0)
node = node->right;
else
return source;
}
/* didn't find it */
return NULL;
}
int source_compare_sources(void *arg, void *a, void *b)
{
source_t *srca = (source_t *)a;
source_t *srcb = (source_t *)b;
return strcmp(srca->mount, srcb->mount);
}
int source_free_source(void *key)
{
source_t *source = (source_t *)key;
free(source->mount);
connection_close(source->con);
httpp_destroy(source->parser);
avl_tree_free(source->pending_tree, _free_client);
avl_tree_free(source->client_tree, _free_client);
free(source);
return 1;
}
void *source_main(void *arg)
{
source_t *source = (source_t *)arg;
char buffer[4096];
long bytes, sbytes;
client_t *client;
avl_node *client_node;
refbuf_t *refbuf, *abuf;
int data_done;
fd_set rfds;
struct timeval tv;
int listeners = 0;
/* grab a read lock, to make sure we get a chance to cleanup */
thread_rwlock_rlock(source->shutdown_rwlock);
/* get a write lock on the global source tree */
avl_tree_wlock(global.source_tree);
/* insert source onto source tree */
avl_insert(global.source_tree, (void *)source);
/* release write lock on global source tree */
avl_tree_unlock(global.source_tree);
/* start off the statistics */
stats_event(source->mount, "listeners", "0");
while (global.running == ICE_RUNNING) {
refbuf = source->format->get_buffer(source->format, NULL, 0);
while (refbuf == NULL) {
bytes = 0;
while (bytes <= 0) {
FD_ZERO(&rfds);
FD_SET(source->con->sock, &rfds);
tv.tv_sec = 0;
tv.tv_usec = 30000;
select(source->con->sock + 1, &rfds, NULL, NULL, &tv);
bytes = sock_read_bytes(source->con->sock, buffer, 4096);
if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) break;
}
if (bytes <= 0) break;
refbuf = source->format->get_buffer(source->format, buffer, bytes);
}
if (bytes <= 0) {
printf("DEBUG: got 0 bytes reading data, the source must have disconnected...\n");
break;
}
/* we have a refbuf buffer, which a data block to be sent to
** all clients. if a client is not able to send the buffer
** immediately, it should store it on it's queue for the next
** go around.
**
** instead of sending the current block, a client should send
** all data in the cue, plus the current block, until either
** it runs out of data, or it hits a recoverable error like
** EAGAIN. this will allow a client that got slightly lagged
** to catch back up if it can
*/
/* acquire read lock on client_tree */
avl_tree_rlock(source->client_tree);
client_node = avl_get_first(source->client_tree);
while (client_node) {
/* acquire read lock on node */
avl_node_wlock(client_node);
client = (client_t *)client_node->key;
data_done = 0;
/* do we have any old buffers? */
abuf = refbuf_queue_remove(&client->queue);
while (abuf) {
if (client->pos > 0)
bytes = abuf->len - client->pos;
else
bytes = abuf->len;
sbytes = sock_write_bytes(client->con->sock, &abuf->data[client->pos], bytes);
if (sbytes > 0) client->con->sent_bytes += sbytes;
if (sbytes < bytes) {
if (!sock_recoverable(sock_error())) {
printf("SOURCE: Client had unrecoverable error catching up (%ld/%ld)\n", sbytes, bytes);
client->con->error = 1;
} else {
printf("SOURCE: client had recoverable error...\n");
client->pos += sbytes;
/* put the refbuf back on top of the queue, since we didn't finish with it */
refbuf_queue_insert(&client->queue, abuf);
}
data_done = 1;
break;
}
/* we're done with that refbuf, release it and reset the pos */
refbuf_release(abuf);
client->pos = 0;
abuf = refbuf_queue_remove(&client->queue);
}
/* now send or queue the new data */
if (data_done) {
refbuf_addref(refbuf);
refbuf_queue_add(&client->queue, refbuf);
} else {
sbytes = sock_write_bytes(client->con->sock, refbuf->data, refbuf->len);
if (sbytes > 0) client->con->sent_bytes += sbytes;
if (sbytes < refbuf->len) {
bytes = sock_error();
if (!sock_recoverable(bytes)) {
printf("SOURCE: client had unrecoverable error %ld with new data (%ld/%ld)\n", bytes, sbytes, refbuf->len);
client->con->error = 1;
} else {
printf("SOURCE: recoverable error %ld\n", bytes);
client->pos = sbytes;
refbuf_addref(refbuf);
refbuf_queue_insert(&client->queue, refbuf);
}
}
}
/* if the client is too slow, its queue will slowly build up.
** we need to make sure the client is keeping up with the
** data, so we'll kick any client who's queue gets to large.
** the queue_limit might need to be tuned, but should work fine.
** TODO: put queue_limit in a config file
*/
if (refbuf_queue_size(&client->queue) > 25) {
printf("SOURCE: client is too lagged... kicking\n");
client->con->error = 1;
}
/* release read lock on node */
avl_node_unlock(client_node);
/* get the next node */
client_node = avl_get_next(client_node);
}
/* release read lock on client_tree */
avl_tree_unlock(source->client_tree);
refbuf_release(refbuf);
/* acquire write lock on client_tree */
avl_tree_wlock(source->client_tree);
/** delete bad clients **/
client_node = avl_get_first(source->client_tree);
while (client_node) {
client = (client_t *)client_node->key;
if (client->con->error) {
client_node = avl_get_next(client_node);
avl_delete(source->client_tree, (void *)client, _free_client);
listeners--;
global_lock();
global.clients--;
global_unlock();
stats_event_dec(NULL, "clients");
stats_event_args(source->mount, "listeners", "%d", listeners);
printf("DEBUG: Client dropped...\n");
continue;
}
client_node = avl_get_next(client_node);
}
/* acquire write lock on pending_tree */
avl_tree_wlock(source->pending_tree);
/** add pending clients **/
client_node = avl_get_first(source->pending_tree);
while (client_node) {
avl_insert(source->client_tree, client_node->key);
listeners++;
printf("Client added\n");
stats_event_inc(NULL, "clients");
stats_event_inc(source->mount, "connections");
stats_event_args(source->mount, "listeners", "%d", listeners);
/* we have to send cached headers for some data formats
** this is where we queue up the buffers to send
*/
if (source->format->has_predata) {
client = (client_t *)client_node->key;
client->queue = source->format->get_predata(source->format);
}
client_node = avl_get_next(client_node);
}
/** clear pending tree **/
while (avl_get_first(source->pending_tree)) {
avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _remove_client);
}
/* release write lock on pending_tree */
avl_tree_unlock(source->pending_tree);
/* release write lock on client_tree */
avl_tree_unlock(source->client_tree);
}
printf("DEBUG: we're going down...\n");
/* we need to empty the client and pending trees */
avl_tree_wlock(source->pending_tree);
while (avl_get_first(source->pending_tree)) {
avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _free_client);
}
avl_tree_unlock(source->pending_tree);
avl_tree_wlock(source->client_tree);
while (avl_get_first(source->client_tree)) {
avl_delete(source->client_tree, avl_get_first(source->client_tree)->key, _free_client);
}
avl_tree_unlock(source->client_tree);
/* delete this sources stats */
stats_event_dec(NULL, "sources");
stats_event(source->mount, "listeners", NULL);
printf("DEBUG: souce_main() is now exiting...\n");
global_lock();
global.sources--;
global_unlock();
/* release our hold on the lock so the main thread can continue cleaning up */
thread_rwlock_unlock(source->shutdown_rwlock);
avl_tree_wlock(global.source_tree);
avl_delete(global.source_tree, source, source_free_source);
avl_tree_unlock(global.source_tree);
thread_exit(0);
return NULL;
}
static int _compare_clients(void *compare_arg, void *a, void *b)
{
connection_t *cona = (connection_t *)a;
connection_t *conb = (connection_t *)b;
if (cona->id < conb->id) return -1;
if (cona->id > conb->id) return 1;
return 0;
}
static int _remove_client(void *key)
{
return 1;
}
static int _free_client(void *key)
{
client_t *client = (client_t *)key;
client_destroy(client);
return 1;
}

26
src/source.h Normal file
View File

@ -0,0 +1,26 @@
#ifndef __SOURCE_H__
#define __SOURCE_H__
typedef struct source_tag
{
connection_t *con;
http_parser_t *parser;
char *mount;
format_plugin_t *format;
avl_tree *client_tree;
avl_tree *pending_tree;
rwlock_t *shutdown_rwlock;
} source_t;
source_t *source_create(connection_t *con, http_parser_t *parser, const char *mount, format_type_t type);
source_t *source_find_mount(const char *mount);
int source_compare_sources(void *arg, void *a, void *b);
int source_free_source(void *key);
void *source_main(void *arg);
#endif

740
src/stats.c Normal file
View File

@ -0,0 +1,740 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>
#include <xmlmemory.h>
#include <parser.h>
#include <tree.h>
#include <thread/thread.h>
#include <avl/avl.h>
#include <httpp/httpp.h>
#include <net/sock.h>
#include "connection.h"
#include "global.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
typedef struct _event_listener_tag
{
stats_event_t **queue;
mutex_t *mutex;
struct _event_listener_tag *next;
} event_listener_t;
int _stats_running = 1;
long _stats_thread_id;
stats_t _stats;
mutex_t _stats_mutex;
stats_event_t *_global_event_queue;
mutex_t _global_event_mutex;
cond_t _event_signal_cond;
event_listener_t *_event_listeners;
static void *_stats_thread(void *arg);
static int _compare_stats(void *a, void *b, void *arg);
static int _compare_source_stats(void *a, void *b, void *arg);
static int _free_stats(void *key);
static int _free_source_stats(void *key);
static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue);
static stats_node_t *_find_node(avl_tree *tree, char *name);
static stats_source_t *_find_source(avl_tree *tree, char *source);
static void _free_event(stats_event_t *event);
void stats_initialize()
{
/* set up global struct */
_stats.global_tree = avl_tree_new(_compare_stats, NULL);
_stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
/* set up global mutex */
thread_mutex_create(&_stats_mutex);
/* set up event signaler */
thread_cond_create(&_event_signal_cond);
/* set up stats queues */
_global_event_queue = NULL;
thread_mutex_create(&_global_event_mutex);
/* fire off the stats thread */
_stats_running = 1;
_stats_thread_id = thread_create("Stats Thread", _stats_thread, NULL, THREAD_ATTACHED);
}
void stats_shutdown()
{
/* wait for thread to exit */
_stats_running = 0;
thread_join(_stats_thread_id);
/* free the queues */
/* destroy the queue mutexes */
thread_mutex_destroy(&_global_event_mutex);
/* tear it all down */
thread_cond_destroy(&_event_signal_cond);
thread_mutex_destroy(&_stats_mutex);
avl_tree_free(_stats.source_tree, _free_source_stats);
avl_tree_free(_stats.global_tree, _free_stats);
}
stats_t *stats_get_stats()
{
// lock global stats
// copy stats
// unlock global stats
// return copied stats
return NULL;
}
void stats_event(char *source, char *name, char *value)
{
stats_event_t *node;
stats_event_t *event;
if (name == NULL || strcmp(name, "") == 0) return;
/* build event */
event = (stats_event_t *)malloc(sizeof(stats_event_t));
event->source = NULL;
if (source != NULL) event->source = (char *)strdup(source);
event->name = (char *)strdup(name);
event->value = NULL;
event->next = NULL;
if (value != NULL) event->value = (char *)strdup(value);
/* queue event */
thread_mutex_lock(&_global_event_mutex);
if (_global_event_queue == NULL) {
_global_event_queue = event;
} else {
node = _global_event_queue;
while (node->next) node = node->next;
node->next = event;
}
thread_mutex_unlock(&_global_event_mutex);
}
void stats_event_args(char *source, char *name, char *format, ...)
{
char buf[1024];
va_list val;
va_start(val, format);
vsnprintf(buf, 1024, format, val);
va_end(val);
stats_event(source, name, buf);
}
static char *_get_stats(char *source, char *name)
{
stats_node_t *stats = NULL;
stats_source_t *src = NULL;
char *value = NULL;
thread_mutex_lock(&_stats_mutex);
if (source == NULL) {
stats = _find_node(_stats.global_tree, name);
} else {
src = _find_source(_stats.source_tree, source);
if (src) {
stats = _find_node(src->stats_tree, name);
}
}
if (stats) value = (char *)strdup(stats->value);
thread_mutex_unlock(&_stats_mutex);
return value;
}
void stats_event_inc(char *source, char *name)
{
char *old_value;
int new_value;
old_value = _get_stats(source, name);
if (old_value != NULL) {
new_value = atoi(old_value);
free(old_value);
new_value++;
} else {
new_value = 1;
}
stats_event_args(source, name, "%d", new_value);
}
void stats_event_add(char *source, char *name, unsigned long value)
{
char *old_value;
unsigned long new_value;
old_value = _get_stats(source, name);
if (old_value != NULL) {
new_value = atol(old_value);
free(old_value);
new_value += value;
} else {
new_value = value;
}
stats_event_args(source, name, "%ld", new_value);
}
void stats_event_dec(char *source, char *name)
{
char *old_value;
int new_value;
old_value = _get_stats(source, name);
if (old_value != NULL) {
new_value = atoi(old_value);
free(old_value);
new_value--;
if (new_value < 0) new_value = 0;
} else {
new_value = 0;
}
stats_event_args(source, name, "%d", new_value);
}
/* note: you must call this function only when you have exclusive access
** to the avl_tree
*/
static stats_node_t *_find_node(avl_tree *stats_tree, char *name)
{
stats_node_t *stats;
avl_node *node;
int cmp;
/* get the root node */
node = stats_tree->root->right;
while (node) {
stats = (stats_node_t *)node->key;
cmp = strcmp(name, stats->name);
if (cmp < 0)
node = node->left;
else if (cmp > 0)
node = node->right;
else
return stats;
}
/* didn't find it */
return NULL;
}
/* note: you must call this function only when you have exclusive access
** to the avl_tree
*/
static stats_source_t *_find_source(avl_tree *source_tree, char *source)
{
stats_source_t *stats;
avl_node *node;
int cmp;
/* get the root node */
node = source_tree->root->right;
while (node) {
stats = (stats_source_t *)node->key;
cmp = strcmp(source, stats->source);
if (cmp < 0)
node = node->left;
else if (cmp > 0)
node = node->right;
else
return stats;
}
/* didn't find it */
return NULL;
}
static stats_event_t *_copy_event(stats_event_t *event)
{
stats_event_t *copy = (stats_event_t *)malloc(sizeof(stats_event_t));
if (event->source)
copy->source = (char *)strdup(event->source);
else
copy->source = NULL;
copy->name = (char *)strdup(event->name);
copy->value = (char *)strdup(event->value);
copy->next = NULL;
return copy;
}
static void *_stats_thread(void *arg)
{
stats_event_t *event;
stats_event_t *copy;
stats_node_t *node;
stats_node_t *anode;
stats_source_t *snode;
stats_source_t *asnode;
event_listener_t *listener;
avl_node *avlnode;
while (_stats_running) {
thread_mutex_lock(&_global_event_mutex);
if (_global_event_queue != NULL) {
/* grab the next event from the queue */
event = _global_event_queue;
_global_event_queue = event->next;
event->next = NULL;
thread_mutex_unlock(&_global_event_mutex);
thread_mutex_lock(&_stats_mutex);
if (event->source == NULL) {
/* we have a global event */
if (event->value != NULL) {
/* adding/updating */
node = _find_node(_stats.global_tree, event->name);
if (node == NULL) {
/* add node */
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
anode->name = (char *)strdup(event->name);
anode->value = (char *)strdup(event->value);
avl_insert(_stats.global_tree, (void *)anode);
} else {
/* update node */
free(node->value);
node->value = (char *)strdup(event->value);
}
} else {
/* we're deleting */
node = _find_node(_stats.global_tree, event->name);
if (node != NULL)
avl_delete(_stats.global_tree, (void *)node, _free_stats);
}
} else {
/* we have a source event */
snode = _find_source(_stats.source_tree, event->source);
if (snode != NULL) {
/* this is a source we already have a tree for */
if (event->value != NULL) {
/* we're add/updating */
node = _find_node(snode->stats_tree, event->name);
if (node == NULL) {
/* adding node */
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
anode->name = (char *)strdup(event->name);
anode->value = (char *)strdup(event->value);
avl_insert(snode->stats_tree, (void *)anode);
} else {
/* updating node */
free(node->value);
node->value = (char *)strdup(event->value);
}
} else {
/* we're deleting */
node = _find_node(snode->stats_tree, event->name);
if (node != NULL) {
avl_delete(snode->stats_tree, (void *)node, _free_stats);
avlnode = avl_get_first(snode->stats_tree);
if (avlnode == NULL) {
avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
}
}
}
} else {
/* this is a new source */
asnode = (stats_source_t *)malloc(sizeof(stats_source_t));
asnode->source = (char *)strdup(event->source);
asnode->stats_tree = avl_tree_new(_compare_stats, NULL);
anode = (stats_node_t *)malloc(sizeof(stats_node_t));
anode->name = (char *)strdup(event->name);
anode->value = (char *)strdup(event->value);
avl_insert(asnode->stats_tree, (void *)anode);
avl_insert(_stats.source_tree, (void *)asnode);
}
}
/* now we have an event that's been processed into the running stats */
/* this event should get copied to event listeners' queues */
listener = _event_listeners;
while (listener) {
copy = _copy_event(event);
thread_mutex_lock(listener->mutex);
_add_event_to_queue(copy, listener->queue);
thread_mutex_unlock(listener->mutex);
listener = listener->next;
}
thread_cond_broadcast(&_event_signal_cond);
/* now we need to destroy the event */
_free_event(event);
thread_mutex_unlock(&_stats_mutex);
} else {
thread_mutex_unlock(&_global_event_mutex);
}
thread_sleep(300000);
}
thread_exit(0);
return NULL;
}
/* you must have the _stats_mutex locked here */
static void _register_listener(stats_event_t **queue, mutex_t *mutex)
{
event_listener_t *node;
event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t));
evli->queue = queue;
evli->mutex = mutex;
evli->next = NULL;
if (_event_listeners == NULL) {
_event_listeners = evli;
} else {
node = _event_listeners;
while (node->next) node = node->next;
node->next = evli;
}
}
static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
{
stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
if (source != NULL)
event->source = (char *)strdup(source);
else
event->source = NULL;
event->name = (char *)strdup(node->name);
event->value = (char *)strdup(node->value);
event->next = NULL;
return event;
}
static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue)
{
stats_event_t *node;
if (*queue == NULL) {
*queue = event;
} else {
node = *queue;
while (node->next) node = node->next;
node->next = event;
}
}
static stats_event_t *_get_event_from_queue(stats_event_t **queue)
{
stats_event_t *event;
if (*queue == NULL) return NULL;
event = *queue;
*queue = (*queue)->next;
event->next = NULL;
return event;
}
static int _send_event_to_client(stats_event_t *event, connection_t *con)
{
int ret;
/* send data to the client!!!! */
ret = sock_write(con->sock, "EVENT %s %s %s\n", (event->source != NULL) ? event->source : "global", event->name, event->value);
return (ret == -1) ? 0 : 1;
}
void _dump_stats_to_queue(stats_event_t **queue)
{
avl_node *node;
avl_node *node2;
stats_event_t *event;
stats_source_t *source;
thread_mutex_lock(&_stats_mutex);
/* first we fill our queue with the current stats */
/* start with the global stats */
node = avl_get_first(_stats.global_tree);
while (node) {
event = _make_event_from_node((stats_node_t *)node->key, NULL);
_add_event_to_queue(event, queue);
node = avl_get_next(node);
}
/* now the stats for each source */
node = avl_get_first(_stats.source_tree);
while (node) {
source = (stats_source_t *)node->key;
node2 = avl_get_first(source->stats_tree);
while (node2) {
event = _make_event_from_node((stats_node_t *)node2->key, source->source);
_add_event_to_queue(event, queue);
node2 = avl_get_next(node2);
}
node = avl_get_next(node);
}
thread_mutex_unlock(&_stats_mutex);
}
void *stats_connection(void *arg)
{
stats_connection_t *statcon = (stats_connection_t *)arg;
stats_event_t *local_event_queue = NULL;
mutex_t local_event_mutex;
avl_node *node;
avl_node *node2;
stats_event_t *event;
stats_source_t *source;
thread_mutex_create(&local_event_mutex);
/* we must get the current stats and register for events atomicly.
** we don't want to miss any and have inaccurate stats.
*/
thread_mutex_lock(&_stats_mutex);
/* first we fill our queue with the current stats */
/* start with the global stats */
node = avl_get_first(_stats.global_tree);
while (node) {
event = _make_event_from_node((stats_node_t *)node->key, NULL);
_add_event_to_queue(event, &local_event_queue);
node = avl_get_next(node);
}
/* now the stats for each source */
node = avl_get_first(_stats.source_tree);
while (node) {
source = (stats_source_t *)node->key;
node2 = avl_get_first(source->stats_tree);
while (node2) {
event = _make_event_from_node((stats_node_t *)node2->key, source->source);
_add_event_to_queue(event, &local_event_queue);
node2 = avl_get_next(node2);
}
node = avl_get_next(node);
}
/* now we register to receive future event notices */
_register_listener(&local_event_queue, &local_event_mutex);
thread_mutex_unlock(&_stats_mutex);
while (global.running == ICE_RUNNING) {
thread_mutex_lock(&local_event_mutex);
event = _get_event_from_queue(&local_event_queue);
if (event != NULL) {
if (!_send_event_to_client(event, statcon->con)) {
_free_event(event);
thread_mutex_unlock(&local_event_mutex);
break;
}
_free_event(event);
} else {
thread_mutex_unlock(&local_event_mutex);
thread_cond_wait(&_event_signal_cond);
continue;
}
thread_mutex_unlock(&local_event_mutex);
}
thread_mutex_destroy(&local_event_mutex);
thread_exit(0);
return NULL;
}
typedef struct _source_xml_tag {
char *mount;
xmlNodePtr node;
struct _source_xml_tag *next;
} source_xml_t;
static xmlNodePtr _find_xml_node(char *mount, source_xml_t **list, xmlNodePtr root)
{
source_xml_t *node, *node2;
int found = 0;
/* search for existing node */
node = *list;
while (node) {
if (strcmp(node->mount, mount) == 0) {
found = 1;
break;
}
node = node->next;
}
if (found) return node->node;
/* if we didn't find it, we must build it and add it to the list */
/* build node */
node = (source_xml_t *)malloc(sizeof(source_xml_t));
node->mount = strdup(mount);
node->node = xmlNewChild(root, NULL, "source", NULL);
xmlSetProp(node->node, "mount", mount);
node->next = NULL;
/* add node */
if (*list == NULL) {
*list = node;
} else {
node2 = *list;
while (node2->next) node2 = node2->next;
node2->next = node;
}
return node->node;
}
void stats_sendxml(client_t *client)
{
int bytes;
stats_event_t *event;
stats_event_t *queue;
xmlDocPtr doc;
xmlNodePtr node, srcnode;
int len;
char *buff = NULL;
source_xml_t *snd;
source_xml_t *src_nodes = NULL;
queue = NULL;
_dump_stats_to_queue(&queue);
doc = xmlNewDoc("1.0");
node = xmlNewDocNode(doc, NULL, "icestats", NULL);
xmlDocSetRootElement(doc, node);
event = _get_event_from_queue(&queue);
while (event) {
if (event->source == NULL) {
xmlNewChild(node, NULL, event->name, event->value);
} else {
srcnode = _find_xml_node(event->source, &src_nodes, node);
xmlNewChild(srcnode, NULL, event->name, event->value);
}
_free_event(event);
event = _get_event_from_queue(&queue);
}
xmlDocDumpMemory(doc, (xmlChar **)&buff, &len);
xmlFreeDoc(doc);
client->respcode = 200;
bytes = sock_write(client->con->sock, "HTTP/1.0 200 OK\r\n"
"Content-Length: %d\r\n"
"Content-Type: text/xml\r\n"
"\r\n", len);
if (bytes > 0) client->con->sent_bytes += bytes;
else goto send_error;
bytes = sock_write_bytes(client->con->sock, buff, len);
if (bytes > 0) client->con->sent_bytes += bytes;
send_error:
while (src_nodes) {
snd = src_nodes->next;
free(src_nodes->mount);
free(src_nodes);
src_nodes = snd;
}
if (buff) free(buff);
client_destroy(client);
}
static int _compare_stats(void *arg, void *a, void *b)
{
stats_node_t *nodea = (stats_node_t *)a;
stats_node_t *nodeb = (stats_node_t *)b;
return strcmp(nodea->name, nodeb->name);
}
static int _compare_source_stats(void *arg, void *a, void *b)
{
stats_source_t *nodea = (stats_source_t *)a;
stats_source_t *nodeb = (stats_source_t *)b;
return strcmp(nodea->source, nodeb->source);
}
static int _free_stats(void *key)
{
stats_node_t *node = (stats_node_t *)key;
free(node->value);
free(node->name);
free(node);
return 1;
}
static int _free_source_stats(void *key)
{
stats_source_t *node = (stats_source_t *)key;
avl_tree_free(node->stats_tree, _free_stats);
free(node->source);
return 1;
}
static void _free_event(stats_event_t *event)
{
if (event->source) free(event->source);
if (event->name) free(event->name);
if (event->value) free(event->value);
free(event);
}

75
src/stats.h Normal file
View File

@ -0,0 +1,75 @@
#ifndef __STATS_H__
#define __STATS_H__
typedef struct _stats_connection_tag
{
connection_t *con;
http_parser_t *parser;
} stats_connection_t;
typedef struct _stats_node_tag
{
char *name;
char *value;
} stats_node_t;
typedef struct _stats_event_tag
{
char *source;
char *name;
char *value;
struct _stats_event_tag *next;
} stats_event_t;
typedef struct _stats_source_tag
{
char *source;
avl_tree *stats_tree;
} stats_source_t;
typedef struct _stats_tag
{
avl_tree *global_tree;
/* global stats
start_time
total_users
max_users
total_sources
max_sources
total_user_connections
total_source_connections
*/
avl_tree *source_tree;
/* stats by source, and for stats
start_time
total_users
max_users
*/
} stats_t;
void stats_initialize();
void stats_shutdown();
stats_t *stats_get_stats();
void stats_event(char *source, char *name, char *value);
void stats_event_args(char *source, char *name, char *format, ...);
void stats_event_inc(char *source, char *name);
void stats_event_add(char *source, char *name, unsigned long value);
void stats_event_dec(char *source, char *name);
void *stats_connection(void *arg);
void stats_sendxml(client_t *client);
#endif /* __STATS_H__ */

58
src/util.c Normal file
View File

@ -0,0 +1,58 @@
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#ifndef _WIN32
#include <unistd.h>
#endif
#include "sock.h"
#include "config.h"
#include "util.h"
int util_read_header(int sock, char *buff, unsigned long len)
{
fd_set rfds;
int read_bytes, ret;
unsigned long pos;
char c;
struct timeval tv;
ice_config_t *config;
config = config_get_config();
read_bytes = 1;
pos = 0;
ret = 0;
while ((read_bytes == 1) && (pos < (len - 1))) {
read_bytes = 0;
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
tv.tv_sec = config->header_timeout;
tv.tv_usec = 0;
if (select(sock + 1, &rfds, NULL, NULL, &tv) > 0) {
if ((read_bytes = recv(sock, &c, 1, 0))) {
if (c != '\r') buff[pos++] = c;
if ((pos > 1) && (buff[pos - 1] == '\n' && buff[pos - 2] == '\n')) {
ret = 1;
break;
}
}
} else {
break;
}
}
if (ret) buff[pos] = '\0';
return ret;
}

6
src/util.h Normal file
View File

@ -0,0 +1,6 @@
#ifndef __UTIL_H__
#define __UTIL_H__
int util_read_header(int sock, char *buff, unsigned long len);
#endif /* __UTIL_H__ */