1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-09-22 04:15:54 -04:00

bump kh30.

Mostly internal updates dealing with a quick response then termination of client. A
few possible leaks cases plugged and guards for invalid cases. Trying out an updated
stats API to avoid lookups (only partly used currently). Reduced memory copies on xsl
requests by writing direct to refbufs.


svn path=/icecast/branches/kh/icecast/; revision=18080
This commit is contained in:
Karl Heyes 2011-09-10 13:11:07 +00:00
parent 544525c6c1
commit deba1eba00
34 changed files with 1202 additions and 876 deletions

31
NEWS
View File

@ -16,6 +16,37 @@ Feature differences from SVN trunk
any extra tags are show in the conf/icecast.xml.dist file
2.3.2-kh30
. stats updates.
Split stats lock into global and source stats locks, reduces contention.
Updated internal API for using a stats handle for quick lookup when updating
several stats at once.
Make metadata_updated stat to be sent last in the initial listing, we would want
that to appear after title, useful as a trigger for updates.
make NEW also report the content type after the mountpoint
. re-work metadata insert merge with shoutcast inserts for mp3/aac, do metadata
followed by stream data instead of the other way around, reduces code.
. fix url decode crash bug with %invalid code.
. prevent yp add if required data is missing.
. allow for all args to be supplied to xsl transforms from request not just mount
. xsl transforms are written direct to refbufs, so should avoid a memory copy on
each transform, should help in cases of large data sets.
. Add redirect tag on <mount>
. change default to allow ogg matadata via url (for consistency with trunk).
. have master relays supply auth by default.
. allow for quick responses to be tried and possible close the connection, saves
rescheduling the worker for just closing the socket.
. streamlist.txt was not getting tried if /admin/streams failed. IE kh slave to a
2.3.2 master.
. reduce duration of when the config write lock is held when doing reload.
. a number of tunings for rescheduling clients (source and stats mainly).
. small reworking of client_t creation and accounting.
. missing redirect new listener case not handled.
. source count could get out of sync in certainly failing relay cases.
. a few cases of memory leaks fixed, not commonly used but can accumulate in
certain setups, eg intros via auth or reloading frequently.
. fix win32 snprintf missing nul char bug
. automatically set nofile and corefile size limits to max allowed, for debug
2.3.2-kh29
. prevent crash with very small burst size (eg 0).

View File

@ -95,7 +95,7 @@
#define PACKAGE_NAME "Icecast"
/* Version number of package */
#define VERSION "2.3.2-kh29"
#define VERSION "2.3.2-kh30"
/* Define to the version of this package. */
#define PACKAGE_VERSION VERSION
@ -132,8 +132,11 @@ typedef unsigned int socklen_t;
#define strcasecmp _stricmp
#define strncasecmp _strnicmp
#define snprintf _snprintf
#define vsnprintf _vsnprintf
#include <stdarg.h>
int msvc_snprintf (char *buf, int len, const char *fmt, ...);
int msvc_vsnprintf (char *buf, int len, const char *fmt, va_list);
#define snprintf msvc_snprintf
#define vsnprintf msvc_vsnprintf
#define getpid _getpid
#define atoll _atoi64
#define pipe(x) _pipe(x,255,O_BINARY)

View File

@ -1,4 +1,4 @@
AC_INIT([Icecast], [2.3.2-kh29], [karl@xiph.org])
AC_INIT([Icecast], [2.3.2-kh30], [karl@xiph.org])
LT_INIT
AC_PREREQ(2.59)

View File

@ -44,31 +44,25 @@
#define CATMODULE "admin"
static void command_fallback(client_t *client, source_t *source, int response);
static void command_metadata(client_t *client, source_t *source, int response);
static void command_shoutcast_metadata(client_t *client, source_t *source);
static void command_show_listeners(client_t *client, source_t *source,
int response);
static void command_move_clients(client_t *client, source_t *source,
int response);
static void command_stats(client_t *client, const char *filename);
static void command_stats_mount (client_t *client, source_t *source, int response);
static void command_kill_client(client_t *client, source_t *source,
int response);
static void command_reset_stats (client_t *client, source_t *source, int response);
static void command_manageauth(client_t *client, source_t *source,
int response);
static void command_buildm3u(client_t *client, const char *mount);
static void command_show_image (client_t *client, const char* mount);
static void command_kill_source(client_t *client, source_t *source,
int response);
static void command_updatemetadata(client_t *client, source_t *source,
int response);
static void command_admin_function (client_t *client, int response);
static void command_list_log (client_t *client, int response);
static void command_manage_relay (client_t *client, int response);
static int command_fallback(client_t *client, source_t *source, int response);
static int command_metadata(client_t *client, source_t *source, int response);
static int command_shoutcast_metadata(client_t *client, source_t *source);
static int command_show_listeners(client_t *client, source_t *source, int response);
static int command_move_clients(client_t *client, source_t *source, int response);
static int command_stats(client_t *client, const char *filename);
static int command_stats_mount (client_t *client, source_t *source, int response);
static int command_kill_client(client_t *client, source_t *source, int response);
static int command_reset_stats (client_t *client, source_t *source, int response);
static int command_manageauth(client_t *client, source_t *source, int response);
static int command_buildm3u(client_t *client, const char *mount);
static int command_show_image (client_t *client, const char* mount);
static int command_kill_source(client_t *client, source_t *source, int response);
static int command_updatemetadata(client_t *client, source_t *source, int response);
static int command_admin_function (client_t *client, int response);
static int command_list_log (client_t *client, int response);
static int command_manage_relay (client_t *client, int response);
static void admin_handle_general_request(client_t *client, const char *command);
static int admin_handle_general_request(client_t *client, const char *command);
struct admin_command
@ -77,8 +71,8 @@ struct admin_command
admin_response_type response;
union {
void *x; /* not used but helps on initialisations */
void (*source)(client_t *client, source_t *source, int response);
void (*general)(client_t *client, int response);
int (*source)(client_t *client, source_t *source, int response);
int (*general)(client_t *client, int response);
} handle;
};
@ -202,9 +196,12 @@ xmlDocPtr admin_build_sourcelist (const char *mount)
return(doc);
}
void admin_send_response(xmlDocPtr doc, client_t *client,
int admin_send_response (xmlDocPtr doc, client_t *client,
admin_response_type response, const char *xslt_template)
{
int ret = -1;
if (response == RAW)
{
xmlChar *buff = NULL;
@ -220,8 +217,9 @@ void admin_send_response(xmlDocPtr doc, client_t *client,
len = snprintf (client->refbuf->data, buf_len, "%s%d\r\n\r\n%s", http, len, buff);
client->refbuf->len = len;
xmlFree(buff);
xmlFreeDoc (doc);
client->respcode = 200;
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
if (response == XSLT)
{
@ -237,9 +235,11 @@ void admin_send_response(xmlDocPtr doc, client_t *client,
config_release_config();
DEBUG1("Sending XSLT (%s)", fullpath_xslt_template);
xslt_transform(doc, fullpath_xslt_template, client);
ret = xslt_transform (doc, fullpath_xslt_template, client);
free(fullpath_xslt_template);
xmlFreeDoc(doc);
}
return ret;
}
@ -259,7 +259,8 @@ static struct admin_command *find_admin_command (struct admin_command *list, con
return list;
}
void admin_mount_request (client_t *client, const char *uri)
int admin_mount_request (client_t *client, const char *uri)
{
source_t *source;
const char *mount = httpp_get_query_param (client->parser, "mount");
@ -267,16 +268,12 @@ void admin_mount_request (client_t *client, const char *uri)
struct admin_command *cmd = find_admin_command (admin_mount, uri);
if (cmd == NULL)
{
command_stats (client, uri);
return;
}
return command_stats (client, uri);
if (cmd == NULL || cmd->handle.source == NULL)
{
INFO0("mount request not recognised");
client_send_400 (client, "unknown request");
return;
return client_send_400 (client, "unknown request");
}
avl_tree_rlock(global.source_tree);
@ -286,57 +283,41 @@ void admin_mount_request (client_t *client, const char *uri)
{
avl_tree_unlock(global.source_tree);
if (strncmp (cmd->request, "stats", 5) == 0)
{
command_stats (client, uri);
return;
}
return command_stats (client, uri);
if (strncmp (cmd->request, "listclients", 11) == 0)
{
fserve_list_clients (client, mount, cmd->response, 1);
return;
}
return fserve_list_clients (client, mount, cmd->response, 1);
if (strncmp (cmd->request, "killclient", 10) == 0)
{
fserve_kill_client (client, mount, cmd->response);
return;
}
return fserve_kill_client (client, mount, cmd->response);
WARN1("Admin command on non-existent source %s", mount);
client_send_400 (client, "Source does not exist");
return client_send_400 (client, "Source does not exist");
}
else
{
int ret = 0;
thread_mutex_lock (&source->lock);
if (source_available (source) == 0)
{
thread_mutex_unlock (&source->lock);
avl_tree_unlock (global.source_tree);
INFO1("Received admin command on unavailable mount \"%s\"", mount);
client_send_400 (client, "Source is not available");
return;
return client_send_400 (client, "Source is not available");
}
cmd->handle.source (client, source, cmd->response);
ret = cmd->handle.source (client, source, cmd->response);
avl_tree_unlock(global.source_tree);
return ret;
}
}
int admin_handle_request (client_t *client, const char *uri)
{
const char *mount;
if (strcmp (uri, "/admin.cgi") != 0 && strncmp("/admin/", uri, 7) != 0)
return -1;
mount = httpp_get_query_param(client->parser, "mount");
const char *mount = httpp_get_query_param(client->parser, "mount");
if (strcmp (uri, "/admin.cgi") == 0)
{
const char *pass = httpp_get_query_param (client->parser, "pass");
if (pass == NULL)
{
client_send_400 (client, "missing pass parameter");
return 0;
}
return client_send_400 (client, "missing pass parameter");
uri++;
if (mount == NULL)
{
@ -367,15 +348,9 @@ int admin_handle_request (client_t *client, const char *uri)
{
/* no auth/stream required for this */
if (strcmp (uri, "buildm3u") == 0)
{
command_buildm3u (client, mount);
return 0;
}
return command_buildm3u (client, mount);
if (strcmp (uri, "showimage") == 0)
{
command_show_image (client, mount);
return 0;
}
return command_show_image (client, mount);
/* This is a mount request, but admin user is allowed */
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
@ -387,49 +362,40 @@ int admin_handle_request (client_t *client, const char *uri)
default:
INFO1("Bad or missing password on mount modification "
"admin request (%s)", uri);
client_send_401 (client, NULL);
return client_send_401 (client, NULL);
/* fall through */
case 1:
return 0;
}
}
if (strcmp (uri, "streams") == 0)
auth_add_listener ("/admin/streams", client);
else
admin_mount_request (client, uri);
return 0;
return auth_add_listener ("/admin/streams", client);
return admin_mount_request (client, uri);
}
admin_handle_general_request (client, uri);
return 0;
return admin_handle_general_request (client, uri);
}
static void admin_handle_general_request(client_t *client, const char *uri)
static int admin_handle_general_request (client_t *client, const char *uri)
{
struct admin_command *cmd;
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
INFO1("Bad or missing password on admin command request (%s)", uri);
client_send_401 (client, NULL);
return;
return client_send_401 (client, NULL);
}
cmd = find_admin_command (admin_general, uri);
if (cmd == NULL)
{
INFO1 ("processing file %s", uri);
command_stats (client, uri);
return;
return command_stats (client, uri);
}
if (cmd->handle.general == NULL)
{
client_send_400 (client, "unknown request");
return;
}
cmd->handle.general (client, cmd->response);
return client_send_400 (client, "unknown request");
return cmd->handle.general (client, cmd->response);
}
@ -437,17 +403,15 @@ static void admin_handle_general_request(client_t *client, const char *uri)
static int command_require (client_t *client, const char *name, const char **var)
{
*var = httpp_get_query_param((client)->parser, (name));
if (*var == NULL) {
client_send_400((client), "Missing parameter");
if (*var == NULL)
return -1;
}
return 0;
}
#define COMMAND_OPTIONAL(client,name,var) \
(var) = httpp_get_query_param((client)->parser, (name))
static void html_success(client_t *client, char *message)
static int html_success (client_t *client, const char *message)
{
client->respcode = 200;
snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
@ -455,12 +419,11 @@ static void html_success(client_t *client, char *message)
"<html><head><title>Admin request successful</title></head>"
"<body><p>%s</p></body></html>", message);
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
static void command_move_clients(client_t *client, source_t *source,
int response)
static int command_move_clients (client_t *client, source_t *source, int response)
{
const char *dest_source;
xmlDocPtr doc;
@ -474,9 +437,7 @@ static void command_move_clients(client_t *client, source_t *source,
if (!parameters_passed) {
doc = admin_build_sourcelist(source->mount);
thread_mutex_unlock (&source->lock);
admin_send_response(doc, client, response, "moveclients.xsl");
xmlFreeDoc(doc);
return;
return admin_send_response(doc, client, response, "moveclients.xsl");
}
INFO2 ("source is \"%s\", destination is \"%s\"", source->mount, dest_source);
@ -494,10 +455,10 @@ static void command_move_clients(client_t *client, source_t *source,
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(buf));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
static int admin_function (const char *function, char *buf, unsigned int len)
{
if (strcmp (function, "reopenlog") == 0)
@ -523,7 +484,7 @@ static int admin_function (const char *function, char *buf, unsigned int len)
}
static void command_admin_function (client_t *client, int response)
static int command_admin_function (client_t *client, int response)
{
xmlDocPtr doc;
xmlNodePtr node;
@ -531,12 +492,9 @@ static void command_admin_function (client_t *client, int response)
char buf[256];
if (COMMAND_REQUIRE (client, "perform", perform) < 0)
return;
return client_send_400 (client, "missing arg, perform");
if (admin_function (perform, buf, sizeof buf) < 0)
{
client_send_400 (client, "No such handler");
return;
}
return client_send_400 (client, "No such handler");
doc = xmlNewDoc(XMLSTR("1.0"));
node = xmlNewDocNode(doc, NULL, XMLSTR("iceresponse"), NULL);
xmlDocSetRootElement(doc, node);
@ -544,8 +502,7 @@ static void command_admin_function (client_t *client, int response)
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(buf));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
@ -574,7 +531,7 @@ static void add_relay_xmlnode (xmlNodePtr node, relay_server *relay, int from_ma
}
static void command_manage_relay (client_t *client, int response)
static int command_manage_relay (client_t *client, int response)
{
const char *relay_mount, *enable;
const char *msg;
@ -598,9 +555,7 @@ static void command_manage_relay (client_t *client, int response)
add_relay_xmlnode (node, relay, 1);
thread_mutex_unlock (&(config_locks()->relay_lock));
admin_send_response (doc, client, response, "managerelays.xsl");
xmlFreeDoc (doc);
return;
return admin_send_response (doc, client, response, "managerelays.xsl");
}
thread_mutex_lock (&(config_locks()->relay_lock));
@ -612,7 +567,18 @@ static void command_manage_relay (client_t *client, int response)
msg = "no such relay";
if (relay)
{
source_t *source = relay->source;
client_t *client;
thread_mutex_lock (&source->lock);
client = source->client;
relay->running = atoi (enable) ? 1 : 0;
if (client)
{
client->schedule_ms = 0;
worker_wakeup (client->worker);
}
thread_mutex_unlock (&source->lock);
msg = "relay has been changed";
}
thread_mutex_unlock (&(config_locks()->relay_lock));
@ -622,8 +588,7 @@ static void command_manage_relay (client_t *client, int response)
xmlDocSetRootElement(doc, node);
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(msg));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response(doc, client, response, "response.xsl");
}
@ -690,7 +655,7 @@ void admin_source_listeners (source_t *source, xmlNodePtr srcnode)
}
static void command_reset_stats (client_t *client, source_t *source, int response)
static int command_reset_stats (client_t *client, source_t *source, int response)
{
const char *msg = "Failed to reset values";
const char *name = httpp_get_query_param (client->parser, "setting");
@ -726,16 +691,15 @@ static void command_reset_stats (client_t *client, source_t *source, int respons
xmlDocSetRootElement(doc, node);
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(msg));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
static void command_show_listeners(client_t *client, source_t *source,
int response)
static int command_show_listeners (client_t *client, source_t *source, int response)
{
xmlDocPtr doc;
xmlNodePtr node, srcnode;
unsigned long id = -1;
long id = -1;
const char *ID_str = NULL;
char buf[22];
@ -764,11 +728,11 @@ static void command_show_listeners(client_t *client, source_t *source,
}
thread_mutex_unlock (&source->lock);
admin_send_response(doc, client, response, "listclients.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "listclients.xsl");
}
static void command_show_image (client_t *client, const char *mount)
static int command_show_image (client_t *client, const char *mount)
{
source_t *source;
@ -781,17 +745,17 @@ static void command_show_image (client_t *client, const char *mount)
if (source->format->get_image (client, source->format) == 0)
{
thread_mutex_unlock (&source->lock);
fserve_setup_client (client, NULL);
return;
return fserve_setup_client (client);
}
thread_mutex_unlock (&source->lock);
}
else
avl_tree_unlock (global.source_tree);
client_send_404 (client, "No image available");
return client_send_404 (client, "No image available");
}
static void command_buildm3u (client_t *client, const char *mount)
static int command_buildm3u (client_t *client, const char *mount)
{
const char *username = NULL;
const char *password = NULL;
@ -800,7 +764,7 @@ static void command_buildm3u (client_t *client, const char *mount)
if (COMMAND_REQUIRE(client, "username", username) < 0 ||
COMMAND_REQUIRE(client, "password", password) < 0)
return;
return client_send_400 (client, "missing arg, username/password");
client->respcode = 200;
config = config_get_config();
@ -830,18 +794,17 @@ static void command_buildm3u (client_t *client, const char *mount)
config_release_config();
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
static void command_manageauth(client_t *client, source_t *source,
int response)
static int command_manageauth (client_t *client, source_t *source, int response)
{
xmlDocPtr doc;
xmlNodePtr node, srcnode, msgnode;
const char *action = NULL;
const char *username = NULL;
char *message = NULL;
const char *message = NULL;
int ret = AUTH_OK;
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, source->mount);
@ -871,13 +834,13 @@ static void command_manageauth(client_t *client, source_t *source,
}
ret = mountinfo->auth->adduser(mountinfo->auth, username, password);
if (ret == AUTH_FAILED) {
message = strdup("User add failed - check the icecast error log");
message = "User add failed - check the icecast error log";
}
if (ret == AUTH_USERADDED) {
message = strdup("User added");
message = "User added";
}
if (ret == AUTH_USEREXISTS) {
message = strdup("User already exists - not added");
message = "User already exists - not added";
}
}
if (!strcmp(action, "delete"))
@ -889,10 +852,10 @@ static void command_manageauth(client_t *client, source_t *source,
}
ret = mountinfo->auth->deleteuser(mountinfo->auth, username);
if (ret == AUTH_FAILED) {
message = strdup("User delete failed - check the icecast error log");
message = "User delete failed - check the icecast error log";
}
if (ret == AUTH_USERDELETED) {
message = strdup("User deleted");
message = "User deleted";
}
}
@ -914,19 +877,16 @@ static void command_manageauth(client_t *client, source_t *source,
config_release_config ();
admin_send_response(doc, client, response, "manageauth.xsl");
free (message);
xmlFreeDoc(doc);
return;
return admin_send_response (doc, client, response, "manageauth.xsl");
} while (0);
thread_mutex_unlock (&source->lock);
config_release_config ();
client_send_400 (client, "missing parameter");
return client_send_400 (client, "missing parameter");
}
static void command_kill_source(client_t *client, source_t *source,
int response)
static int command_kill_source (client_t *client, source_t *source, int response)
{
xmlDocPtr doc;
xmlNodePtr node;
@ -940,12 +900,11 @@ static void command_kill_source(client_t *client, source_t *source,
source->flags &= ~SOURCE_RUNNING;
thread_mutex_unlock (&source->lock);
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
static void command_kill_client(client_t *client, source_t *source,
int response)
static int command_kill_client (client_t *client, source_t *source, int response)
{
const char *idtext;
int id;
@ -955,7 +914,10 @@ static void command_kill_client(client_t *client, source_t *source,
char buf[50] = "";
if (COMMAND_REQUIRE(client, "id", idtext) < 0)
return;
{
thread_mutex_unlock (&source->lock);
return client_send_400 (client, "missing arg, id");
}
id = atoi(idtext);
@ -977,18 +939,16 @@ static void command_kill_client(client_t *client, source_t *source,
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
}
else {
memset(buf, '\000', sizeof(buf));
snprintf(buf, sizeof(buf)-1, "Client %d not found", id);
snprintf(buf, sizeof(buf), "Client %d not found", id);
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(buf));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("0"));
}
thread_mutex_unlock (&source->lock);
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
static void command_fallback(client_t *client, source_t *source,
int response)
static int command_fallback (client_t *client, source_t *source, int response)
{
char *mount = strdup (source->mount);
mount_proxy *mountinfo;
@ -1004,21 +964,20 @@ static void command_fallback(client_t *client, source_t *source,
const char *fallback;
char buffer[200];
if (COMMAND_REQUIRE(client, "fallback", fallback) < 0)
return;
return client_send_400 (client, "missing arg, fallback");
xmlFree (mountinfo->fallback_mount);
mountinfo->fallback_mount = (char *)xmlCharStrdup (fallback);
snprintf (buffer, sizeof (buffer), "Fallback for \"%s\" configured", mountinfo->mountname);
config_release_config ();
html_success (client, buffer);
return;
return html_success (client, buffer);
}
config_release_config ();
client_send_400 (client, "no mount details available");
return client_send_400 (client, "no mount details available");
}
static void command_metadata(client_t *client, source_t *source,
int response)
static int command_metadata (client_t *client, source_t *source, int response)
{
const char *song, *title, *artist, *artwork, *charset, *url;
format_plugin_t *plugin;
@ -1060,7 +1019,8 @@ static void command_metadata(client_t *client, source_t *source,
}
if (song)
{
plugin->set_tag (plugin, "song", song, charset);
plugin->set_tag (plugin, "artist", NULL, NULL);
plugin->set_tag (plugin, "title", song, charset);
INFO2("Metadata song on %s set to \"%s\"", source->mount, song);
}
if (artist)
@ -1083,20 +1043,18 @@ static void command_metadata(client_t *client, source_t *source,
thread_mutex_unlock (&source->lock);
xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR("Metadata update successful"));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return;
return admin_send_response(doc, client, response, "response.xsl");
} while (0);
thread_mutex_unlock (&source->lock);
xmlNewChild(node, NULL, XMLSTR("message"),
XMLSTR("Mountpoint will not accept this URL update"));
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response(doc, client, response, "response.xsl");
}
static void command_shoutcast_metadata(client_t *client, source_t *source)
static int command_shoutcast_metadata (client_t *client, source_t *source)
{
const char *action;
const char *value;
@ -1105,15 +1063,14 @@ static void command_shoutcast_metadata(client_t *client, source_t *source)
if (COMMAND_REQUIRE(client, "mode", action) < 0)
{
thread_mutex_unlock (&source->lock);
return;
return client_send_400 (client, "missing arg, mode");
}
if ((source->flags & SOURCE_SHOUTCAST_COMPAT) == 0)
{
thread_mutex_unlock (&source->lock);
ERROR0 ("illegal request on non-shoutcast compatible stream");
client_send_400 (client, "Not a shoutcast compatible stream");
return;
return client_send_400 (client, "Not a shoutcast compatible stream");
}
if (strcmp (action, "updinfo") == 0)
@ -1122,7 +1079,7 @@ static void command_shoutcast_metadata(client_t *client, source_t *source)
if (COMMAND_REQUIRE (client, "song", value) < 0)
{
thread_mutex_unlock (&source->lock);
return;
return client_send_400 (client, "missing arg, song");
}
if (source->client && strcmp (client->connection.ip, source->client->connection.ip) != 0)
if (connection_check_admin_pass (client->parser) == 0)
@ -1134,45 +1091,38 @@ static void command_shoutcast_metadata(client_t *client, source_t *source)
source->format->set_tag (source->format, "title", value, NULL);
source->format->set_tag (source->format, NULL, NULL, NULL);
DEBUG2("Metadata on mountpoint %s changed to \"%s\"",
source->mount, value);
DEBUG2("Metadata on mountpoint %s changed to \"%s\"", source->mount, value);
thread_mutex_unlock (&source->lock);
html_success(client, "Metadata update successful");
return html_success(client, "Metadata update successful");
}
else
{
thread_mutex_unlock (&source->lock);
client_send_400 (client, "mountpoint will not accept URL updates");
}
return;
thread_mutex_unlock (&source->lock);
return client_send_400 (client, "mountpoint will not accept URL updates");
}
if (strcmp (action, "viewxml") == 0)
{
xmlDocPtr doc;
char *mount = strdup (source->mount);
DEBUG0("Got shoutcast viewxml request");
thread_mutex_unlock (&source->lock);
doc = stats_get_xml (STATS_ALL, mount);
admin_send_response (doc, client, XSLT, "viewxml.xsl");
xmlFreeDoc(doc);
free (mount);
return;
doc = stats_get_xml (STATS_ALL, source->mount);
return admin_send_response (doc, client, XSLT, "viewxml.xsl");
}
thread_mutex_unlock (&source->lock);
client_send_400 (client, "No such action");
return client_send_400 (client, "No such action");
}
static void command_stats_mount (client_t *client, source_t *source, int response)
static int command_stats_mount (client_t *client, source_t *source, int response)
{
thread_mutex_unlock (&source->lock);
command_stats (client, NULL);
return command_stats (client, NULL);
}
/* catch all function for admin requests. If file has xsl extension then
* transform it using the available stats, else send the XML tree of the
* stats
*/
static void command_stats (client_t *client, const char *filename)
static int command_stats (client_t *client, const char *filename)
{
admin_response_type response = RAW;
const char *show_mount = NULL;
@ -1185,12 +1135,11 @@ static void command_stats (client_t *client, const char *filename)
show_mount = httpp_get_query_param (client->parser, "mount");
doc = stats_get_xml (STATS_ALL, show_mount);
admin_send_response (doc, client, response, filename);
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, filename);
}
static void command_list_log (client_t *client, int response)
static int command_list_log (client_t *client, int response)
{
refbuf_t *content;
const char *logname = httpp_get_query_param (client->parser, "log");
@ -1198,10 +1147,7 @@ static void command_list_log (client_t *client, int response)
ice_config_t *config;
if (logname == NULL)
{
client_send_400 (client, "No log specified");
return;
}
return client_send_400 (client, "No log specified");
config = config_get_config ();
if (strcmp (logname, "errorlog") == 0)
@ -1215,8 +1161,7 @@ static void command_list_log (client_t *client, int response)
{
config_release_config();
WARN1 ("request to show unknown log \"%s\"", logname);
client_send_400 (client, "");
return;
return client_send_400 (client, "unknown");
}
content = refbuf_new (0);
log_contents (log, &content->data, &content->len);
@ -1224,17 +1169,16 @@ static void command_list_log (client_t *client, int response)
if (response == XSLT)
{
xmlNodePtr xmlnode, lognode;
xmlNodePtr xmlnode;
xmlDocPtr doc;
doc = xmlNewDoc(XMLSTR("1.0"));
xmlnode = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL);
xmlDocSetRootElement(doc, xmlnode);
lognode = xmlNewTextChild (xmlnode, NULL, XMLSTR("log"), XMLSTR(content->data));
xmlNewTextChild (xmlnode, NULL, XMLSTR("log"), XMLSTR(content->data));
refbuf_release (content);
admin_send_response (doc, client, XSLT, "showlog.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, XSLT, "showlog.xsl");
}
else
{
@ -1245,12 +1189,12 @@ static void command_list_log (client_t *client, int response)
http->next = content;
client->respcode = 200;
client_set_queue (client, http);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
}
void command_list_mounts(client_t *client, int response)
int command_list_mounts(client_t *client, int response)
{
DEBUG0("List mounts request");
@ -1269,7 +1213,7 @@ void command_list_mounts(client_t *client, int response)
client->refbuf->next = stats_get_streams (1);
else
client->refbuf->next = stats_get_streams (0);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
else
{
@ -1278,13 +1222,12 @@ void command_list_mounts(client_t *client, int response)
doc = admin_build_sourcelist(NULL);
avl_tree_unlock (global.source_tree);
admin_send_response(doc, client, response, "listmounts.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "listmounts.xsl");
}
}
static void command_updatemetadata(client_t *client, source_t *source,
int response)
static int command_updatemetadata(client_t *client, source_t *source, int response)
{
xmlDocPtr doc;
xmlNodePtr node, srcnode;
@ -1296,8 +1239,6 @@ static void command_updatemetadata(client_t *client, source_t *source,
xmlSetProp(srcnode, XMLSTR("mount"), XMLSTR(source->mount));
xmlDocSetRootElement(doc, node);
admin_send_response(doc, client, response,
"updatemetadata.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "updatemetadata.xsl");
}

View File

@ -26,11 +26,11 @@ typedef enum {
TEXT
} admin_response_type;
void command_list_mounts(client_t *client, int response);
int command_list_mounts (client_t *client, int response);
int admin_handle_request (client_t *client, const char *uri);
void admin_mount_request (client_t *client, const char *uri);
int admin_mount_request (client_t *client, const char *uri);
void admin_source_listeners (source_t *source, xmlNodePtr node);
void admin_send_response(xmlDocPtr doc, client_t *client,
int admin_send_response (xmlDocPtr doc, client_t *client,
admin_response_type response, const char *xslt_template);
#endif /* __ADMIN_H__ */

View File

@ -191,6 +191,7 @@ void auth_release (auth_t *authenticator)
authenticator->release (authenticator);
xmlFree (authenticator->type);
xmlFree (authenticator->realm);
xmlFree (authenticator->rejected_mount);
thread_mutex_unlock (&authenticator->lock);
thread_mutex_destroy (&authenticator->lock);
free (authenticator->mount);
@ -452,8 +453,7 @@ static int add_authenticated_listener (const char *mount, mount_proxy *mountinfo
{
/* If the file exists, then transform it, otherwise, write a 404 */
DEBUG0("Stats request, sending XSL transformed stats");
stats_transform_xslt (client, mount);
return 0;
return stats_transform_xslt (client, mount);
}
ret = source_add_listener (mount, mountinfo, client);
@ -468,8 +468,6 @@ static int add_authenticated_listener (const char *mount, mount_proxy *mountinfo
}
ret = fserve_client_create (client, mount);
}
if (ret == 0)
global_reduce_bitrate_sampling (global.out_bitrate);
return ret;
}
@ -534,8 +532,9 @@ void auth_postprocess_source (auth_client *auth_user)
/* Add a listener. Check for any mount information that states any
* authentication to be used.
*/
void auth_add_listener (const char *mount, client_t *client)
int auth_add_listener (const char *mount, client_t *client)
{
int ret = 0;
ice_config_t *config = config_get_config();
mount_proxy *mountinfo = config_find_mount (config, mount);
@ -554,8 +553,15 @@ void auth_add_listener (const char *mount, client_t *client)
if (mountinfo->no_mount)
{
config_release_config ();
client_send_403 (client, "mountpoint unavailable");
return;
return client_send_403 (client, "mountpoint unavailable");
}
if (mountinfo->redirect)
{
int len = strlen (mountinfo->redirect) + strlen (mount) + 3;
char *addr = alloca (len);
snprintf (addr, len, "%s%s", mountinfo->redirect, mount);
config_release_config ();
return client_send_302 (client, addr);
}
if (mountinfo->auth && mountinfo->auth->authenticate)
{
@ -565,10 +571,9 @@ void auth_add_listener (const char *mount, client_t *client)
{
config_release_config ();
WARN0 ("too many clients awaiting authentication");
client_send_403 (client, "busy, please try again later");
if (global.new_connections_slowdown < 10)
global.new_connections_slowdown++;
return;
return client_send_403 (client, "busy, please try again later");
}
auth_user = auth_client_setup (mount, client);
auth_user->process = auth_new_listener;
@ -576,7 +581,7 @@ void auth_add_listener (const char *mount, client_t *client)
DEBUG0 ("adding client for authentication");
queue_auth_client (auth_user, mountinfo);
config_release_config ();
return;
return 0;
}
}
else
@ -584,13 +589,13 @@ void auth_add_listener (const char *mount, client_t *client)
if (strcmp (mount, "/admin/streams") == 0)
{
config_release_config ();
client_send_401 (client, NULL);
return;
return client_send_401 (client, NULL);
}
}
}
add_authenticated_listener (mount, mountinfo, client);
ret = add_authenticated_listener (mount, mountinfo, client);
config_release_config ();
return ret;
}

View File

@ -104,7 +104,7 @@ typedef struct auth_tag
char *realm;
} auth_t;
void auth_add_listener (const char *mount, client_t *client);
int auth_add_listener (const char *mount, client_t *client);
int auth_release_listener (client_t *client, const char *mount, struct _mount_proxy *mountinfo);
int move_listener (client_t *client, struct _fbinfo *finfo);
int auth_check_source (client_t *client, const char *mount);

View File

@ -221,6 +221,7 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s
if (strncasecmp (ptr, "Location: ", 10) == 0)
{
int len = strcspn ((char*)ptr+10, "\r\n");
free (atd->location);
atd->location = malloc (len+1);
snprintf (atd->location, len+1, "%s", (char *)ptr+10);
}
@ -391,7 +392,7 @@ static auth_result url_add_listener (auth_client *auth_user)
auth_t *auth = auth_user->auth;
auth_url *url = auth->state;
auth_thread_data *atd = auth_user->thread_data;
int res = 0, port;
int res = 0, port, ret = AUTH_FAILED;
const char *agent, *qargs;
char *user_agent, *username, *password;
char *mount, *ipaddr, *server;
@ -474,6 +475,8 @@ static auth_result url_add_listener (auth_client *auth_user)
curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
atd->errormsg[0] = '\0';
free (atd->location);
atd->location = NULL;
/* setup in case intro data is returned */
x = (void *)client->refbuf->data;
x->type = 0;
@ -486,32 +489,25 @@ static auth_result url_add_listener (auth_client *auth_user)
free (userpwd);
if (res)
{
url->reject_until = time (NULL) + 60; /* prevent further attempts for a while */
WARN2 ("auth to server %s failed with %s", url->addurl, atd->errormsg);
INFO0 ("will not auth new listeners for 60 seconds");
if (url->presume_innocent)
client->flags |= CLIENT_AUTHENTICATED;
return AUTH_FAILED;
}
if (atd->location)
{
client_send_302 (client, atd->location);
auth_user->client = NULL;
free (atd->location);
atd->location = NULL;
return AUTH_FAILED;
}
/* we received a response, lets see what it is */
if (client->flags & CLIENT_AUTHENTICATED)
{
if (client->flags & CLIENT_HAS_INTRO_CONTENT)
client->refbuf->next = x->head;
if (x->head == NULL)
client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
return AUTH_OK;
x->head = NULL;
ret = AUTH_OK;
}
if (res)
{
url->reject_until = time (NULL) + 60; /* prevent further attempts for a while */
WARN2 ("auth to server %s failed with %s", url->addurl, atd->errormsg);
INFO0 ("will not auth new listeners for 60 seconds");
if (url->presume_innocent)
{
client->flags |= CLIENT_AUTHENTICATED;
ret = AUTH_OK;
}
}
/* better cleanup memory */
while (x->head)
@ -523,14 +519,23 @@ static auth_result url_add_listener (auth_client *auth_user)
}
if (x->type)
mpeg_cleanup (&x->sync);
if (atoi (atd->errormsg) == 403)
if (atd->location)
{
client_send_302 (client, atd->location);
auth_user->client = NULL;
client_send_403 (client, atd->errormsg+4);
free (atd->location);
atd->location = NULL;
}
if (atd->errormsg[0])
else if (atd->errormsg[0])
{
if (atoi (atd->errormsg) == 403)
{
auth_user->client = NULL;
client_send_403 (client, atd->errormsg+4);
}
INFO2 ("client auth (%s) failed with \"%s\"", url->addurl, atd->errormsg);
return AUTH_FAILED;
}
return ret;
}
@ -697,6 +702,10 @@ static void *alloc_thread_data (auth_t *auth)
curl_easy_setopt (atd->curl, CURLOPT_PASSWDFUNCTION, my_getpass);
#endif
curl_easy_setopt (atd->curl, CURLOPT_ERRORBUFFER, &atd->errormsg[0]);
curl_easy_setopt (atd->curl, CURLOPT_FOLLOWLOCATION, 1);
#ifdef CURLOPT_POSTREDIR
curl_easy_setopt (atd->curl, CURLOPT_POSTREDIR, CURL_REDIR_POST_ALL);
#endif
INFO0 ("...handler data initialized");
return atd;
}

View File

@ -787,6 +787,7 @@ static int _parse_mount (xmlNodePtr node, void *arg)
{ "skip-accesslog", config_get_bool, &mount->skip_accesslog },
{ "charset", config_get_str, &mount->charset },
{ "qblock-size", config_get_int, &mount->queue_block_size },
{ "redirect", config_get_str, &mount->redirect },
{ "metadata-interval", config_get_int, &mount->mp3_meta_interval },
{ "mp3-metadata-interval",
config_get_int, &mount->mp3_meta_interval },
@ -826,7 +827,7 @@ static int _parse_mount (xmlNodePtr node, void *arg)
mount->min_queue_size = -1;
mount->mp3_meta_interval = -1;
mount->yp_public = -1;
mount->url_ogg_meta = 0;
mount->url_ogg_meta = 1;
mount->source_timeout = config->source_timeout;
mount->file_seekable = 1;
mount->access_log.logid = -1;
@ -1005,7 +1006,6 @@ static int _parse_master (xmlNodePtr node, void *arg)
{ NULL, NULL, NULL },
};
config->master_relay_auth = 1;
if (parse_xml_tags (node, icecast_tags))
return -1;
@ -1109,6 +1109,7 @@ static int _parse_root (xmlNodePtr node, ice_config_t *config)
{ NULL, NULL, NULL }
};
config->master_relay_auth = 1;
if (parse_xml_tags (node, icecast_tags))
return -1;

View File

@ -133,6 +133,7 @@ typedef struct _mount_proxy {
access_log access_log;
char *redirect;
char *stream_name;
char *stream_description;
char *stream_url;

View File

@ -48,29 +48,13 @@
int worker_count;
/* Return client_t ready for use. The provided socket can be SOCK_ERROR to
* allocate a dummy client_t. Must be called with global lock held.
*/
client_t *client_create (sock_t sock)
void client_register (client_t *client)
{
client_t *client = calloc (1, sizeof (client_t));
if (sock != SOCK_ERROR)
if (client && client->connection.sock)
{
refbuf_t *r;
if (connection_init (&client->connection, sock, NULL) < 0)
{
free (client);
return NULL;
}
r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
r->len = 0;
client->shared_data = r;
client->flags |= CLIENT_ACTIVE;
global.clients++;
}
global.clients++;
stats_event_args (NULL, "clients", "%d", global.clients);
return client;
}
@ -146,6 +130,8 @@ int client_read_bytes (client_t *client, void *buf, unsigned len)
int (*con_read)(struct connection_tag *handle, void *buf, size_t len) = connection_read;
int bytes;
if (len == 0)
return 0;
if (client->refbuf && client->pos < client->refbuf->len)
{
unsigned remaining = client->refbuf->len - client->pos;
@ -168,7 +154,7 @@ int client_read_bytes (client_t *client, void *buf, unsigned len)
}
void client_send_302(client_t *client, const char *location)
int client_send_302(client_t *client, const char *location)
{
client_set_queue (client, NULL);
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
@ -179,24 +165,25 @@ void client_send_302(client_t *client, const char *location)
"Moved <a href=\"%s\">here</a>\r\n", location, location);
client->respcode = 302;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
void client_send_400(client_t *client, char *message) {
int client_send_400(client_t *client, const char *message)
{
client_set_queue (client, NULL);
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
"HTTP/1.0 400 Bad Request\r\n"
"Content-Type: text/html\r\n\r\n"
"<b>%s</b>\r\n", message);
"<b>%s</b>\r\n", message?message:"");
client->respcode = 400;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
void client_send_401 (client_t *client, const char *realm)
int client_send_401 (client_t *client, const char *realm)
{
ice_config_t *config = config_get_config ();
@ -213,11 +200,11 @@ void client_send_401 (client_t *client, const char *realm)
config_release_config();
client->respcode = 401;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
void client_send_403(client_t *client, const char *reason)
int client_send_403(client_t *client, const char *reason)
{
if (reason == NULL)
reason = "Forbidden";
@ -228,23 +215,26 @@ void client_send_403(client_t *client, const char *reason)
"Content-Type: text/html\r\n\r\n", reason);
client->respcode = 403;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
void client_send_403redirect (client_t *client, const char *mount, const char *reason)
int client_send_403redirect (client_t *client, const char *mount, const char *reason)
{
if (redirect_client (mount, client))
return;
client_send_403 (client, reason);
return 0;
return client_send_403 (client, reason);
}
void client_send_404(client_t *client, const char *message)
int client_send_404 (client_t *client, const char *message)
{
int ret = -1;
if (client->worker == NULL) /* client is not on any worker now */
{
client_destroy (client);
return;
return 0;
}
client_set_queue (client, NULL);
if (client->respcode == 0)
@ -258,12 +248,13 @@ void client_send_404(client_t *client, const char *message)
"<b>%s</b>\r\n", message);
client->respcode = 404;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
ret = fserve_setup_client (client);
}
return ret;
}
void client_send_416(client_t *client)
int client_send_416(client_t *client)
{
client_set_queue (client, NULL);
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
@ -271,7 +262,7 @@ void client_send_416(client_t *client)
"HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
client->respcode = 416;
client->refbuf->len = strlen (client->refbuf->data);
fserve_setup_client (client, NULL);
return fserve_setup_client (client);
}
@ -398,15 +389,19 @@ static void worker_control_create (worker_t *worker)
ERROR0 ("pipe failed, descriptor limit?");
abort();
}
sock_set_blocking (worker->wakeup_fd[0], 0);
}
static void worker_add_pending_clients (worker_t *worker)
static client_t **worker_add_pending_clients (worker_t *worker)
{
if (worker && worker->pending_clients)
if (worker->pending_clients)
{
unsigned count;
client_t **p;
thread_spin_lock (&worker->lock);
p = worker->last_p;
*worker->last_p = worker->pending_clients;
worker->last_p = worker->pending_clients_tail;
worker->count += worker->pending_count;
@ -416,21 +411,27 @@ static void worker_add_pending_clients (worker_t *worker)
worker->pending_count = 0;
thread_spin_unlock (&worker->lock);
DEBUG2 ("Added %d pending clients to %p", count, worker);
if (worker->wakeup_ms > worker->time_ms+5)
return p; /* only these new ones scheduled so process from here */
}
worker->wakeup_ms = worker->time_ms + 60000;
return &worker->clients;
}
static void worker_wait (worker_t *worker)
static client_t **worker_wait (worker_t *worker)
{
int ret, duration = 3;
int ret, duration = 2;
if (global.running == ICE_RUNNING)
{
duration = (int)(worker->wakeup_ms - timing_get_time());
if (duration > 60000) /* make duration between 3ms and 60s */
uint64_t tm = timing_get_time();
if (worker->wakeup_ms > tm)
duration = (int)(worker->wakeup_ms - tm);
if (duration > 60000) /* make duration between 2ms and 60s */
duration = 60000;
if (duration < 3)
duration = 3;
if (duration < 2)
duration = 2;
}
ret = util_timed_wait_for_fd (worker->wakeup_fd[0], duration);
@ -452,12 +453,10 @@ static void worker_wait (worker_t *worker)
} while (1);
}
worker_add_pending_clients (worker);
worker->time_ms = timing_get_time();
worker->current_time.tv_sec = (time_t)(worker->time_ms/1000);
worker->current_time.tv_nsec = worker->current_time.tv_sec - (worker->time_ms*1000);
worker->wakeup_ms = worker->time_ms + 60000;
return worker_add_pending_clients (worker);
}
@ -505,13 +504,16 @@ void *worker (void *arg)
{
worker_t *worker = arg;
long prev_count = -1;
client_t **prevp = &worker->clients;
worker->running = 1;
worker->wakeup_ms = (int64_t)0;
worker->time_ms = timing_get_time();
while (1)
{
client_t *client = worker->clients, **prevp = &worker->clients;
client_t *client = *prevp;
uint64_t sched_ms = worker->time_ms+6;
while (client)
{
@ -522,9 +524,8 @@ void *worker (void *arg)
int ret = 0;
client_t *nx = client->next_on_worker;
if (worker->running == 0 || client->schedule_ms <= worker->time_ms+10)
if (worker->running == 0 || client->schedule_ms <= sched_ms)
{
client->schedule_ms = worker->time_ms;
ret = client->ops->process (client);
if (ret < 0)
{
@ -559,7 +560,7 @@ void *worker (void *arg)
if (worker->count == 0 && worker->pending_count == 0)
break;
}
worker_wait (worker);
prevp = worker_wait (worker);
}
worker_relocate_clients (worker);
INFO0 ("shutting down");

View File

@ -121,16 +121,16 @@ struct _client_tag
int respcode;
};
client_t *client_create (sock_t sock);
void client_register (client_t *client);
void client_destroy(client_t *client);
void client_send_504(client_t *client, char *message);
void client_send_416(client_t *client);
void client_send_404(client_t *client, const char *message);
void client_send_401(client_t *client, const char *realm);
void client_send_403(client_t *client, const char *reason);
void client_send_403redirect (client_t *client, const char *mount, const char *reason);
void client_send_400(client_t *client, char *message);
void client_send_302(client_t *client, const char *location);
int client_send_416(client_t *client);
int client_send_404(client_t *client, const char *message);
int client_send_401(client_t *client, const char *realm);
int client_send_403(client_t *client, const char *reason);
int client_send_403redirect (client_t *client, const char *mount, const char *reason);
int client_send_400(client_t *client, const char *message);
int client_send_302(client_t *client, const char *location);
int client_send_bytes (client_t *client, const void *buf, unsigned len);
int client_read_bytes (client_t *client, void *buf, unsigned len);
void client_set_queue (client_t *client, refbuf_t *refbuf);
@ -153,6 +153,6 @@ void worker_wakeup (worker_t *worker);
#define CLIENT_SKIP_ACCESSLOG (0100)
#define CLIENT_HAS_MOVED (0200)
#define CLIENT_IP_BAN_LIFT (0400)
#define CLIENT_FORMAT_BIT (01000)
#define CLIENT_FORMAT_BIT (010000)
#endif /* __CLIENT_H__ */

View File

@ -28,7 +28,10 @@
#include <fnmatch.h>
#endif
#ifndef _WIN32
#ifdef _MSC_VER
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
@ -348,11 +351,14 @@ static void add_generic_text (avl_tree *t, const char *ip, time_t now)
static void add_banned_ip (avl_tree *t, const char *ip, time_t now)
{
struct banned_entry *entry = calloc (1, sizeof (struct banned_entry));
snprintf (&entry->ip[0], sizeof (entry->ip), "%s", ip);
if (now)
entry->timeout = now;
avl_insert (t, entry);
if (t)
{
struct banned_entry *entry = calloc (1, sizeof (struct banned_entry));
snprintf (&entry->ip[0], sizeof (entry->ip), "%s", ip);
if (now)
entry->timeout = now;
avl_insert (t, entry);
}
}
void connection_add_banned_ip (const char *ip, int duration)
@ -508,41 +514,25 @@ int connection_init (connection_t *con, sock_t sock, const char *addr)
{
if (con)
{
struct sockaddr_storage sa;
socklen_t slen = sizeof (sa);
memset (con, 0, sizeof (connection_t));
con->sock = sock;
if (sock == SOCK_ERROR)
return -1;
con->id = _next_connection_id();
if (addr)
{
con->ip = strdup (addr);
return 0;
}
if (getpeername (sock, (struct sockaddr *)&sa, &slen) == 0)
{
char *ip;
#ifdef HAVE_GETNAMEINFO
char buffer [200] = "unknown";
getnameinfo ((struct sockaddr *)&sa, slen, buffer, 200, NULL, 0, NI_NUMERICHOST);
if (strncmp (buffer, "::ffff:", 7) == 0)
ip = strdup (buffer+7);
if (strncmp (addr, "::ffff:", 7) == 0)
ip = strdup (addr+7);
else
ip = strdup (buffer);
#else
int len = 30;
ip = malloc (len);
strncpy (ip, inet_ntoa (sa.sin_addr), len);
#endif
ip = strdup (addr);
if (accept_ip_address (ip))
{
con->ip = ip;
con->id = _next_connection_id();
return 0;
}
free (ip);
}
memset (con, 0, sizeof (connection_t));
con->sock = SOCK_ERROR;
}
return -1;
@ -604,20 +594,25 @@ static sock_t wait_for_serversock (void)
{
case SIGINT:
case SIGTERM:
DEBUG0 ("signalfd received a termination");
global.running = ICE_HALTING;
connection_running = 0;
DEBUG0 ("signalfd received a termination");
break;
case SIGHUP:
INFO0 ("HUP received, reread scheduled");
global.schedule_config_reread = 1;
connection_running = 0;
INFO0 ("HUP received, reread scheduled");
break;
default:
WARN1 ("unexpected signal (%d)", fdsi.ssi_signo);
}
}
}
if (ufds[i].revents & (POLLNVAL|POLLERR))
{
ERROR0 ("signalfd descriptor became invalid, doing thread restart");
slave_restart(); // something odd happened
}
#endif
for(i=0; i < global.server_sockets; i++) {
if(ufds[i].revents & POLLIN)
@ -681,13 +676,14 @@ static sock_t wait_for_serversock (void)
static client_t *accept_client (void)
{
client_t *client;
client_t *client = NULL;
sock_t sock, serversock = wait_for_serversock ();
char addr [200];
if (serversock == SOCK_ERROR)
return NULL;
sock = sock_accept (serversock, NULL, 0);
sock = sock_accept (serversock, addr, 200);
if (sock == SOCK_ERROR)
{
if (sock_recoverable (sock_error()))
@ -696,12 +692,26 @@ static client_t *accept_client (void)
thread_sleep (500000);
return NULL;
}
global_lock ();
client = client_create (sock);
if (client)
do
{
connection_t *con = &client->connection;
int i;
int i, num;
refbuf_t *r;
if (sock_set_blocking (sock, 0) || sock_set_nodelay (sock))
{
WARN0 ("failed to set tcp options on client connection, dropping");
break;
}
client = calloc (1, sizeof (client_t));
if (client == NULL || connection_init (&client->connection, sock, addr) < 0)
break;
client->shared_data = r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
r->len = 0; // for building up the request coming in
global_lock ();
client_register (client);
for (i=0; i < global.server_sockets; i++)
{
if (global.serversock[i] == serversock)
@ -709,7 +719,7 @@ static client_t *accept_client (void)
client->server_conn = global.server_conn[i];
client->server_conn->refcount++;
if (client->server_conn->ssl && ssl_ok)
connection_uses_ssl (con);
connection_uses_ssl (&client->connection);
if (client->server_conn->shoutcast_compat)
client->ops = &shoutcast_source_ops;
else
@ -717,16 +727,13 @@ static client_t *accept_client (void)
break;
}
}
num = global.clients;
global_unlock ();
if (sock_set_blocking (con->sock, 0) || sock_set_nodelay (con->sock))
{
WARN0 ("failed to set tcp options on client connection, dropping");
client_destroy (client);
client = NULL;
}
stats_event_args (NULL, "clients", "%d", num);
return client;
}
global_unlock ();
} while (0);
free (client);
sock_close (sock);
return NULL;
}
@ -836,8 +843,7 @@ static int http_client_request (client_t *client)
refbuf_release (refbuf);
client->shared_data = NULL;
client->check_buffer = format_generic_write_to_client;
fserve_setup_client_fb (client, &fb);
return 0;
return fserve_setup_client_fb (client, &fb);
}
/* find a blank line */
do
@ -908,9 +914,9 @@ static int http_client_request (client_t *client)
break;
default:
WARN0("unhandled request type from client");
client_send_400 (client, "unknown request");
return 0;
return client_send_400 (client, "unknown request");
}
client->counter = 0;
return client->ops->process(client);
}
/* invalid http request */
@ -918,7 +924,12 @@ static int http_client_request (client_t *client)
}
if (ret && client->connection.error == 0)
{
client->schedule_ms = client->worker->time_ms + 100;
/* scale up the retry time, very short initially, usual case */
uint64_t diff = client->worker->time_ms - client->counter;
diff >>= 1;
if (diff > 200)
diff = 200;
client->schedule_ms = client->worker->time_ms + 6 + diff;
return 0;
}
}
@ -978,10 +989,10 @@ static void *connection_thread (void *arg)
{
/* do a small delay here so the client has chance to send the request after
* getting a connect. */
client->schedule_ms = timing_get_time();
client->counter = client->schedule_ms = timing_get_time();
client->connection.con_time = client->schedule_ms/1000;
client->connection.discon_time = client->connection.con_time + header_timeout;
client->schedule_ms += 5;
client->schedule_ms += 6;
client_add_worker (client);
stats_event_inc (NULL, "connections");
}
@ -1002,6 +1013,7 @@ static void *connection_thread (void *arg)
memset (&allowed_ip, 0, sizeof (allowed_ip));
memset (&useragents, 0, sizeof (useragents));
global_unlock();
connection_close_sigfd ();
INFO0 ("connection thread finished");
@ -1027,7 +1039,6 @@ void connection_thread_shutdown ()
if (conn_tid)
{
connection_running = 0;
connection_close_sigfd ();
INFO0("shutting down connection thread");
thread_join (conn_tid);
conn_tid = NULL;
@ -1231,8 +1242,7 @@ static int _handle_source_request (client_t *client)
if (uri[0] != '/')
{
WARN0 ("source mountpoint not starting with /");
client_send_401 (client, NULL);
return 0;
return client_send_401 (client, NULL);
}
switch (auth_check_source (client, uri))
{
@ -1242,7 +1252,7 @@ static int _handle_source_request (client_t *client)
break;
default: /* failed */
INFO1("Source (%s) attempted to login with invalid or missing password", uri);
client_send_401 (client, NULL);
return client_send_401 (client, NULL);
}
return 0;
}
@ -1259,7 +1269,7 @@ static int _handle_stats_request (client_t *client)
if (strcmp (uri, "/admin/streams") == 0 && connection_check_relay_pass (client->parser))
stats_add_listener (client, STATS_SLAVE|STATS_GENERAL);
else
auth_add_listener (uri, client);
return auth_add_listener (uri, client);
}
return 0;
}
@ -1300,23 +1310,19 @@ static void check_for_filtering (ice_config_t *config, client_t *client, char *u
static int _handle_get_request (client_t *client)
{
int port;
char *serverhost = NULL;
int serverport = 0;
int serverport = 0, ret = 0;
aliases *alias;
ice_config_t *config;
char *uri = util_normalise_uri (httpp_getvar (client->parser, HTTPP_VAR_URI));
int client_limit_reached = 0;
if (uri == NULL)
{
client_send_400 (client, "invalid request URI");
return 0;
}
return client_send_400 (client, "invalid request URI");
DEBUG1 ("start with %s", uri);
config = config_get_config();
check_for_filtering (config, client, uri);
port = config->port;
if (client->server_conn)
{
serverhost = client->server_conn->bind_address;
@ -1354,19 +1360,18 @@ static int _handle_get_request (client_t *client)
stats_event_inc(NULL, "client_connections");
/* Dispatch all admin requests */
if (admin_handle_request (client, uri) == 0)
{
free (uri);
return 0;
}
/* drop non-admin GET requests here if clients limit reached */
if (client_limit_reached)
client_send_403 (client, "Too many clients connected");
if (strcmp (uri, "/admin.cgi") == 0 || strncmp("/admin/", uri, 7) == 0)
ret = admin_handle_request (client, uri);
else
auth_add_listener (uri, client);
{
/* drop non-admin GET requests here if clients limit reached */
if (client_limit_reached)
ret = client_send_403 (client, "Too many clients connected");
else
ret = auth_add_listener (uri, client);
}
free (uri);
return 0;
return ret;
}

View File

@ -62,12 +62,15 @@ void event_config_read (void)
else {
restart_logging (&new_config);
config_set_config (&new_config, &old_config);
config = config_get_config_unlocked();
config_release_config();
config = config_get_config();
yp_recheck_config (config);
fserve_recheck_mime_types (config);
stats_global (config);
workers_adjust (config->workers_count);
config_release_config();
connection_thread_shutdown();
slave_restart();
config_clear (&old_config);

View File

@ -30,25 +30,25 @@
#include <fnmatch.h>
#ifdef HAVE_FNMATCH__H
#include "fnmatch_.h"
#endif
#ifdef HAVE_FNMATCH__H
#include "fnmatch_.h"
#endif
/* ugly hack needed in some win32 build cases */
#ifdef WIN32
int __mb_cur_max;
unsigned short* _pctype;
#endif
#ifdef WIN32
int __mb_cur_max;
unsigned short* _pctype;
#endif
#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#include <alloca.h>
#endif
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stddef.h>
#include <stddef.h>
#ifdef HAVE_STDBOOL_H
#include <stdbool.h>
#include <stdbool.h>
#endif
#include <stdlib.h>
#include <string.h>

View File

@ -67,6 +67,7 @@ format_type_t format_get_type(const char *contenttype)
return FORMAT_TYPE_GENERIC;
}
void format_plugin_clear (format_plugin_t *format, client_t *client)
{
if (format == NULL)
@ -106,7 +107,7 @@ int format_get_plugin (format_plugin_t *plugin, client_t *client)
}
int format_file_read (client_t *client, format_plugin_t *plugin, FILE *intro)
int format_file_read (client_t *client, format_plugin_t *plugin, FILE *fp)
{
refbuf_t *refbuf = client->refbuf;
size_t bytes = -1;
@ -116,7 +117,7 @@ int format_file_read (client_t *client, format_plugin_t *plugin, FILE *intro)
{
if (refbuf == NULL)
{
if (intro == NULL)
if (fp == NULL)
return -2;
refbuf = client->refbuf = refbuf_new (4096);
client->pos = refbuf->len;
@ -125,7 +126,7 @@ int format_file_read (client_t *client, format_plugin_t *plugin, FILE *intro)
}
if (client->pos < refbuf->len)
break;
if (client->flags & CLIENT_HAS_INTRO_CONTENT)
if (fp == NULL || client->flags & CLIENT_HAS_INTRO_CONTENT)
{
if (refbuf->next)
{
@ -144,8 +145,8 @@ int format_file_read (client_t *client, format_plugin_t *plugin, FILE *intro)
continue;
}
if (fseek (intro, client->intro_offset, SEEK_SET) < 0 ||
(bytes = fread (refbuf->data, 1, 4096, intro)) <= 0)
if (fseek (fp, client->intro_offset, SEEK_SET) < 0 ||
(bytes = fread (refbuf->data, 1, 4096, fp)) <= 0)
{
return bytes < 0 ? -2 : -1;
}
@ -172,8 +173,6 @@ int format_generic_write_to_client (client_t *client)
const char *buf = refbuf->data + client->pos;
unsigned int len = refbuf->len - client->pos;
if (len > 5000) /* make sure we don't send huge amounts in one go */
len = 4096;
ret = client_send_bytes (client, buf, len);
if (ret > 0)
@ -205,8 +204,13 @@ int format_general_headers (format_plugin_t *plugin, client_t *client)
protocol = "ICY";
if (strstr (useragent, "Shoutcast Server")) /* hack for sc_serv */
contenttypehdr = "content-type";
if (strstr (useragent, "BlackBerry"))
contenttype="audio/aac";
// if (strstr (useragent, "Sonos"))
// contenttypehdr = "content-type";
if (plugin->type == FORMAT_TYPE_AAC)
{
if (strstr (useragent, "BlackBerry"))
contenttype="audio/aac";
}
}
bytes = snprintf (ptr, remaining, "%s 200 OK\r\n"
"%s: %s\r\n", protocol, contenttypehdr, contenttype);

View File

@ -227,6 +227,8 @@ static void format_mp3_apply_settings (format_plugin_t *format, mount_proxy *mou
{
mp3_state *source_mp3 = format->_state;
if (source_mp3 == NULL)
return;
source_mp3->interval = -1;
free (format->charset);
format->charset = NULL;
@ -349,14 +351,17 @@ static void mp3_set_title (source_t *source)
}
if (source_mp3->url_artist && source_mp3->url_title)
{
stats_event (source->mount, "title", source_mp3->url_title);
r = snprintf (p->data, size, "%c%s%s - %s", len_byte, streamtitle,
source_mp3->url_artist, source_mp3->url_title);
flv_meta_append_string (flvmeta, "artist", source_mp3->url_artist);
}
else
{
r = snprintf (p->data, size, "%c%s%s", len_byte, streamtitle, source_mp3->url_title);
stats_event (source->mount, "title", p->data+14);
}
logging_playlist (source->mount, p->data+14, source->listeners);
stats_event (source->mount, "title", p->data+14);
strcat (p->data+14, "';");
flv_meta_append_string (flvmeta, "title", source_mp3->url_title);
if (r > 0)
@ -381,7 +386,7 @@ static void mp3_set_title (source_t *source)
flv_meta_append_string (flvmeta, NULL, NULL);
refbuf_release (source_mp3->metadata);
source_mp3->metadata = p;
stats_event_time (source->mount, "metadata_updated");
stats_event_time (source->mount, "metadata_updated", STATS_GENERAL);
}
}
@ -390,11 +395,11 @@ static void mp3_set_title (source_t *source)
* which is 0 or greater. Check the client in_metadata value afterwards
* to see if all metadata has been sent
*/
static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned int remaining)
static int send_stream_metadata (client_t *client, refbuf_t *refbuf)
{
int ret = 0;
char *metadata;
int meta_len;
char *metadata, *merge;
int meta_len, block_len;
refbuf_t *associated = refbuf->associated;
mp3_client_data *client_mp3 = client->format_data;
@ -443,61 +448,32 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned in
}
/* if there is normal stream data to send as well as metadata then try
* to merge them into 1 write call */
if (remaining)
block_len = refbuf->len - client->pos;
merge = alloca (block_len + meta_len);
memcpy (merge, metadata, meta_len);
memcpy (merge + meta_len, refbuf->data + client->pos, block_len);
ret = client_send_bytes (client, merge, meta_len+block_len);
if (ret >= meta_len)
{
char *merge = alloca (remaining + meta_len);
memcpy (merge, refbuf->data + client->pos, remaining);
memcpy (merge+remaining, metadata, meta_len);
ret = client_send_bytes (client, merge, remaining+meta_len);
if (ret > (int)remaining)
{
/* did we write all of it? */
if (ret < (int)remaining + meta_len)
{
client_mp3->metadata_offset = (ret - remaining);
client->flags |= CLIENT_IN_METADATA;
client->schedule_ms += 200;
client_mp3->since_meta_block += remaining;
}
else
{
client->flags &= ~CLIENT_IN_METADATA;
client_mp3->metadata_offset = 0;
client_mp3->since_meta_block = 0;
client->pos += remaining;
}
client->queue_pos += remaining;
return ret;
}
/* although we are not actually in metadata yet, we know we can do another merge next time */
client->flags |= CLIENT_IN_METADATA;
client->schedule_ms += 200;
if (ret > 0)
{
client_mp3->since_meta_block += ret;
client->pos += ret;
client->queue_pos += ret;
}
return ret > 0 ? ret : 0;
}
ret = client_send_bytes (client, metadata, meta_len);
if (ret == meta_len)
{
client_mp3->metadata_offset = 0;
client->queue_pos += (ret - meta_len);
client->counter += (ret - meta_len);
client_mp3->since_meta_block = (ret - meta_len);
client->pos += (ret - meta_len);
client->flags &= ~CLIENT_IN_METADATA;
client_mp3->since_meta_block = 0;
client->pos += remaining;
return ret;
client_mp3->metadata_offset = 0;
}
if (ret > 0)
client_mp3->metadata_offset += ret;
client->schedule_ms += 200;
client->flags |= CLIENT_IN_METADATA;
return ret > 0 ? ret : 0;
else
{
client->flags |= CLIENT_IN_METADATA;
if (ret > 0)
client_mp3->metadata_offset += ret;
client->schedule_ms += 150;
}
return ret;
}
@ -506,56 +482,37 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned in
*/
static int format_mp3_write_buf_to_client (client_t *client)
{
int ret, written = 0;
int ret = -1, len;
mp3_client_data *client_mp3 = client->format_data;
refbuf_t *refbuf = client->refbuf;
char *buf = refbuf->data + client->pos;
unsigned int len = refbuf->len - client->pos;
do
if (client_mp3->interval && client_mp3->interval == client_mp3->since_meta_block)
return send_stream_metadata (client, refbuf);
len = refbuf->len - client->pos;
if (client_mp3->interval && len > client_mp3->interval - client_mp3->since_meta_block)
len = client_mp3->interval - client_mp3->since_meta_block;
if (len > 2900)
len = 2900; // do not send a huge amount out in one go
if (len)
{
/* see if we need to send the current metadata to the client */
if (client_mp3->interval)
char *buf = refbuf->data + client->pos;
ret = client_send_bytes (client, buf, len);
if (ret < len)
client->schedule_ms += 50;
if (ret > 0)
{
size_t remaining = client_mp3->interval -
client_mp3->since_meta_block;
/* leading up to sending the metadata block */
if ((client->flags & CLIENT_IN_METADATA) || remaining <= len)
{
ret = send_stream_metadata (client, refbuf, remaining);
if (client->flags & CLIENT_IN_METADATA)
break;
buf = refbuf->data + client->pos;
len = refbuf->len - client->pos;
if (ret <= (int)remaining || len > client_mp3->interval)
break;
written += ret;
}
client_mp3->since_meta_block += ret;
client->pos += ret;
client->queue_pos += ret;
client->counter += ret;
}
/* write any mp3, maybe after the metadata block */
if (len)
{
ret = client_send_bytes (client, buf, len);
if (ret > 0)
{
client_mp3->since_meta_block += ret;
client->pos += ret;
client->queue_pos += ret;
}
if (ret < (int)len)
break;
written += ret;
}
ret = 0;
} while (0);
if (ret < 0)
client->schedule_ms += (written ? 25 : 50);
if (ret > 0)
written += ret;
return written == 0 ? -1 : written;
}
client->schedule_ms += 4;
return ret;
}
@ -606,17 +563,22 @@ static int complete_read (source_t *source)
source_mp3->read_data = refbuf_new (source_mp3->queue_block_size);
source_mp3->read_count = 0;
}
if (source_mp3->update_metadata)
{
mp3_set_title (source);
source_mp3->update_metadata = 0;
}
if (source_mp3->read_count < source_mp3->read_data->len)
{
char *buf = source_mp3->read_data->data + source_mp3->read_count;
int read_in = source_mp3->read_data->len - source_mp3->read_count;
int bytes = client_read_bytes (client, buf, read_in);
if (bytes < 0)
return 0;
//DEBUG2 ("read in %d of %d bytes", bytes, read_in);
rate_add (format->in_bitrate, bytes, client->worker->current_time.tv_sec);
source_mp3->read_count += bytes;
format->read_bytes += bytes;
if (bytes > 0)
{
rate_add (format->in_bitrate, bytes, client->worker->current_time.tv_sec);
source_mp3->read_count += bytes;
format->read_bytes += bytes;
}
}
if (source_mp3->read_count < source_mp3->read_data->len)
return 0;
@ -631,7 +593,15 @@ int mpeg_process_buffer (client_t *client, format_plugin_t *plugin)
int unprocessed = -1;
if (refbuf)
{
unprocessed = mpeg_complete_frames (&source_mp3->file_sync, refbuf, 0);
if (source_mp3->metadata && refbuf->associated != source_mp3->metadata)
{
refbuf_release (refbuf->associated);
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
}
}
return unprocessed;
}
@ -708,11 +678,6 @@ static refbuf_t *mp3_get_no_meta (source_t *source)
return NULL;
}
source->client->queue_pos += refbuf->len;
if (source_mp3->update_metadata)
{
mp3_set_title (source);
source_mp3->update_metadata = 0;
}
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
refbuf->flags |= SOURCE_BLOCK_SYNC;
@ -740,11 +705,6 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
source_mp3->read_data = NULL;
src = (unsigned char *)refbuf->data;
if (source_mp3->update_metadata)
{
mp3_set_title (source);
source_mp3->update_metadata = 0;
}
/* fill the buffer with the read data */
bytes = source_mp3->read_count;
refbuf->len = 0;

View File

@ -206,7 +206,7 @@ static void apply_ogg_settings (format_plugin_t *format, mount_proxy *mount)
{
ogg_state_t *ogg_info = format->_state;
if (mount == NULL || format == NULL)
if (mount == NULL || format == NULL || ogg_info == NULL)
return;
if (mount->filter_theora)
ogg_info->filter_theora = 1;
@ -334,7 +334,7 @@ static void update_comments (source_t *source)
}
stats_event (source->mount, "artist", artist);
stats_event (source->mount, "title", title);
stats_event_time (source->mount, "metadata_updated");
stats_event_time (source->mount, "metadata_updated", STATS_GENERAL);
codec = ogg_info->codecs;
while (codec)
@ -581,6 +581,7 @@ static int write_buf_to_client (client_t *client)
client->pos += ret;
client->queue_pos += ret;
written += ret;
client->counter += ret;
}
if (ret < (int)len)

View File

@ -440,7 +440,7 @@ static void vorbis_set_tag (format_plugin_t *plugin, const char *tag, const char
}
value = util_conv_string (in_value, charset, "UTF-8");
if (value == NULL)
if (value == NULL && in_value)
value = strdup (in_value);
if (strcmp (tag, "artist") == 0)

View File

@ -54,6 +54,7 @@
#include "util.h"
#include "admin.h"
#include "compat.h"
#include "slave.h"
#include "fserve.h"
#include "format_mp3.h"
@ -103,7 +104,7 @@ void fserve_initialize(void)
fserve_recheck_mime_types (config);
config_release_config();
stats_event (NULL, "file_connections", "0");
stats_event_flags (NULL, "file_connections", "0", STATS_COUNTERS);
fserve_running = 1;
INFO0("file serving started");
}
@ -358,7 +359,6 @@ static int fill_http_headers (client_t *client, const char *path, struct stat *f
/* Date: is required on all HTTP1.1 responses */
char currenttime[50];
time_t now;
int strflen;
struct tm result;
off_t endpos;
fh_node * fh = client->shared_data;
@ -374,7 +374,7 @@ static int fill_http_headers (client_t *client, const char *path, struct stat *f
endpos = 0;
now = client->worker->current_time.tv_sec;
strflen = strftime(currenttime, 50, "%a, %d-%b-%Y %X GMT",
strftime(currenttime, 50, "%a, %d-%b-%Y %X GMT",
gmtime_r (&now, &result));
client->respcode = 206;
type = fserve_content_type (path);
@ -434,6 +434,7 @@ int fserve_client_create (client_t *httpclient, const char *path)
char *fullpath;
int m3u_requested = 0, m3u_file_available = 1;
int xspf_requested = 0, xspf_file_available = 1;
int ret = -1;
ice_config_t *config;
fh_node *fh;
fbinfo finfo;
@ -453,10 +454,14 @@ int fserve_client_create (client_t *httpclient, const char *path)
/* the m3u can be generated, but send an m3u file if available */
if (m3u_requested == 0 && xspf_requested == 0)
{
WARN2 ("req for file \"%s\" %s", fullpath, strerror (errno));
client_send_404 (httpclient, "The file you requested could not be found");
if (redirect_client (path, httpclient) == 0)
{
if ((httpclient->flags & CLIENT_SKIP_ACCESSLOG) == 0)
WARN2 ("req for file \"%s\" %s", fullpath, strerror (errno));
ret = client_send_404 (httpclient, "The file you requested could not be found");
}
free (fullpath);
return -1;
return ret;
}
m3u_file_available = 0;
xspf_file_available = 0;
@ -466,7 +471,9 @@ int fserve_client_create (client_t *httpclient, const char *path)
if (m3u_requested && m3u_file_available == 0)
{
const char *host = httpp_getvar (httpclient->parser, "host");
const char *host = httpp_getvar (httpclient->parser, "host"),
*args = httpp_getvar (httpclient->parser, HTTPP_VAR_QUERYARGS),
*at = "", *user = "", *pass ="";
char *sourceuri = strdup (path);
char *dot = strrchr (sourceuri, '.');
char *protocol = "http";
@ -484,6 +491,12 @@ int fserve_client_create (client_t *httpclient, const char *path)
host = NULL;
*dot = 0;
if (httpclient->username && httpclient->password)
{
at = "@";
user = httpclient->username;
pass = httpclient->password;
}
httpclient->respcode = 200;
if (host == NULL)
{
@ -491,11 +504,12 @@ int fserve_client_create (client_t *httpclient, const char *path)
snprintf (httpclient->refbuf->data, BUFSIZE,
"HTTP/1.0 200 OK\r\n"
"Content-Type: audio/x-mpegurl\r\n\r\n"
"%s://%s:%d%s\r\n",
"%s://%s%s%s%s%s:%d%s%s\r\n",
protocol,
user, at[0]?":":"", pass, at,
config->hostname, config->port,
sourceuri
);
sourceuri,
args?args:"");
config_release_config();
}
else
@ -503,17 +517,17 @@ int fserve_client_create (client_t *httpclient, const char *path)
snprintf (httpclient->refbuf->data, BUFSIZE,
"HTTP/1.0 200 OK\r\n"
"Content-Type: audio/x-mpegurl\r\n\r\n"
"%s://%s%s\r\n",
"%s://%s%s%s%s%s%s%s\r\n",
protocol,
user, at[0]?":":"", pass, at,
host,
sourceuri
);
sourceuri,
args?args:"");
}
httpclient->refbuf->len = strlen (httpclient->refbuf->data);
fserve_setup_client_fb (httpclient, NULL);
free (sourceuri);
free (fullpath);
return 0;
return fserve_setup_client_fb (httpclient, NULL);
}
if (xspf_requested && xspf_file_available == 0)
{
@ -524,29 +538,25 @@ int fserve_client_create (client_t *httpclient, const char *path)
*eol = '\0';
doc = stats_get_xml (0, reference);
free (reference);
admin_send_response (doc, httpclient, XSLT, "xspf.xsl");
xmlFreeDoc(doc);
return 0;
return admin_send_response (doc, httpclient, XSLT, "xspf.xsl");
}
/* on demand file serving check */
config = config_get_config();
if (config->fileserve == 0)
{
DEBUG1 ("on demand file \"%s\" refused", fullpath);
client_send_404 (httpclient, "The file you requested could not be found");
config_release_config();
DEBUG1 ("on demand file \"%s\" refused", fullpath);
free (fullpath);
return -1;
return client_send_404 (httpclient, "The file you requested could not be found");
}
config_release_config();
if (S_ISREG (file_buf.st_mode) == 0)
{
client_send_404 (httpclient, "The file you requested could not be found");
WARN1 ("found requested file but there is no handler for it: %s", fullpath);
free (fullpath);
return -1;
return client_send_404 (httpclient, "The file you requested could not be found");
}
finfo.flags = 0;
@ -558,9 +568,8 @@ int fserve_client_create (client_t *httpclient, const char *path)
if (fh == NULL)
{
WARN1 ("Problem accessing file \"%s\"", fullpath);
client_send_404 (httpclient, "File not readable");
free (fullpath);
return -1;
return client_send_404 (httpclient, "File not readable");
}
free (fullpath);
@ -569,16 +578,15 @@ int fserve_client_create (client_t *httpclient, const char *path)
if (fill_http_headers (httpclient, path, &file_buf) < 0)
{
thread_mutex_unlock (&fh->lock);
client_send_416 (httpclient);
return -1;
return client_send_416 (httpclient);
}
stats_event_inc (NULL, "file_connections");
thread_mutex_unlock (&fh->lock);
fserve_setup_client_fb (httpclient, NULL);
return 0;
stats_event_inc (NULL, "file_connections");
return fserve_setup_client_fb (httpclient, NULL);
}
// fh must be locked before calling this
static void fh_release (fh_node *fh)
{
@ -604,6 +612,20 @@ static void fh_release (fh_node *fh)
}
static void _free_fserve_buffers (client_t *client)
{
refbuf_t *buf = client->refbuf;
while (buf)
{
refbuf_t *old = buf;
buf = old->next;
old->next = NULL;
refbuf_release (old);
}
client->refbuf = NULL;
}
static void file_release (client_t *client)
{
fh_node *fh = client->shared_data;
@ -619,6 +641,7 @@ static void file_release (client_t *client)
stats_event (fh->finfo.mount, NULL, NULL);
fh_release (fh);
}
_free_fserve_buffers (client);
if (client->flags & CLIENT_AUTHENTICATED)
{
const char *mount = httpp_getvar (client->parser, HTTPP_VAR_URI);
@ -657,8 +680,7 @@ static void fserve_move_listener (client_t *client)
fh_node *fh = client->shared_data;
fbinfo f;
refbuf_release (client->refbuf);
client->refbuf = NULL;
_free_fserve_buffers (client);
client->shared_data = NULL;
thread_mutex_lock (&fh->lock);
remove_from_fh (fh, client);
@ -708,15 +730,14 @@ static int prefile_send (client_t *client)
refbuf_release (client->refbuf);
client->refbuf = refbuf_new(len);
client->pos = len;
return 0;
return client->ops->process (client);
}
}
if (client->respcode)
return -1;
client_send_404 (client, NULL);
thread_mutex_lock (&fh->lock);
fh_release (fh);
return 0;
return client_send_404 (client, NULL);
}
else
{
@ -741,7 +762,6 @@ static int prefile_send (client_t *client)
if (written > 30000)
break;
}
client->schedule_ms = worker->time_ms + 100;
return 0;
}
@ -783,7 +803,7 @@ static int file_send (client_t *client)
* this, eg admin requests */
if (throttle_sends > 1 && now - client->connection.con_time > 1)
{
client->schedule_ms += 500;
client->schedule_ms += 300;
loop = 1;
}
while (loop && written < 30000)
@ -840,14 +860,18 @@ static int throttled_file_send (client_t *client)
limit = (unsigned long)(limit * 1.05);
if (secs)
rate = (client->counter+1400)/secs;
// DEBUG3 ("counter %lld, duration %ld, limit %u", client->counter, secs, rate);
thread_mutex_lock (&fh->lock);
if (rate > limit)
if (rate > limit || secs < 3)
{
client->schedule_ms += 1000/(limit/1400);
rate_add (fh->format->out_bitrate, 0, worker->time_ms);
thread_mutex_unlock (&fh->lock);
global_add_bitrates (global.out_bitrate, 0, worker->time_ms);
return 0;
if (secs > 2)
{
thread_mutex_unlock (&fh->lock);
global_add_bitrates (global.out_bitrate, 0, worker->time_ms);
return 0;
}
}
if (fh->stats_update <= now)
{
@ -879,10 +903,10 @@ static int throttled_file_send (client_t *client)
bytes = client->check_buffer (client);
if (bytes < 0)
bytes = 0;
//DEBUG3 ("bytes %d, counter %ld, %ld", bytes, client->counter, client->worker->time_ms - (client->timer_start*1000));
rate_add (fh->format->out_bitrate, bytes, worker->time_ms);
thread_mutex_unlock (&fh->lock);
global_add_bitrates (global.out_bitrate, bytes, worker->time_ms);
client->counter += bytes;
client->schedule_ms += (1000/(limit/1400*2));
/* progessive slowdown if max bandwidth is exceeded. */
@ -899,17 +923,17 @@ struct _client_functions throttled_file_content_ops =
};
/* return 0 for success, 1 for fallback invalid */
/* return 0 for success, -1 for fallback invalid */
int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
{
if (finfo)
{
fh_node *fh;
if (finfo->flags & FS_FALLBACK && finfo->limit == 0)
return 1;
return -1;
fh = open_fh (finfo, client);
if (fh == NULL)
return 1;
return -1;
client->shared_data = fh;
if (fh->finfo.limit)
@ -934,7 +958,10 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
client->ops = &buffer_content_ops;
client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
if (client->flags & CLIENT_ACTIVE)
{
client->schedule_ms = client->worker->time_ms;
return client->ops->process (client);
}
else
{
worker_t *worker = client->worker;
@ -945,10 +972,10 @@ int fserve_setup_client_fb (client_t *client, fbinfo *finfo)
}
void fserve_setup_client (client_t *client, const char *mount)
int fserve_setup_client (client_t *client)
{
client->check_buffer = format_generic_write_to_client;
fserve_setup_client_fb (client, NULL);
return fserve_setup_client_fb (client, NULL);
}
@ -1058,9 +1085,9 @@ void fserve_recheck_mime_types (ice_config_t *config)
}
void fserve_kill_client (client_t *client, const char *mount, int response)
int fserve_kill_client (client_t *client, const char *mount, int response)
{
int c = 2, id;
int loop = 2, id;
fbinfo finfo;
xmlDocPtr doc;
xmlNodePtr node;
@ -1074,10 +1101,8 @@ void fserve_kill_client (client_t *client, const char *mount, int response)
idtext = httpp_get_query_param (client->parser, "id");
if (idtext == NULL)
{
client_send_400 (client, "missing parameter id");
return;
}
return client_send_400 (client, "missing parameter id");
id = atoi(idtext);
doc = xmlNewDoc(XMLSTR("1.0"));
@ -1085,7 +1110,7 @@ void fserve_kill_client (client_t *client, const char *mount, int response)
xmlDocSetRootElement(doc, node);
snprintf (buf, sizeof(buf), "Client %d not found", id);
while (c)
while (1)
{
fh_node *fh = open_fh (&finfo, NULL);
if (fh)
@ -1100,20 +1125,20 @@ void fserve_kill_client (client_t *client, const char *mount, int response)
listener->connection.error = 1;
snprintf (buf, sizeof(buf), "Client %d removed", id);
v = "1";
loop = 0;
break;
}
node = avl_get_next (node);
}
fh_release (fh);
break;
}
c--;
finfo.flags = FS_FALLBACK;
if (loop == 0) break;
loop--;
if (loop == 1) finfo.flags = FS_FALLBACK;
}
xmlNewChild (node, NULL, XMLSTR("message"), XMLSTR(buf));
xmlNewChild (node, NULL, XMLSTR("return"), XMLSTR(v));
admin_send_response (doc, client, response, "response.xsl");
xmlFreeDoc(doc);
return admin_send_response (doc, client, response, "response.xsl");
}
@ -1164,7 +1189,7 @@ int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo)
}
void fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners)
int fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners)
{
int ret;
fbinfo finfo;
@ -1193,9 +1218,7 @@ void fserve_list_clients (client_t *client, const char *mount, int response, int
char buf[20];
snprintf (buf, sizeof(buf), "%u", ret);
xmlNewChild (srcnode, NULL, XMLSTR("listeners"), XMLSTR(buf));
admin_send_response (doc, client, response, "listclients.xsl");
return admin_send_response (doc, client, response, "listclients.xsl");
}
else
client_send_400 (client, "mount does not exist");
xmlFreeDoc(doc);
return client_send_400 (client, "mount does not exist");
}

View File

@ -36,12 +36,12 @@ int fserve_client_create(client_t *httpclient, const char *path);
char *fserve_content_type (const char *path);
void fserve_recheck_mime_types (ice_config_t *config);
void fserve_setup_client (client_t *client, const char *mount);
int fserve_setup_client (client_t *client);
int fserve_setup_client_fb (client_t *client, fbinfo *finfo);
void fserve_set_override (const char *mount, const char *dest);
void fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners);
int fserve_list_clients (client_t *client, const char *mount, int response, int show_listeners);
int fserve_list_clients_xml (xmlNodePtr srcnode, fbinfo *finfo);
void fserve_kill_client (client_t *client, const char *mount, int response);
int fserve_kill_client (client_t *client, const char *mount, int response);
extern int fserve_running;

View File

@ -22,6 +22,7 @@
#ifdef WIN32
#define _WIN32_WINNT 0x0400
/* For getpid() */
//#include <winsock2.h>
#include <process.h>
#include <windows.h>
#endif
@ -316,7 +317,7 @@ int main(int argc, char **argv)
** only, so that we can read a configfile
*/
res = _parse_config_opts(argc, argv, filename, 512);
if (res == 1) {
if (res == 1) {
#if !defined(_WIN32) || defined(_CONSOLE)
/* startup all the modules */
_initialize_subsystems();

View File

@ -198,13 +198,13 @@ void slave_initialize(void)
#endif
_slave_thread ();
slave_running = 0;
yp_stop ();
workers_adjust(0);
}
void slave_shutdown(void)
{
yp_stop();
thread_rwlock_destroy (&slaves_lock);
thread_rwlock_destroy (&workers_lock);
thread_spin_destroy (&relay_start_lock);
@ -344,6 +344,7 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s
char *esc_authorisation;
unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
DEBUG2 ("using username %s for %s", relay->username, relay->localmount);
auth_header = malloc (len);
snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
esc_authorisation = util_base64_encode(auth_header);
@ -420,10 +421,13 @@ static int open_relay_connection (client_t *client, relay_server *relay, relay_s
{
ERROR2("Error from relay request: %s (%s)", relay->localmount,
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
client->parser = NULL;
break;
}
sock_set_blocking (streamsock, 0);
thread_mutex_lock (&relay->source->lock);
client->parser = parser; // old parser will be free in the format clear
thread_mutex_unlock (&relay->source->lock);
client->connection.discon_time = 0;
client->connection.con_time = time (NULL);
client_set_queue (client, NULL);
@ -486,6 +490,8 @@ static void *start_relay_stream (void *arg)
sources = ++global.sources;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
/* set the start time, because we want to decrease the sources on all failures */
client->connection.con_time = time (NULL);
do
{
ice_config_t *config = config_get_config();
@ -520,15 +526,14 @@ static void *start_relay_stream (void *arg)
if (relay->on_demand)
src->flags &= ~SOURCE_ON_DEMAND;
else
{
yp_remove (relay->localmount);
src->yp_public = -1;
}
INFO2 ("listener count remaining on %s is %d", src->mount, src->listeners);
src->flags &= ~SOURCE_PAUSE_LISTENERS;
thread_mutex_unlock (&src->lock);
global_lock();
global.sources--;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
}
thread_spin_lock (&relay_start_lock);
@ -543,10 +548,11 @@ static void *start_relay_stream (void *arg)
static int relay_install (relay_server *relay)
{
client_t *client;
client_t *client = calloc (1, sizeof (client_t));
connection_init (&client->connection, SOCK_ERROR, NULL);
global_lock();
client = client_create (SOCK_ERROR);
client_register (client);
global_unlock();
client->shared_data = relay;
client->ops = &relay_startup_ops;
@ -733,6 +739,7 @@ static size_t streamlist_header (void *ptr, size_t size, size_t nmemb, void *str
else
WARN1 ("Failed response from master \"%s\"", (char*)ptr);
}
DEBUG1 ("header is %s", ptr);
return passed_len;
}
@ -850,9 +857,10 @@ static void *streamlist_thread (void *arg)
curl_easy_setopt (handle, CURLOPT_INTERFACE, master->bind);
master->ok = 0;
if (curl_easy_perform (handle) != 0)
if (curl_easy_perform (handle) != 0 || master->ok == 0)
{
/* fall back to traditional request */
INFO0 ("/admin/streams failed trying streamlist");
snprintf (url, sizeof (url), "%s://%s:%d/admin/streamlist.txt%s",
protocol, master->server, port, master->args);
curl_easy_setopt (handle, CURLOPT_URL, url);
@ -932,21 +940,34 @@ static void update_master_as_slave (ice_config_t *config)
static void slave_startup (void)
{
ice_config_t *config;
ice_config_t *config = config_get_config();
#ifdef HAVE_GETRLIMIT
struct rlimit rlimit;
if (getrlimit (RLIMIT_NOFILE, &rlimit) == 0)
{
INFO2 ("max file descriptors %ld (hard limit %ld)",
(long)rlimit.rlim_cur, (long)rlimit.rlim_max);
if (rlimit.rlim_cur < rlimit.rlim_max)
{
long old = rlimit.rlim_cur;
rlimit.rlim_cur = rlimit.rlim_max;
if (setrlimit (RLIMIT_NOFILE, &rlimit) < 0)
rlimit.rlim_cur = old;
}
WARN1 ("process has %ld max file descriptor limit", (long)rlimit.rlim_cur);
}
if (getrlimit (RLIMIT_CORE, &rlimit) == 0)
{
if (rlimit.rlim_cur < rlimit.rlim_max)
{
rlimit.rlim_cur = rlimit.rlim_max;
setrlimit (RLIMIT_CORE, &rlimit);
}
}
#endif
update_settings = 0;
update_all_mounts = 0;
config = config_get_config();
update_master_as_slave (config);
stats_global (config);
workers_adjust (config->workers_count);
@ -1203,11 +1224,13 @@ static int relay_read (client_t *client)
return 0;
}
DEBUG1 ("all listeners have now been checked on %s", relay->localmount);
client->parser = NULL;
free (source->fallback.mount);
source->fallback.mount = NULL;
source->flags &= ~(SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC);
if (relay->cleanup)
{
connection_close (&client->connection);
if (source->listeners)
{
INFO1 ("listeners on terminating relay %s, rechecking", relay->localmount);
@ -1219,9 +1242,8 @@ static int relay_read (client_t *client)
}
INFO1 ("shutting down relay %s", relay->localmount);
thread_mutex_unlock (&source->lock);
stats_event (relay->localmount, NULL, NULL); // needed???
stats_event (relay->localmount, NULL, NULL);
slave_update_all_mounts();
connection_close (&client->connection);
return -1;
}
do {
@ -1241,10 +1263,15 @@ static int relay_read (client_t *client)
}
}
else
{
INFO1 ("Relay %s is now disabled", relay->localmount);
client->schedule_ms = client->worker->time_ms + 3600000;
}
source->flags &= ~SOURCE_ON_DEMAND;
source_clear_source (source);
slave_update_all_mounts();
} while (0);
stats_event (relay->localmount, NULL, NULL);
slave_update_all_mounts();
thread_mutex_unlock (&source->lock);
connection_close (&client->connection);
@ -1270,7 +1297,7 @@ static int relay_startup (client_t *client)
relay_server *relay = get_relay_details (client);
worker_t *worker = client->worker;
if (relay->cleanup)
if (relay->cleanup || relay->running == 0)
{
source_t *source = relay->source;
if (source == NULL)
@ -1279,10 +1306,10 @@ static int relay_startup (client_t *client)
relay->running = 0;
DEBUG1 ("cleanup detected on %s", relay->localmount);
client->ops = &relay_client_ops;
client->schedule_ms += 25;
client->schedule_ms = worker->time_ms + 20;
return 0;
}
if (global.running != ICE_RUNNING || relay->running == 0) /* wait for cleanup */
if (global.running != ICE_RUNNING) /* wait for cleanup */
{
client->schedule_ms = client->worker->time_ms + 50;
return 0;
@ -1322,7 +1349,7 @@ static int relay_startup (client_t *client)
src->flags |= SOURCE_ON_DEMAND;
if (worker->current_time.tv_sec % 10 == 0)
{
mount_proxy * mountinfo = config_find_mount (config_get_config(), src->mount);
mount_proxy *mountinfo = config_find_mount (config_get_config(), src->mount);
if (mountinfo && mountinfo->fallback_mount)
{
source_t *fallback;
@ -1343,7 +1370,7 @@ static int relay_startup (client_t *client)
}
if (start_relay == 0)
{
client->schedule_ms += 60000;
client->schedule_ms = worker->time_ms + 60000;
return 0;
}
INFO1 ("Detected listeners on relay %s", relay->localmount);
@ -1354,7 +1381,7 @@ static int relay_startup (client_t *client)
if (relays_connecting > 3)
{
thread_spin_unlock (&relay_start_lock);
client->schedule_ms += 200;
client->schedule_ms = worker->time_ms + 200;
if (global.new_connections_slowdown < 5)
global.new_connections_slowdown++;
return 0;

View File

@ -112,7 +112,7 @@ struct _client_functions listener_pause_ops =
struct _client_functions listener_wait_ops =
{
wait_for_other_listeners,
NULL
client_destroy
};
struct _client_functions source_client_http_ops =
@ -316,6 +316,7 @@ static int _free_source (void *p)
WARN3("active listeners on mountpoint %s (%ld, %ld)", source->mount, source->listeners, source->termination_count);
avl_tree_free (source->clients, NULL);
thread_mutex_unlock (&source->lock);
thread_mutex_destroy (&source->lock);
INFO1 ("freeing source \"%s\"", source->mount);
@ -331,6 +332,7 @@ void source_free_source (source_t *source)
{
INFO1 ("source %s to be freed", source->mount);
avl_tree_wlock (global.source_tree);
thread_mutex_lock (&source->lock);
DEBUG1 ("removing source %s from tree", source->mount);
avl_delete (global.source_tree, source, _free_source);
avl_tree_unlock (global.source_tree);
@ -354,20 +356,21 @@ client_t *source_find_client(source_t *source, int id)
*/
static void update_source_stats (source_t *source)
{
unsigned long incoming_rate = (long)(8 * rate_avg (source->format->in_bitrate));
unsigned long incoming_rate = (long)rate_avg (source->format->in_bitrate);
unsigned long kbytes_sent = source->bytes_sent_since_update/1024;
unsigned long kbytes_read = source->bytes_read_since_update/1024;
source->format->sent_bytes += kbytes_sent*1024;
stats_event_args (source->mount, "outgoing_kbitrate", "%ld",
(long)(8 * rate_avg (source->format->out_bitrate))/1024);
stats_event_args (source->mount, "incoming_bitrate", "%ld", incoming_rate);
stats_event_args (source->mount, "incoming_bitrate", "%ld", (8 * incoming_rate));
stats_event_args (source->mount, "total_bytes_read",
"%"PRIu64, source->format->read_bytes);
stats_event_args (source->mount, "total_bytes_sent",
"%"PRIu64, source->format->sent_bytes);
stats_event_args (source->mount, "total_mbytes_sent",
"%"PRIu64, source->format->sent_bytes/(1024*1024));
stats_event_args (source->mount, "queue_size", "%u", source->queue_size);
if (source->client->connection.con_time)
{
worker_t *worker = source->client->worker;
@ -379,6 +382,7 @@ static void update_source_stats (source_t *source)
source->bytes_sent_since_update %= 1024;
source->bytes_read_since_update %= 1024;
source->listener_send_trigger = incoming_rate;
}
@ -390,7 +394,7 @@ int source_read (source_t *source)
{
client_t *client = source->client;
refbuf_t *refbuf = NULL;
int skip = 1, loop = 3;
int skip = 1, loop = 2;
time_t current = client->worker->current_time.tv_sec;
int fds = 0;
@ -441,9 +445,7 @@ int source_read (source_t *source)
}
if (source->limit_rate)
{
unsigned long incoming_rate = 8 * rate_avg (source->format->in_bitrate);
if (source->limit_rate < incoming_rate)
if (source->limit_rate < (8 * source->incoming_rate))
{
rate_add (source->format->in_bitrate, 0, current);
source->skip_duration += 300;
@ -464,6 +466,8 @@ int source_read (source_t *source)
}
if (fds == 0)
{
if (source->last_read + (time_t)3 == current)
WARN1 ("Nothing received on %s for 3 seconds", source->mount);
if (source->last_read + (time_t)source->timeout < current)
{
DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
@ -479,8 +483,8 @@ int source_read (source_t *source)
break;
}
source->skip_duration = (long)(source->skip_duration * 0.9);
if (source->skip_duration < 5) /* not too low or else it will not be able to increase */
source->skip_duration = 5;
if (source->skip_duration < 10) /* not too low or else it will not be able to increase */
source->skip_duration = 10;
source->last_read = current;
do
@ -598,6 +602,15 @@ static int source_client_read (client_t *client)
}
else
{
if (source->listeners)
{
INFO1 ("listeners on terminating source %s, rechecking", source->mount);
source->termination_count = source->listeners;
source->flags &= ~SOURCE_PAUSE_LISTENERS;
source->flags |= SOURCE_LISTENERS_SYNC;
thread_mutex_unlock (&source->lock);
return 0;
}
INFO1 ("no more listeners on %s", source->mount);
client->connection.discon_time = 0;
client->ops = &source_client_halt_ops;
@ -622,7 +635,7 @@ static int source_queue_advance (client_t *client)
refbuf = client->refbuf;
/* move to the next buffer if we have finished with the current one */
if (client->pos == refbuf->len)
if (client->pos >= refbuf->len)
{
if (refbuf->next == NULL)
{
@ -897,9 +910,9 @@ int listener_waiting_on_source (source_t *source, client_t *client)
static int send_listener (source_t *source, client_t *client)
{
int bytes;
int loop = 10; /* max number of iterations in one go */
int loop = 8; /* max number of iterations in one go */
long total_written = 0;
int ret = 0;
int ret = 0, lag;
if (source->flags & SOURCE_LISTENERS_SYNC)
return listener_waiting_on_source (source, client);
@ -916,19 +929,18 @@ static int send_listener (source_t *source, client_t *client)
}
if (source_running (source) == 0)
{
DEBUG0 ("source not running, listener will wait 200ms");
client->schedule_ms += 200;
DEBUG0 ("source not running, listener will wait");
client->schedule_ms += 100;
return 0;
}
if (client->refbuf == NULL)
client->check_buffer = source_queue_advance;
// do we migrate this listener to the same handler as the source client
if (source->client->worker != client->worker)
if (listener_change_worker (client, source))
return 1;
lag = source->client->queue_pos - client->queue_pos;
/* progessive slowdown if nearing max bandwidth. */
if (global.max_rate)
{
@ -945,12 +957,14 @@ static int send_listener (source_t *source, client_t *client)
if (throttle_sends > 0)
{
/* make lagging listeners, lag further on high bandwidth use */
if (source->client->queue_pos - client->queue_pos > 8192)
if (lag > (source->incoming_rate*2))
client->schedule_ms += 150;
}
}
if (source->incoming_rate > 1 && lag < (source->incoming_rate<<1))
loop = lag / (source->incoming_rate>>1);
while (loop)
while (1)
{
/* jump out if client connection has died */
if (client->connection.error)
@ -960,8 +974,11 @@ static int send_listener (source_t *source, client_t *client)
}
/* lets not send too much to one client in one go, but don't
sleep for too long if more data can be sent */
if (total_written > source->listener_send_trigger)
if (loop == 0 || total_written > source->listener_send_trigger)
{
client->schedule_ms = client->worker->time_ms + 15;
break;
}
bytes = client->check_buffer (client);
if (bytes < 0)
break; /* can't write any more */
@ -1012,14 +1029,14 @@ void source_init (source_t *source)
stats_event_flags (source->mount, "listener_peak", "0", STATS_COUNTERS);
stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
stats_event_time (source->mount, "stream_start");
stats_event_time (source->mount, "stream_start", STATS_COUNTERS);
stats_event_flags (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "outgoing_kbitrate", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "incoming_bitrate", "0", STATS_COUNTERS);
stats_event_flags (source->mount, "connected", "0", STATS_COUNTERS);
stats_event (source->mount, "source_ip", source->client->connection.ip);
stats_event_flags (source->mount, "source_ip", source->client->connection.ip, STATS_COUNTERS);
source->last_read = time(NULL);
source->prev_listeners = -1;
@ -1106,10 +1123,16 @@ void source_set_override (const char *mount, const char *dest)
void source_set_fallback (source_t *source, const char *dest_mount)
{
int bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02);
int bitrate = 0;
client_t *client = source->client;
time_t connected = client->worker->current_time.tv_sec - client->connection.con_time;
if (dest_mount == NULL)
return;
if (connected > 20)
bitrate = (int)(rate_avg (source->format->in_bitrate) * 1.02);
if (bitrate == 0 && source->limit_rate)
bitrate = source->limit_rate;
source->fallback.flags = FS_FALLBACK;
source->fallback.mount = strdup (dest_mount);
source->fallback.limit = bitrate;
@ -1123,20 +1146,22 @@ void source_shutdown (source_t *source, int with_fallback)
INFO1("Source \"%s\" exiting", source->mount);
if (source->client->connection.con_time)
update_source_stats (source);
source->flags &= ~SOURCE_ON_DEMAND;
source->termination_count = source->listeners;
mountinfo = config_find_mount (config_get_config(), source->mount);
if (mountinfo)
if (source->client->connection.con_time)
{
if (mountinfo->on_disconnect)
source_run_script (mountinfo->on_disconnect, source->mount);
auth_stream_end (mountinfo, source->mount);
if (with_fallback && global.running == ICE_RUNNING)
source_set_fallback (source, mountinfo->fallback_mount);
/* only do these if source has been running */
update_source_stats (source);
if (mountinfo)
{
if (mountinfo->on_disconnect)
source_run_script (mountinfo->on_disconnect, source->mount);
auth_stream_end (mountinfo, source->mount);
}
}
if (mountinfo && with_fallback && global.running == ICE_RUNNING)
source_set_fallback (source, mountinfo->fallback_mount);
config_release_config();
source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC);
}
@ -1404,14 +1429,19 @@ static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
*/
void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
{
char *listen_url;
int len;
/* set global settings first */
source->queue_size_limit = config->queue_size_limit;
source->min_queue_size = config->min_queue_size;
source->timeout = config->source_timeout;
source->default_burst_size = config->burst_size;
stats_event_args (source->mount, "listenurl", "http://%s:%d%s",
config->hostname, config->port, source->mount);
len = strlen (config->hostname) + strlen(source->mount) + 16;
listen_url = alloca (len);
snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount);
stats_event_flags (source->mount, "listenurl", listen_url, STATS_COUNTERS);
source_apply_mount (source, mountinfo);
@ -1608,6 +1638,7 @@ static int check_duplicate_logins (source_t *source, client_t *client, auth_t *a
{
if (auth->drop_existing_listener)
{
INFO2 ("Found %s on %s, dropping previous account", existing_client->username, source->mount);
existing_client->connection.error = 1;
return 1;
}
@ -1703,14 +1734,13 @@ static int source_listener_release (source_t *source, client_t *client)
config_release_config();
return ret;
}
client_send_404 (client, NULL); // failed on-demand relay?
return 0;
return client_send_404 (client, NULL); // failed on-demand relay?
}
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
{
int loop = 10, bitrate = 0;
int loop = 10, bitrate = 0, do_process = 0;
int within_limits;
source_t *source;
mount_proxy *minfo = mountinfo;
@ -1726,8 +1756,7 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
if (loop == 0)
{
WARN0 ("preventing a fallback loop");
client_send_403 (client, "Fallback through too many mountpoints");
return -1;
return client_send_403 (client, "Fallback through too many mountpoints");
}
avl_tree_rlock (global.source_tree);
source = source_find_mount_raw (mount);
@ -1770,12 +1799,6 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
if (client->flags & CLIENT_IS_SLAVE)
{
if (source->client == NULL && (source->flags & SOURCE_ON_DEMAND) == 0)
{
thread_mutex_unlock (&source->lock);
client_send_403 (client, "Slave relay reading from time unregulated stream");
return -1;
}
INFO0 ("client is from a slave, bypassing limits");
break;
}
@ -1792,8 +1815,7 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
{
thread_mutex_unlock (&source->lock);
INFO0 ("server-wide outgoing bandwidth limit reached");
client_send_403redirect (client, passed_mount, "server bandwidth reached");
return -1;
return client_send_403redirect (client, passed_mount, "server bandwidth reached");
}
}
}
@ -1804,8 +1826,7 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
if (check_duplicate_logins (source, client, mountinfo->auth) == 0)
{
thread_mutex_unlock (&source->lock);
client_send_403 (client, "Account already in use");
return -1;
return client_send_403 (client, "Account already in use");
}
/* set a per-mount disconnect time if auth hasn't set one already */
@ -1846,8 +1867,7 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
/* now we fail the client */
thread_mutex_unlock (&source->lock);
client_send_403redirect (client, passed_mount, "max listeners reached");
return -1;
return client_send_403redirect (client, passed_mount, "max listeners reached");
} while (1);
client->connection.sent_bytes = 0;
@ -1855,17 +1875,26 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
source_setup_listener (source, client);
if ((client->flags & CLIENT_ACTIVE) && (source->flags & SOURCE_RUNNING))
do_process = 1;
else
{
client->flags |= CLIENT_ACTIVE; // from an auth thread context
worker_wakeup (client->worker);
}
thread_mutex_unlock (&source->lock);
global_reduce_bitrate_sampling (global.out_bitrate);
stats_event_inc (NULL, "listeners");
stats_event_inc (NULL, "listener_connections");
source_setup_listener (source, client);
client->flags |= CLIENT_ACTIVE;
worker_wakeup (client->worker);
thread_mutex_unlock (&source->lock);
if (do_process) // send something back quickly
return client->ops->process (client);
return 0;
}
/* call with the source lock held, but expect the lock released on exit
* as the listener may of changed threads and therefore lock needed to be
* released

View File

@ -53,6 +53,7 @@ typedef struct source_tag
unsigned long peak_listeners;
unsigned long listeners;
unsigned long prev_listeners;
long incoming_rate;
int yp_public;

View File

@ -102,21 +102,21 @@ typedef struct _stats_tag
avl_tree *source_tree;
/* list of listeners for stats */
event_listener_t *event_listeners, *listeners_removed;
event_listener_t *event_listeners;
mutex_t listeners_lock;
} stats_t;
static volatile int _stats_running = 0;
static stats_t _stats;
static mutex_t _stats_mutex;
static int _compare_stats(void *a, void *b, void *arg);
static int _compare_source_stats(void *a, void *b, void *arg);
static int _free_stats(void *key);
static int _free_source_stats(void *key);
static stats_node_t *_find_node(avl_tree *tree, const char *name);
static stats_node_t *_find_node(const avl_tree *tree, const char *name);
static stats_source_t *_find_source(avl_tree *tree, const char *source);
static void process_event (stats_event_t *event);
static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap);
@ -149,14 +149,11 @@ void stats_initialize(void)
_stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
_stats.event_listeners = NULL;
_stats.listeners_removed = NULL;
/* set up global mutex */
thread_mutex_create(&_stats_mutex);
thread_mutex_create (&_stats.listeners_lock);
_stats_running = 1;
stats_event_time (NULL, "server_start");
stats_event_time (NULL, "server_start", STATS_GENERAL);
/* global currently active stats */
stats_event_flags (NULL, "clients", "0", STATS_COUNTERS);
@ -185,9 +182,9 @@ void stats_shutdown(void)
_stats_running = 0;
thread_mutex_destroy(&_stats_mutex);
avl_tree_free(_stats.source_tree, _free_source_stats);
avl_tree_free(_stats.global_tree, _free_stats);
thread_mutex_destroy (&_stats.listeners_lock);
}
@ -277,21 +274,26 @@ static char *_get_stats(const char *source, const char *name)
stats_source_t *src = NULL;
char *value = NULL;
thread_mutex_lock(&_stats_mutex);
if (source == NULL) {
avl_tree_rlock (_stats.global_tree);
stats = _find_node(_stats.global_tree, name);
if (stats) value = (char *)strdup(stats->value);
avl_tree_unlock (_stats.global_tree);
} else {
avl_tree_rlock (_stats.source_tree);
src = _find_source(_stats.source_tree, source);
if (src) {
if (src)
{
avl_tree_rlock (src->stats_tree);
avl_tree_unlock (_stats.source_tree);
stats = _find_node(src->stats_tree, name);
if (stats) value = (char *)strdup(stats->value);
avl_tree_unlock (src->stats_tree);
}
else
avl_tree_unlock (_stats.source_tree);
}
if (stats) value = (char *)strdup(stats->value);
thread_mutex_unlock(&_stats_mutex);
return value;
}
@ -353,7 +355,7 @@ void stats_event_dec(const char *source, const char *name)
/* note: you must call this function only when you have exclusive access
** to the avl_tree
*/
static stats_node_t *_find_node(avl_tree *stats_tree, const char *name)
static stats_node_t *_find_node(const avl_tree *stats_tree, const char *name)
{
stats_node_t *stats;
avl_node *node;
@ -413,6 +415,8 @@ static void modify_node_event (stats_node_t *node, stats_event_t *event)
{
node->flags = event->flags;
event->action &= ~STATS_EVENT_HIDDEN;
if (event->value == NULL)
return;
}
if (event->action != STATS_EVENT_SET)
{
@ -486,10 +490,90 @@ static void process_global_event (stats_event_t *event)
}
static void process_source_stat (stats_source_t *src_stats, stats_event_t *event)
{
if (event->name)
{
stats_node_t *node = _find_node (src_stats->stats_tree, event->name);
if (node == NULL)
{
/* adding node */
if (event->action != STATS_EVENT_REMOVE && event->value)
{
DEBUG3 ("new node on %s \"%s\" (%s)", src_stats->source, event->name, event->value);
node = (stats_node_t *)calloc (1,sizeof(stats_node_t));
node->name = (char *)strdup (event->name);
node->value = (char *)strdup (event->value);
node->flags = event->flags;
if (src_stats->flags & STATS_HIDDEN)
node->flags |= STATS_HIDDEN;
stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, event->name, event->value);
avl_insert (src_stats->stats_tree, (void *)node);
}
return;
}
if (event->action == STATS_EVENT_REMOVE)
{
DEBUG2 ("delete node %s from %s", event->name, src_stats->source);
stats_listener_send (node->flags, "DELETE %s %s\n", src_stats->source, event->name);
avl_delete (src_stats->stats_tree, (void *)node, _free_stats);
return;
}
modify_node_event (node, event);
stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, node->name, node->value);
return;
}
if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
{
avl_tree_unlock (src_stats->stats_tree);
avl_tree_wlock (_stats.source_tree);
avl_tree_wlock (src_stats->stats_tree);
avl_delete (_stats.source_tree, (void *)src_stats, _free_source_stats);
avl_tree_unlock (_stats.source_tree);
return;
}
/* change source flags status */
if (event->action & STATS_EVENT_HIDDEN)
{
avl_node *node = avl_get_first (src_stats->stats_tree);
int visible = 0;
if ((event->flags&STATS_HIDDEN) == (src_stats->flags&STATS_HIDDEN))
return;
if (src_stats->flags & STATS_HIDDEN)
{
stats_node_t *ct = _find_node (src_stats->stats_tree, "content-type");
const char *type = "audio/mpeg";
if (ct)
type = ct->name;
src_stats->flags &= ~STATS_HIDDEN;
stats_listener_send (src_stats->flags, "NEW %s %s\n", type, src_stats->source);
visible = 1;
}
else
{
stats_listener_send (src_stats->flags, "DELETE %s\n", src_stats->source);
src_stats->flags |= STATS_HIDDEN;
}
while (node)
{
stats_node_t *stats = (stats_node_t*)node->key;
if (visible)
{
stats->flags &= ~STATS_HIDDEN;
stats_listener_send (stats->flags, "EVENT %s %s %s\n", src_stats->source, stats->name, stats->value);
}
else
stats->flags |= STATS_HIDDEN;
node = avl_get_next (node);
}
}
}
static void process_source_event (stats_event_t *event)
{
stats_source_t *snode = _find_source(_stats.source_tree, event->source);
stats_node_t *node = NULL;
stats_source_t *snode;
avl_tree_wlock (_stats.source_tree);
snode = _find_source(_stats.source_tree, event->source);
@ -518,82 +602,12 @@ static void process_source_event (stats_event_t *event)
}
avl_tree_wlock (snode->stats_tree);
avl_tree_unlock (_stats.source_tree);
if (event->name)
{
node = _find_node (snode->stats_tree, event->name);
if (node == NULL)
{
/* adding node */
if (event->action != STATS_EVENT_REMOVE && event->value)
{
DEBUG3 ("new node on %s \"%s\" (%s)", event->source, event->name, event->value);
node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
node->name = (char *)strdup(event->name);
node->value = (char *)strdup(event->value);
node->flags = event->flags;
if (snode->flags & STATS_HIDDEN)
node->flags |= STATS_HIDDEN;
stats_listener_send (node->flags, "EVENT %s %s %s\n", event->source, event->name, event->value);
avl_insert(snode->stats_tree, (void *)node);
}
avl_tree_unlock (snode->stats_tree);
return;
}
if (event->action == STATS_EVENT_REMOVE)
{
DEBUG1 ("delete node %s", event->name);
stats_listener_send (node->flags, "DELETE %s %s\n", event->source, event->name);
avl_delete(snode->stats_tree, (void *)node, _free_stats);
avl_tree_unlock (snode->stats_tree);
return;
}
modify_node_event (node, event);
stats_listener_send (node->flags, "EVENT %s %s %s\n", event->source, node->name, node->value);
avl_tree_unlock (snode->stats_tree);
return;
}
/* change source flags status */
if (event->action & STATS_EVENT_HIDDEN)
{
avl_node *node = avl_get_first (snode->stats_tree);
int visible = 0;
if ((event->flags&STATS_HIDDEN) == (snode->flags&STATS_HIDDEN))
{
avl_tree_unlock (snode->stats_tree);
return;
}
if (snode->flags & STATS_HIDDEN)
{
snode->flags &= ~STATS_HIDDEN;
stats_listener_send (snode->flags, "NEW %s\n", snode->source);
visible = 1;
}
else
{
stats_listener_send (snode->flags, "DELETE %s\n", snode->source);
snode->flags |= STATS_HIDDEN;
}
while (node)
{
stats_node_t *stats = (stats_node_t*)node->key;
if (visible)
{
stats->flags &= ~STATS_HIDDEN;
stats_listener_send (stats->flags, "EVENT %s %s %s\n", snode->source, stats->name, stats->value);
}
else
stats->flags |= STATS_HIDDEN;
node = avl_get_next (node);
}
avl_tree_unlock (snode->stats_tree);
return;
}
process_source_stat (snode, event);
avl_tree_unlock (snode->stats_tree);
}
void stats_event_time (const char *mount, const char *name)
void stats_event_time (const char *mount, const char *name, int flags)
{
time_t now = time(NULL);
struct tm local;
@ -601,13 +615,13 @@ void stats_event_time (const char *mount, const char *name)
localtime_r (&now, &local);
strftime (buffer, sizeof (buffer), ICECAST_TIME_FMT, &local);
stats_event_flags (mount, name, buffer, STATS_GENERAL);
stats_event_flags (mount, name, buffer, flags);
}
static int stats_listeners_send (client_t *client)
{
int loop = 8;
int loop = 8, total = 0;
int ret = 0;
event_listener_t *listener = client->shared_data;
@ -624,23 +638,24 @@ static int stats_listeners_send (client_t *client)
return -1;
}
client->schedule_ms = client->worker->time_ms;
thread_mutex_lock(&_stats_mutex);
while (loop)
thread_mutex_lock (&_stats.listeners_lock);
while (1)
{
refbuf_t *refbuf = client->refbuf;
if (refbuf == NULL)
{
client->schedule_ms = client->worker->time_ms + 300;
client->schedule_ms = client->worker->time_ms + 100;
break;
}
if (loop == 0 || total > 32768)
break;
ret = format_generic_write_to_client (client);
if (ret < 0)
if (ret > 0)
{
client->schedule_ms = client->worker->time_ms + 200;
break;
total += ret;
listener->content_len -= ret;
}
listener->content_len -= ret;
if (client->pos == refbuf->len)
{
client->refbuf = refbuf->next;
@ -652,18 +667,20 @@ static int stats_listeners_send (client_t *client)
if (listener->content_len)
WARN1 ("content length is %u", listener->content_len);
listener->recent_block = NULL;
client->schedule_ms = client->worker->time_ms + 100;
break;
}
loop--;
}
else
{
client->schedule_ms = client->worker->time_ms + 200;
client->schedule_ms = client->worker->time_ms + 250;
break; /* short write, so stop for now */
}
loop--;
}
thread_mutex_unlock(&_stats_mutex);
if (loop == 0)
client->schedule_ms = client->worker->time_ms;
thread_mutex_unlock (&_stats.listeners_lock);
if (client->connection.error || global.running != ICE_RUNNING)
return -1;
return 0;
}
@ -686,21 +703,25 @@ static void clear_stats_queue (client_t *client)
static void stats_listener_send (int mask, const char *fmt, ...)
{
va_list ap;
event_listener_t *listener = _stats.event_listeners,
**trail = &_stats.event_listeners;
event_listener_t *listener;
va_start(ap, fmt);
thread_mutex_lock (&_stats.listeners_lock);
listener = _stats.event_listeners;
while (listener)
{
if (listener->mask & mask)
_add_stats_to_stats_client (listener->client, fmt, ap);
trail = &listener->next;
listener = listener->next;
}
thread_mutex_unlock (&_stats.listeners_lock);
va_end(ap);
}
/* called after each xml reload */
void stats_global (ice_config_t *config)
{
stats_event_flags (NULL, "server_id", config->server_id, STATS_GENERAL);
@ -711,15 +732,6 @@ void stats_global (ice_config_t *config)
global.max_rate = config->max_bandwidth;
throttle_sends = 0;
thread_spin_unlock (&global.spinlock);
#if 0
/* restart a master stats connection */
config->master = calloc (1, sizeof ice_master_details);
config->master->hostname = xmlCharStrdup ("127.0.0.1");
config->master->port = 8000;
config->master->username = xmlCharStrdup ("relay");
config->master->password = xmlCharStrdup ("relayme");
_stats.sock = sock_connect_wto_bind (server, port, bind, 10);
#endif
}
static void process_event (stats_event_t *event)
@ -811,6 +823,7 @@ static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_li
return;
}
_add_node_to_stats_client (client, r);
client->schedule_ms = 0;
}
@ -884,6 +897,7 @@ static void _register_listener (client_t *client)
refbuf->len = 0;
/* the global stats */
avl_tree_rlock (_stats.global_tree);
node = avl_get_first(_stats.global_tree);
while (node)
{
@ -901,15 +915,23 @@ static void _register_listener (client_t *client)
}
node = avl_get_next(node);
}
avl_tree_unlock (_stats.global_tree);
/* now the stats for each source */
avl_tree_rlock (_stats.source_tree);
node = avl_get_first(_stats.source_tree);
while (node)
{
avl_node *node2;
stats_node_t *metadata_stat = NULL;
stats_source_t *snode = (stats_source_t *)node->key;
if (snode->flags & listener->mask)
{
if (_append_to_buffer (refbuf, size, "NEW %s\n", snode->source) < 0)
stats_node_t *ct = _find_node (snode->stats_tree, "content-type");
const char *type = "audio/mpeg";
if (ct)
type = ct->name;
if (_append_to_buffer (refbuf, size, "NEW %s %s\n", type, snode->source) < 0)
{
_add_node_to_stats_client (client, refbuf);
refbuf = refbuf_new (size);
@ -918,11 +940,14 @@ static void _register_listener (client_t *client)
}
}
node = avl_get_next(node);
avl_tree_rlock (snode->stats_tree);
node2 = avl_get_first(snode->stats_tree);
while (node2)
{
stats_node_t *stat = node2->key;
if (stat->flags & listener->mask)
if (metadata_stat == NULL && strcmp (stat->name, "metadata_updated") == 0)
metadata_stat = stat;
else if (stat->flags & listener->mask)
{
if (_append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, stat->name, stat->value) < 0)
{
@ -934,19 +959,39 @@ static void _register_listener (client_t *client)
}
node2 = avl_get_next (node2);
}
while (metadata_stat)
{
if (_append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, metadata_stat->name, metadata_stat->value) < 0)
{
_add_node_to_stats_client (client, refbuf);
refbuf = refbuf_new (size);
refbuf->len = 0;
continue;
}
break;
}
avl_tree_unlock (snode->stats_tree);
}
avl_tree_unlock (_stats.source_tree);
_add_node_to_stats_client (client, refbuf);
/* now we register to receive future event notices */
thread_mutex_lock (&_stats.listeners_lock);
listener->next = _stats.event_listeners;
_stats.event_listeners = listener;
thread_mutex_unlock (&_stats.listeners_lock);
}
static void stats_client_release (client_t *client)
{
event_listener_t *listener = _stats.event_listeners,
**trail = &_stats.event_listeners;
event_listener_t *listener, **trail;
INFO0 ("removing stats client");
thread_mutex_lock (&_stats.listeners_lock);
listener = _stats.event_listeners;
trail = &_stats.event_listeners;
while (listener)
{
if (listener == client->shared_data)
@ -955,6 +1000,7 @@ static void stats_client_release (client_t *client)
char buffer [20];
*trail = listener->next;
thread_mutex_unlock (&_stats.listeners_lock);
clear_stats_queue (client);
free (listener->source);
free (listener);
@ -967,6 +1013,7 @@ static void stats_client_release (client_t *client)
trail = &listener->next;
listener = listener->next;
}
thread_mutex_unlock (&_stats.listeners_lock);
}
@ -993,27 +1040,28 @@ void stats_add_listener (client_t *client, int mask)
listener->recent_block = client->refbuf;
listener->client = client;
thread_mutex_lock(&_stats_mutex);
_register_listener (client);
thread_mutex_unlock(&_stats_mutex);
client->flags |= CLIENT_ACTIVE;
}
void stats_transform_xslt(client_t *client, const char *uri)
int stats_transform_xslt (client_t *client, const char *uri)
{
xmlDocPtr doc;
char *xslpath = util_get_path_from_normalised_uri (uri, 0);
const char *mount = httpp_get_query_param (client->parser, "mount");
int ret;
if (mount == NULL && client->server_conn->shoutcast_mount && strcmp (uri, "/7.xsl") == 0)
mount = client->server_conn->shoutcast_mount;
doc = stats_get_xml (STATS_PUBLIC, mount);
xslt_transform(doc, xslpath, client);
ret = xslt_transform (doc, xslpath, client);
xmlFreeDoc(doc);
free (xslpath);
return ret;
}
xmlDocPtr stats_get_xml (int flags, const char *show_mount)
@ -1088,6 +1136,7 @@ static int _free_source_stats(void *key)
stats_source_t *node = (stats_source_t *)key;
stats_listener_send (node->flags, "DELETE %s\n", node->source);
DEBUG1 ("delete source node %s", node->source);
avl_tree_unlock (node->stats_tree);
avl_tree_free(node->stats_tree, _free_stats);
free(node->source);
free(node);
@ -1164,7 +1213,9 @@ void stats_clear_virtual_mounts (void)
if (source == NULL)
{
stats_node_t *node = _find_node (src->stats_tree, "fallback");
stats_node_t *node;
avl_tree_wlock (src->stats_tree);
node = _find_node (src->stats_tree, "fallback");
if (node == NULL)
{
/* no source_t and no fallback file stat, so delete */
@ -1172,6 +1223,7 @@ void stats_clear_virtual_mounts (void)
avl_delete (_stats.source_tree, src, _free_source_stats);
continue;
}
avl_tree_unlock (src->stats_tree);
}
snode = avl_get_next (snode);
@ -1182,12 +1234,12 @@ void stats_clear_virtual_mounts (void)
void stats_global_calc (void)
{
event_listener_t *listener;
stats_event_t event;
avl_node *anode;
char buffer [VAL_BUFSIZE];
connection_stats ();
avl_tree_rlock (_stats.global_tree);
anode = avl_get_first(_stats.global_tree);
while (anode)
{
@ -1197,17 +1249,132 @@ void stats_global_calc (void)
stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
anode = avl_get_next (anode);
}
avl_tree_unlock (_stats.global_tree);
build_event (&event, NULL, "outgoing_kbitrate", buffer);
event.flags = STATS_COUNTERS|STATS_HIDDEN;
snprintf (buffer, sizeof(buffer), "%" PRIu64,
(int64_t)global_getrate_avg (global.out_bitrate) * 8 / 1024);
process_event (&event);
/* retrieve the list of closing down clients */
thread_mutex_lock (&_stats_mutex);
listener = _stats.listeners_removed;
_stats.listeners_removed = NULL;
thread_mutex_unlock (&_stats_mutex);
}
long stats_handle (const char *mount)
{
stats_source_t *src_stats;
avl_tree_wlock (_stats.source_tree);
src_stats = _find_source(_stats.source_tree, mount);
if (src_stats == NULL)
{
src_stats = (stats_source_t *)calloc (1, sizeof (stats_source_t));
if (src_stats == NULL)
abort();
DEBUG1 ("new source stat %s", mount);
src_stats->source = (char *)strdup (mount);
src_stats->stats_tree = avl_tree_new (_compare_stats, NULL);
src_stats->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
avl_insert (_stats.source_tree, (void *)src_stats);
}
avl_tree_wlock (src_stats->stats_tree);
avl_tree_unlock (_stats.source_tree);
return (long)src_stats;
}
void stats_lock (long handle)
{
stats_source_t *src_stats = (stats_source_t *)handle;
if (src_stats)
avl_tree_wlock (src_stats->stats_tree);
}
void stats_release (long handle)
{
stats_source_t *src_stats = (stats_source_t *)handle;
if (src_stats)
avl_tree_unlock (src_stats->stats_tree);
}
// assume source stats are write locked
void stats_set (long handle, const char *name, const char *value)
{
if (handle)
{
stats_source_t *src_stats = (stats_source_t *)handle;
stats_event_t event;
build_event (&event, src_stats->source, name, (char *)value);
process_source_stat (src_stats, &event);
}
}
void stats_set_args (long handle, const char *name, const char *format, ...)
{
va_list val;
int ret;
stats_source_t *src_stats = (stats_source_t *)handle;
char buf[1024];
if (name == NULL)
return;
va_start (val, format);
ret = vsnprintf (buf, sizeof (buf), format, val);
va_end (val);
if (ret < 0 || (unsigned int)ret >= sizeof (buf))
{
WARN2 ("problem with formatting %s stat %s",
src_stats == NULL ? "global" : src_stats->source, name);
return;
}
stats_set (handle, name, buf);
}
void stats_set_flags (long handle, const char *name, const char *value, int flags)
{
stats_source_t *src_stats = (stats_source_t *)handle;
stats_event_t event;
build_event (&event, src_stats->source, name, value);
event.flags = flags;
if (value)
event.action |= STATS_EVENT_HIDDEN;
else
event.action = STATS_EVENT_HIDDEN;
process_source_stat (src_stats, &event);
}
void stats_set_conv(long handle, const char *name, const char *value, const char *charset)
{
const char *metadata = value;
xmlBufferPtr conv = xmlBufferCreate ();
if (charset && value)
{
xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
if (handle)
{
xmlBufferPtr raw = xmlBufferCreate ();
xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
if (xmlCharEncInFunc (handle, conv, raw) > 0)
metadata = (char *)xmlBufferContent (conv);
xmlBufferFree (raw);
xmlCharEncCloseFunc (handle);
}
else
WARN1 ("No charset found for \"%s\"", charset);
}
stats_set (handle, name, metadata);
xmlBufferFree (conv);
}

View File

@ -46,16 +46,24 @@ void stats_event_add(const char *source, const char *name, unsigned long value);
void stats_event_sub(const char *source, const char *name, unsigned long value);
void stats_event_dec(const char *source, const char *name);
void stats_event_flags (const char *source, const char *name, const char *value, int flags);
void stats_event_time (const char *mount, const char *name);
void stats_event_time (const char *mount, const char *name, int flags);
void *stats_connection(void *arg);
void stats_add_listener (client_t *client, int hidden_level);
void stats_global_calc(void);
void stats_transform_xslt(client_t *client, const char *uri);
int stats_transform_xslt(client_t *client, const char *uri);
void stats_sendxml(client_t *client);
xmlDocPtr stats_get_xml(int flags, const char *show_mount);
char *stats_get_value(const char *source, const char *name);
long stats_handle (const char *mount);
void stats_lock (long handle);
void stats_release (long handle);
void stats_set (long handle, const char *name, const char *value);
void stats_set_args (long handle, const char *name, const char *format, ...);
void stats_set_flags (long handle, const char *name, const char *value, int flags);
void stats_set_conv (long handle, const char *name, const char *value, const char *charset);
#endif /* __STATS_H__ */

View File

@ -324,20 +324,16 @@ char *util_url_unescape (const char *src)
for(i=0; i < len; i++) {
switch(src[i]) {
case '%':
if(i+2 >= len) {
free(decoded);
return NULL;
if (i+2 >= len || hex(src[i+1]) == -1 || hex(src[i+2]) == -1)
{
/* no matching pattern so assume just the % */
*dst++ = '%';
}
if(hex(src[i+1]) == -1 || hex(src[i+2]) == -1 ) {
free(decoded);
return NULL;
else
{
*dst++ = hex(src[i+1]) * 16 + hex(src[i+2]);
i+= 2;
}
*dst++ = hex(src[i+1]) * 16 + hex(src[i+2]);
i+= 2;
break;
case '#':
done = 1;
break;
case 0:
ERROR0("Fatal internal logic error in util_url_unescape()");
@ -841,3 +837,23 @@ int get_line(FILE *file, char *buf, size_t siz)
return 0;
}
#ifdef _MSC_VER
int msvc_snprintf (char *buf, int len, const char *fmt, ...)
{
int ret;
va_list ap;
va_start(ap, fmt);
ret = _vsnprintf (buf, len, fmt, ap);
if (ret < 0)
buf[len-1] = 0;
va_end(ap);
return ret;
}
int msvc_vsnprintf (char *buf, int len, const char *fmt, va_list ap)
{
int ret = _vsnprintf (buf, len, fmt, ap);
if (ret < 0)
buf[len-1] = 0;
return ret;
}
#endif

View File

@ -84,6 +84,75 @@ int xsltSaveResultToString(xmlChar **doc_txt_ptr, int * doc_txt_len, xmlDocPtr r
}
#endif
struct bufs
{
refbuf_t *head, **tail;
int len;
};
static int xslt_write_callback (void *ctxt, const char *data, int len)
{
struct bufs *x = ctxt;
refbuf_t *r;
int loop = 10;
if (len == 0)
return 0;
if (len < 0 || len > 2000000)
{
ERROR1 ("%d length requested", len);
return -1;
}
while (loop)
{
int size = len > 4096 ? len : 4096;
if (*x->tail == NULL)
{
*x->tail = refbuf_new (size);
(*x->tail)->len = 0;
}
r = *x->tail;
if (r->len + len > size)
{
x->tail = &r->next;
loop--;
continue;
}
memcpy (r->data + r->len, data, len);
r->len += len;
x->len += len;
break;
}
return len;
}
int xslt_SaveResultToBuf (refbuf_t **bptr, int *len, xmlDocPtr result, xsltStylesheetPtr style)
{
xmlOutputBufferPtr buf;
struct bufs x;
if (result->children == NULL)
{
*bptr = NULL;
*len = 0;
return 0;
}
memset (&x, 0, sizeof (x));
x.tail = &x.head;
buf = xmlOutputBufferCreateIO (xslt_write_callback, NULL, &x, NULL);
if (buf == NULL)
return -1;
xsltSaveResultTo (buf, result, style);
*bptr = x.head;
*len = x.len;
xmlOutputBufferClose(buf);
return 0;
}
/* Keep it small... */
#define CACHESIZE 3
@ -180,13 +249,14 @@ static xsltStylesheetPtr xslt_get_stylesheet(const char *fn) {
return cache[i].stylesheet;
}
void xslt_transform(xmlDocPtr doc, const char *xslfilename, client_t *client)
int xslt_transform (xmlDocPtr doc, const char *xslfilename, client_t *client)
{
xmlDocPtr res;
xsltStylesheetPtr cur;
xmlChar *string;
int len, problem = 0;
const char *mediatype = NULL;
int len;
refbuf_t *content = NULL;
char **params = NULL;
xmlSetGenericErrorFunc ("", log_parse_failure);
xsltSetGenericErrorFunc ("", log_parse_failure);
@ -198,54 +268,67 @@ void xslt_transform(xmlDocPtr doc, const char *xslfilename, client_t *client)
{
thread_mutex_unlock(&xsltlock);
ERROR1 ("problem reading stylesheet \"%s\"", xslfilename);
client_send_404 (client, "Could not parse XSLT file");
return;
return client_send_404 (client, "Could not parse XSLT file");
}
if (client->parser->queryvars)
{
// annoying but we need to surround the args with ' when passing them in
int i, arg_count = client->parser->queryvars->length * 2;
avl_node *node = avl_get_first (client->parser->queryvars);
params = calloc (arg_count+1, sizeof (char *));
for (i = 0; node && i < arg_count; node = avl_get_next (node))
{
http_var_t *param = (http_var_t *)node->key;
params[i++] = param->name;
params[i] = (char*)alloca (strlen (param->value) +3);
sprintf (params[i++], "\'%s\'", param->value);
}
params[i] = NULL;
}
res = xsltApplyStylesheet(cur, doc, NULL);
res = xsltApplyStylesheet (cur, doc, (const char **)params);
free (params);
if (res == NULL || xsltSaveResultToString (&string, &len, res, cur) < 0)
problem = 1;
/* lets find out the content type to use */
if (cur->mediaType)
mediatype = (char *)cur->mediaType;
if (res == NULL || xslt_SaveResultToBuf (&content, &len, res, cur) < 0)
{
thread_mutex_unlock (&xsltlock);
xmlFreeDoc (res);
WARN1 ("problem applying stylesheet \"%s\"", xslfilename);
return client_send_404 (client, "XSLT problem");
}
else
{
/* check method for the default, a missing method assumes xml */
if (cur->method && xmlStrcmp (cur->method, XMLSTR("html")) == 0)
mediatype = "text/html";
else
if (cur->method && xmlStrcmp (cur->method, XMLSTR("text")) == 0)
mediatype = "text/plain";
else
mediatype = "text/xml";
}
if (problem == 0)
{
/* the 100 is to allow for the hardcoded headers */
unsigned int full_len = strlen (mediatype) + len + 100;
refbuf_t *refbuf = refbuf_new (full_len);
refbuf_t *refbuf = refbuf_new (100);
const char *mediatype = NULL;
if (string == NULL)
string = xmlCharStrdup ("");
snprintf (refbuf->data, full_len,
"HTTP/1.0 200 OK\r\nContent-Type: %s\r\nContent-Length: %d\r\n\r\n%s",
mediatype, len, string);
/* lets find out the content type to use */
if (cur->mediaType)
mediatype = (char *)cur->mediaType;
else
{
/* check method for the default, a missing method assumes xml */
if (cur->method && xmlStrcmp (cur->method, XMLSTR("html")) == 0)
mediatype = "text/html";
else
if (cur->method && xmlStrcmp (cur->method, XMLSTR("text")) == 0)
mediatype = "text/plain";
else
mediatype = "text/xml";
}
snprintf (refbuf->data, 100,
"HTTP/1.0 200 OK\r\nContent-Type: %s\r\nContent-Length: %d\r\n\r\n",
mediatype, len);
thread_mutex_unlock (&xsltlock);
client->respcode = 200;
client_set_queue (client, NULL);
client->refbuf = refbuf;
refbuf->len = strlen (refbuf->data);
fserve_setup_client (client, NULL);
xmlFree (string);
refbuf->next = content;
}
else
{
WARN1 ("problem applying stylesheet \"%s\"", xslfilename);
client_send_404 (client, "XSLT problem");
}
thread_mutex_unlock (&xsltlock);
xmlFreeDoc(res);
return fserve_setup_client (client);
}

View File

@ -34,7 +34,7 @@
#include "stats.h"
void xslt_transform(xmlDocPtr doc, const char *xslfilename, client_t *client);
int xslt_transform (xmlDocPtr doc, const char *xslfilename, client_t *client);
void xslt_initialize(void);
void xslt_shutdown(void);

View File

@ -456,6 +456,12 @@ static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
add_yp_info (yp, value, YP_AUDIO_INFO);
free (value);
if (yp->server_name[0] == 0 || yp->server_genre[0] == 0 || yp->server_type[0] == 0 || yp->bitrate[0] == 0)
{
INFO1 ("mount %s requires stats (sn, genre, type, bitrate)", yp->mount);
yp_schedule (yp, 600);
return -1;
}
ret = snprintf (s, len, "action=add&sn=%s&genre=%s&cpswd=%s&desc="
"%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&%s\r\n",
yp->server_name, yp->server_genre, yp->cluster_password,

View File

@ -4,9 +4,9 @@
#include "stdafx.h"
#include "Icecast2win.h"
#include "Icecast2winDlg.h"
#include "xslt.h"
extern "C" {
#include "xslt.h"
void _initialize_subsystems(void);
void _shutdown_subsystems(void);
}

View File

@ -3,7 +3,7 @@
[Setup]
AppName=Icecast2-KH
AppVerName=Icecast v2.3.2-kh29
AppVerName=Icecast v2.3.2-kh30
AppPublisherURL=http://www.icecast.org
AppSupportURL=http://www.icecast.org
AppUpdatesURL=http://www.icecast.org
@ -13,7 +13,7 @@ AllowNoIcons=yes
LicenseFile=..\COPYING
InfoAfterFile=..\README
OutputDir=.
OutputBaseFilename=icecast2_win32_v2.3.2-kh29_setup
OutputBaseFilename=icecast2_win32_v2.3.2-kh30_setup
WizardImageFile=icecast2logo2.bmp
WizardImageStretch=no
VersionInfoVersion=2.3.2

View File

@ -1,10 +1,10 @@
#include <config.h>
#define _WIN32_WINNT 0x0400
#include <windows.h>
#include <stdio.h>
#include <direct.h>
extern "C" {
#include <config.h>
#include "thread/thread.h"
#include "avl/avl.h"
#include "log/log.h"