1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-12-04 14:46:30 -05:00

Cleaned up version of Ciaran Anscomb's relaying patch.

svn path=/trunk/httpp/; revision=3760
This commit is contained in:
Michael Smith 2002-08-05 14:48:04 +00:00
parent 3727b2b32c
commit f1dc57ae2b
17 changed files with 437 additions and 126 deletions

4
TODO
View File

@ -27,3 +27,7 @@ FEATURES
- support W3C Extended Logging (http://www.w3.org/TR/WD-logfile.html) - support W3C Extended Logging (http://www.w3.org/TR/WD-logfile.html)
toggle between this and Apache Combined Log Format in the config file. toggle between this and Apache Combined Log Format in the config file.
default to apache style. default to apache style.
- allow using get_predata() stuff to send an "intro" to any newly-connected
user?

View File

@ -3,12 +3,12 @@
<admin>jack@icecast.org</admin> <admin>jack@icecast.org</admin>
<limits> <limits>
<clients>100</clients> <clients>2000</clients>
<sources>2</sources> <sources>2</sources>
<threadpool>5</threadpool> <threadpool>5</threadpool>
<client-timeout>30</client-timeout> <client-timeout>30</client-timeout>
<header-timeout>15</header-timeout> <header-timeout>15</header-timeout>
<source-timeout>10</source-timeout> <source-timeout>100</source-timeout>
</limits> </limits>
<source-password>hackme</source-password> <source-password>hackme</source-password>
@ -25,8 +25,12 @@
<port>8000</port> <port>8000</port>
<!--<bind-address>127.0.0.1</bind-address>--> <!--<bind-address>127.0.0.1</bind-address>-->
<!--<master-server>127.0.0.1</master-server>-->
<!--<master-server-port>8001</master-server-port>-->
<!--<master-update-interval>120</master-update-interval>-->
<paths> <paths>
<basedir>/usr/local/icecast</basedir> <basedir>/home/msmith/icecast/icecast</basedir>
<logdir>/tmp</logdir> <logdir>/tmp</logdir>
</paths> </paths>
@ -37,9 +41,9 @@
<security> <security>
<chroot>0</chroot> <chroot>0</chroot>
<!--<changeowner> <!-- <changeowner>
<user>nobody</user> <user>msmith</user>
<group>nogroup</group> <group>nogroup</group>
</changeowner> --> </changeowner> -->
</security> </security>
</icecast> </icecast>

View File

@ -7,10 +7,10 @@ SUBDIRS = avl thread httpp net log timing
bin_PROGRAMS = icecast bin_PROGRAMS = icecast
noinst_HEADERS = config.h os.h logging.h sighandler.h connection.h global.h\ noinst_HEADERS = config.h os.h logging.h sighandler.h connection.h global.h\
util.h source.h stats.h refbuf.h client.h format.h format_vorbis.h\ util.h slave.h source.h stats.h refbuf.h client.h format.h format_vorbis.h\
compat.h format_mp3.h compat.h format_mp3.h
icecast_SOURCES = config.c main.c logging.c sighandler.c connection.c global.c\ icecast_SOURCES = config.c main.c logging.c sighandler.c connection.c global.c\
util.c source.c stats.c refbuf.c client.c format.c format_vorbis.c\ util.c slave.c source.c stats.c refbuf.c client.c format.c format_vorbis.c\
format_mp3.c format_mp3.c
icecast_LDADD = net/libicenet.la thread/libicethread.la httpp/libicehttpp.la\ icecast_LDADD = net/libicenet.la thread/libicethread.la httpp/libicehttpp.la\

View File

@ -23,6 +23,7 @@
#define CONFIG_DEFAULT_CHUID 0 #define CONFIG_DEFAULT_CHUID 0
#define CONFIG_DEFAULT_USER NULL #define CONFIG_DEFAULT_USER NULL
#define CONFIG_DEFAULT_GROUP NULL #define CONFIG_DEFAULT_GROUP NULL
#define CONFIG_MASTER_UPDATE_INTERVAL 120
#ifndef _WIN32 #ifndef _WIN32
#define CONFIG_DEFAULT_BASE_DIR "/usr/local/icecast" #define CONFIG_DEFAULT_BASE_DIR "/usr/local/icecast"
@ -141,6 +142,9 @@ static void _set_defaults(void)
_configuration.hostname = (char *)strdup(CONFIG_DEFAULT_HOSTNAME); _configuration.hostname = (char *)strdup(CONFIG_DEFAULT_HOSTNAME);
_configuration.port = CONFIG_DEFAULT_PORT; _configuration.port = CONFIG_DEFAULT_PORT;
_configuration.bind_address = NULL; _configuration.bind_address = NULL;
_configuration.master_server = NULL;
_configuration.master_server_port = CONFIG_DEFAULT_PORT;
_configuration.master_update_interval = CONFIG_MASTER_UPDATE_INTERVAL;
_configuration.base_dir = (char *)strdup(CONFIG_DEFAULT_BASE_DIR); _configuration.base_dir = (char *)strdup(CONFIG_DEFAULT_BASE_DIR);
_configuration.log_dir = (char *)strdup(CONFIG_DEFAULT_LOG_DIR); _configuration.log_dir = (char *)strdup(CONFIG_DEFAULT_LOG_DIR);
_configuration.access_log = (char *)strdup(CONFIG_DEFAULT_ACCESS_LOG); _configuration.access_log = (char *)strdup(CONFIG_DEFAULT_ACCESS_LOG);
@ -166,8 +170,15 @@ static void _parse_root(xmlDocPtr doc, xmlNodePtr node)
if (_configuration.admin) free(_configuration.admin); if (_configuration.admin) free(_configuration.admin);
_configuration.admin = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); _configuration.admin = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "source-password") == 0) { } else if (strcmp(node->name, "source-password") == 0) {
if (_configuration.source_password) free(_configuration.source_password); char *mount, *pass;
_configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); if ((mount = (char *)xmlGetProp(node, "mount")) != NULL) {
pass = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
/* FIXME: This is a placeholder for per-mount passwords */
}
else {
if (_configuration.source_password) free(_configuration.source_password);
_configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
}
} else if (strcmp(node->name, "hostname") == 0) { } else if (strcmp(node->name, "hostname") == 0) {
if (_configuration.hostname) free(_configuration.hostname); if (_configuration.hostname) free(_configuration.hostname);
_configuration.hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); _configuration.hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
@ -178,6 +189,15 @@ static void _parse_root(xmlDocPtr doc, xmlNodePtr node)
} else if (strcmp(node->name, "bind-address") == 0) { } else if (strcmp(node->name, "bind-address") == 0) {
if (_configuration.bind_address) free(_configuration.bind_address); if (_configuration.bind_address) free(_configuration.bind_address);
_configuration.bind_address = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); _configuration.bind_address = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "master-server") == 0) {
if (_configuration.master_server) free(_configuration.master_server);
_configuration.master_server = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
} else if (strcmp(node->name, "master-server-port") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.master_server_port = atoi(tmp);
} else if (strcmp(node->name, "master-update-interval") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
_configuration.master_update_interval = atoi(tmp);
} else if (strcmp(node->name, "limits") == 0) { } else if (strcmp(node->name, "limits") == 0) {
_parse_limits(doc, node->xmlChildrenNode); _parse_limits(doc, node->xmlChildrenNode);
} else if (strcmp(node->name, "directory") == 0) { } else if (strcmp(node->name, "directory") == 0) {

View File

@ -33,6 +33,9 @@ typedef struct ice_config_tag
char *hostname; char *hostname;
int port; int port;
char *bind_address; char *bind_address;
char *master_server;
int master_server_port;
int master_update_interval;
char *base_dir; char *base_dir;
char *log_dir; char *log_dir;

View File

@ -57,7 +57,7 @@ static mutex_t _queue_mutex;
static thread_queue_t *_conhands = NULL; static thread_queue_t *_conhands = NULL;
static rwlock_t _source_shutdown_rwlock; rwlock_t _source_shutdown_rwlock;
static void *_handle_connection(void *arg); static void *_handle_connection(void *arg);
@ -69,6 +69,7 @@ void connection_initialize(void)
thread_mutex_create(&_queue_mutex); thread_mutex_create(&_queue_mutex);
thread_rwlock_create(&_source_shutdown_rwlock); thread_rwlock_create(&_source_shutdown_rwlock);
thread_cond_create(&_pool_cond); thread_cond_create(&_pool_cond);
thread_cond_create(&global.shutdown_cond);
_initialized = 1; _initialized = 1;
} }
@ -77,6 +78,7 @@ void connection_shutdown(void)
{ {
if (!_initialized) return; if (!_initialized) return;
thread_cond_destroy(&global.shutdown_cond);
thread_cond_destroy(&_pool_cond); thread_cond_destroy(&_pool_cond);
thread_rwlock_destroy(&_source_shutdown_rwlock); thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_mutex_destroy(&_queue_mutex); thread_mutex_destroy(&_queue_mutex);
@ -85,16 +87,6 @@ void connection_shutdown(void)
_initialized = 0; _initialized = 0;
} }
static connection_t *_create_connection(void)
{
connection_t *con;
con = (connection_t *)malloc(sizeof(connection_t));
memset(con, 0, sizeof(connection_t));
return con;
}
static unsigned long _next_connection_id(void) static unsigned long _next_connection_id(void)
{ {
unsigned long id; unsigned long id;
@ -106,6 +98,17 @@ static unsigned long _next_connection_id(void)
return id; return id;
} }
connection_t *create_connection(sock_t sock, char *ip) {
connection_t *con;
con = (connection_t *)malloc(sizeof(connection_t));
memset(con, 0, sizeof(connection_t));
con->sock = sock;
con->con_time = time(NULL);
con->id = _next_connection_id();
con->ip = ip;
return con;
}
static connection_t *_accept_connection(void) static connection_t *_accept_connection(void)
{ {
int sock; int sock;
@ -121,12 +124,7 @@ static connection_t *_accept_connection(void)
sock = sock_accept(global.serversock, ip, 16); sock = sock_accept(global.serversock, ip, 16);
if (sock >= 0) { if (sock >= 0) {
con = _create_connection(); con = create_connection(sock, ip);
con->sock = sock;
con->con_time = time(NULL);
con->id = _next_connection_id();
con->ip = ip;
return con; return con;
} }
@ -246,6 +244,9 @@ void connection_accept_loop(void)
} }
} }
/* Give all the other threads notification to shut down */
thread_cond_broadcast(&global.shutdown_cond);
_destroy_pool(); _destroy_pool();
/* wait for all the sources to shutdown */ /* wait for all the sources to shutdown */
@ -283,6 +284,44 @@ static connection_t *_get_connection(void)
return con; return con;
} }
int connection_create_source(connection_t *con, http_parser_t *parser, char *mount) {
source_t *source;
char *contenttype;
/* check to make sure this source wouldn't
** be over the limit
*/
global_lock();
if (global.sources >= config_get_config()->source_limit) {
printf("TOO MANY SOURCE, KICKING THIS ONE\n");
INFO1("Source (%s) logged in, but there are too many sources", mount);
global_unlock();
return 0;
}
global.sources++;
global_unlock();
stats_event_inc(NULL, "sources");
contenttype = httpp_getvar(parser, "content-type");
if (contenttype != NULL) {
format_type_t format = format_get_type(contenttype);
if (format < 0) {
WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
return 0;
} else {
source = source_create(con, parser, mount, format);
}
} else {
WARN0("No content-type header, cannot handle source");
return 0;
}
source->shutdown_rwlock = &_source_shutdown_rwlock;
sock_set_blocking(con->sock, SOCK_NONBLOCK);
thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED);
return 1;
}
static void *_handle_connection(void *arg) static void *_handle_connection(void *arg)
{ {
char header[4096]; char header[4096];
@ -327,8 +366,6 @@ static void *_handle_connection(void *arg)
} }
if (parser->req_type == httpp_req_source) { if (parser->req_type == httpp_req_source) {
char *contenttype;
printf("DEBUG: source logging in\n"); printf("DEBUG: source logging in\n");
stats_event_inc(NULL, "source_connections"); stats_event_inc(NULL, "source_connections");
@ -355,47 +392,11 @@ static void *_handle_connection(void *arg)
} }
avl_tree_unlock(global.source_tree); avl_tree_unlock(global.source_tree);
/* check to make sure this source wouldn't if (!connection_create_source(con, parser, httpp_getvar(parser, HTTPP_VAR_URI))) {
** be over the limit
*/
global_lock();
if (global.sources >= config_get_config()->source_limit) {
printf("TOO MANY SOURCE, KICKING THIS ONE\n");
INFO1("Source (%s) logged in, but there are too many sources", httpp_getvar(parser, HTTPP_VAR_URI));
connection_close(con); connection_close(con);
httpp_destroy(parser); httpp_destroy(parser);
global_unlock();
continue;
}
global.sources++;
global_unlock();
stats_event_inc(NULL, "sources");
contenttype = httpp_getvar(parser, "content-type");
if (contenttype != NULL) {
format_type_t format = format_get_type(contenttype);
if (format < 0) {
WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
connection_close(con);
httpp_destroy(parser);
continue;
} else {
source = source_create(con, parser, httpp_getvar(parser, HTTPP_VAR_URI), format);
}
} else {
WARN0("No content-type header, cannot handle source");
connection_close(con);
httpp_destroy(parser);
continue;
} }
source->shutdown_rwlock = &_source_shutdown_rwlock;
sock_set_blocking(con->sock, SOCK_NONBLOCK);
thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED);
continue; continue;
} else if (parser->req_type == httpp_req_stats) { } else if (parser->req_type == httpp_req_stats) {
printf("DEBUG: stats connection...\n"); printf("DEBUG: stats connection...\n");
@ -438,6 +439,32 @@ static void *_handle_connection(void *arg)
stats_sendxml(client); stats_sendxml(client);
continue; continue;
} }
if (strcmp(httpp_getvar(parser, HTTPP_VAR_URI), "/allstreams.txt") == 0) {
if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) {
printf("DEBUG: bad password for allstreams.txt\n");
INFO0("Client attempted to fetch allstreams.txt with bad password");
if (parser->req_type == httpp_req_get) {
client->respcode = 404;
bytes = sock_write(client->con->sock, "HTTP/1.0 404 Source Not Found\r\nContent-Type: text/html\r\n\r\n"\
"<b>The source you requested could not be found.</b>\r\n");
if (bytes > 0) client->con->sent_bytes = bytes;
}
} else {
avl_node *node;
source_t *s;
avl_tree_rlock(global.source_tree);
node = avl_get_first(global.source_tree);
while (node) {
s = (source_t *)node->key;
sock_write(client->con->sock, "%s\r\n", s->mount);
node = avl_get_next(node);
}
avl_tree_unlock(global.source_tree);
}
client_destroy(client);
continue;
}
global_lock(); global_lock();
if (global.clients >= config_get_config()->client_limit) { if (global.clients >= config_get_config()->client_limit) {

View File

@ -3,6 +3,9 @@
#include <sys/types.h> #include <sys/types.h>
#include "compat.h" #include "compat.h"
#include "httpp.h"
#include "thread.h"
#include "sock.h"
typedef struct connection_tag typedef struct connection_tag
{ {
@ -22,5 +25,10 @@ void connection_initialize(void);
void connection_shutdown(void); void connection_shutdown(void);
void connection_accept_loop(void); void connection_accept_loop(void);
void connection_close(connection_t *con); void connection_close(connection_t *con);
connection_t *create_connection(sock_t sock, char *ip);
int connection_create_source(connection_t *con, http_parser_t *parser,
char *mount);
extern rwlock_t _source_shutdown_rwlock;
#endif /* __CONNECTION_H__ */ #endif /* __CONNECTION_H__ */

View File

@ -6,6 +6,8 @@
#define ICE_RUNNING 1 #define ICE_RUNNING 1
#define ICE_HALTING 2 #define ICE_HALTING 2
#include "thread/thread.h"
typedef struct ice_global_tag typedef struct ice_global_tag
{ {
int serversock; int serversock;
@ -16,6 +18,8 @@ typedef struct ice_global_tag
int clients; int clients;
avl_tree *source_tree; avl_tree *source_tree;
cond_t shutdown_cond;
} ice_global_t; } ice_global_t;
extern ice_global_t global; extern ice_global_t global;

View File

@ -49,33 +49,12 @@ void httpp_initialize(http_parser_t *parser, http_varlist_t *defaults)
} }
} }
int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len) static int split_headers(char *data, unsigned long len, char **line)
{ {
char *data, *tmp;
char *line[MAX_HEADERS]; /* limited to 32 lines, should be more than enough */
int i, l, retlen;
int lines;
char *req_type = NULL;
char *uri = NULL;
char *version = NULL;
char *name = NULL;
char *value = NULL;
int whitespace, where;
int slen;
if (http_data == NULL)
return 0;
/* make a local copy of the data, including 0 terminator */
data = (char *)malloc(len+1);
if (data == NULL) return 0;
memcpy(data, http_data, len);
data[len] = 0;
/* first we count how many lines there are /* first we count how many lines there are
** and set up the line[] array ** and set up the line[] array
*/ */
lines = 0; int i, lines = 0;
line[lines] = data; line[lines] = data;
for (i = 0; i < len && lines < MAX_HEADERS; i++) { for (i = 0; i < len && lines < MAX_HEADERS; i++) {
if (data[i] == '\r') if (data[i] == '\r')
@ -93,7 +72,134 @@ int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len)
i++; i++;
while (data[i] == '\n') i++; while (data[i] == '\n') i++;
retlen = i;
return lines;
}
static void parse_headers(http_parser_t *parser, char **line, int lines)
{
int i,l;
int whitespace, where, slen;
char *name = NULL;
char *value = NULL;
/* parse the name: value lines. */
for (l = 1; l < lines; l++) {
where = 0;
whitespace = 0;
name = line[l];
value = NULL;
slen = strlen(line[l]);
for (i = 0; i < slen; i++) {
if (line[l][i] == ':') {
whitespace = 1;
line[l][i] = '\0';
} else {
if (whitespace) {
whitespace = 0;
while (i < slen && line[l][i] == ' ')
i++;
if (i < slen)
value = &line[l][i];
break;
}
}
}
if (name != NULL && value != NULL) {
httpp_setvar(parser, _lowercase(name), value);
name = NULL;
value = NULL;
}
}
}
int httpp_parse_response(http_parser_t *parser, char *http_data, unsigned long len, char *uri)
{
char *data;
char *line[MAX_HEADERS];
int lines, slen,i, whitespace=0, where=0,code;
char *version=NULL, *resp_code=NULL, *message=NULL;
if(http_data == NULL)
return 0;
/* make a local copy of the data, including 0 terminator */
data = (char *)malloc(len+1);
if (data == NULL) return 0;
memcpy(data, http_data, len);
data[len] = 0;
lines = split_headers(data, len, line);
/* In this case, the first line contains:
* VERSION RESPONSE_CODE MESSAGE, such as
* HTTP/1.0 200 OK
*/
slen = strlen(line[0]);
version = line[0];
for(i=0; i < slen; i++) {
if(line[0][i] == ' ') {
line[0][i] = 0;
whitespace = 1;
}
else if(whitespace) {
whitespace = 0;
where++;
if(where == 1)
resp_code = &line[0][i];
else {
message = &line[0][i];
break;
}
}
}
if(version == NULL || resp_code == NULL || message == NULL) {
free(data);
return 0;
}
code = atoi(resp_code);
if(code < 200 || code >= 300) {
httpp_setvar(parser, HTTPP_VAR_ERROR_MESSAGE, message);
free(data);
return 0;
}
httpp_setvar(parser, HTTPP_VAR_URI, uri);
httpp_setvar(parser, HTTPP_VAR_REQ_TYPE, "RELAY");
parse_headers(parser, line, lines);
free(data);
return 1;
}
int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len)
{
char *data, *tmp;
char *line[MAX_HEADERS]; /* limited to 32 lines, should be more than enough */
int i;
int lines;
char *req_type = NULL;
char *uri = NULL;
char *version = NULL;
int whitespace, where, slen;
if (http_data == NULL)
return 0;
/* make a local copy of the data, including 0 terminator */
data = (char *)malloc(len+1);
if (data == NULL) return 0;
memcpy(data, http_data, len);
data[len] = 0;
lines = split_headers(data, len, line);
/* parse the first line special /* parse the first line special
** the format is: ** the format is:
@ -189,48 +295,18 @@ int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len)
return 0; return 0;
} }
if (parser->uri != NULL) { if (parser->uri != NULL) {
httpp_setvar(parser, HTTPP_VAR_URI, parser->uri); httpp_setvar(parser, HTTPP_VAR_URI, parser->uri);
} else { } else {
free(data); free(data);
return 0; return 0;
} }
/* parse the name: value lines. */ parse_headers(parser, line, lines);
for (l = 1; l < lines; l++) {
where = 0;
whitespace = 0;
name = line[l];
value = NULL;
slen = strlen(line[l]);
for (i = 0; i < slen; i++) {
if (line[l][i] == ':') {
whitespace = 1;
line[l][i] = '\0';
} else {
if (whitespace) {
whitespace = 0;
while (i < slen && line[l][i] == ' ')
i++;
if (i < slen)
value = &line[l][i];
break;
}
}
}
if (name != NULL && value != NULL) {
httpp_setvar(parser, _lowercase(name), value);
name = NULL;
value = NULL;
}
}
free(data); free(data);
return retlen; return 1;
} }
void httpp_setvar(http_parser_t *parser, char *name, char *value) void httpp_setvar(http_parser_t *parser, char *name, char *value)

View File

@ -12,6 +12,7 @@
#define HTTPP_VAR_VERSION "__version" #define HTTPP_VAR_VERSION "__version"
#define HTTPP_VAR_URI "__uri" #define HTTPP_VAR_URI "__uri"
#define HTTPP_VAR_REQ_TYPE "__req_type" #define HTTPP_VAR_REQ_TYPE "__req_type"
#define HTTPP_VAR_ERROR_MESSAGE "__errormessage"
typedef enum httpp_request_type_tag { typedef enum httpp_request_type_tag {
httpp_req_none, httpp_req_get, httpp_req_post, httpp_req_head, httpp_req_source, httpp_req_play, httpp_req_stats, httpp_req_unknown httpp_req_none, httpp_req_get, httpp_req_post, httpp_req_head, httpp_req_source, httpp_req_play, httpp_req_stats, httpp_req_unknown
@ -30,13 +31,13 @@ typedef struct http_varlist_tag {
typedef struct http_parser_tag { typedef struct http_parser_tag {
httpp_request_type_e req_type; httpp_request_type_e req_type;
char *uri; char *uri;
avl_tree *vars; avl_tree *vars;
} http_parser_t; } http_parser_t;
http_parser_t *httpp_create_parser(void); http_parser_t *httpp_create_parser(void);
void httpp_initialize(http_parser_t *parser, http_varlist_t *defaults); void httpp_initialize(http_parser_t *parser, http_varlist_t *defaults);
int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len); int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len);
int httpp_parse_response(http_parser_t *parser, char *http_data, unsigned long len, char *uri);
void httpp_setvar(http_parser_t *parser, char *name, char *value); void httpp_setvar(http_parser_t *parser, char *name, char *value);
char *httpp_getvar(http_parser_t *parser, char *name); char *httpp_getvar(http_parser_t *parser, char *name);
void httpp_destroy(http_parser_t *parser); void httpp_destroy(http_parser_t *parser);

View File

@ -23,6 +23,7 @@
#include "connection.h" #include "connection.h"
#include "refbuf.h" #include "refbuf.h"
#include "client.h" #include "client.h"
#include "slave.h"
#include "stats.h" #include "stats.h"
#include "logging.h" #include "logging.h"
@ -56,6 +57,7 @@ static void _shutdown_subsystems(void)
{ {
refbuf_shutdown(); refbuf_shutdown();
stats_shutdown(); stats_shutdown();
slave_shutdown();
global_shutdown(); global_shutdown();
connection_shutdown(); connection_shutdown();
config_shutdown(); config_shutdown();
@ -316,6 +318,9 @@ int main(int argc, char **argv)
return 1; return 1;
} }
/* Do this after logging init */
slave_initialize();
INFO0("icecast server started"); INFO0("icecast server started");
/* REM 3D Graphics */ /* REM 3D Graphics */

View File

@ -127,6 +127,7 @@ static char *_lookup(const char *what, char *buff, int len)
if (host == NULL) { if (host == NULL) {
buff = NULL; buff = NULL;
} else { } else {
// still need to be locked here?
temp = inet_ntoa(*(struct in_addr *)host->h_addr); temp = inet_ntoa(*(struct in_addr *)host->h_addr);
strncpy(buff, temp, len); strncpy(buff, temp, len);
} }

135
src/slave.c Normal file
View File

@ -0,0 +1,135 @@
/* slave.c
* by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
*
* Periodically requests a list of streams from a master server
* and creates source threads for any it doesn't already have.
* */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#else
#include <winsock2.h>
#define snprintf _snprintf
#define strcasecmp stricmp
#define strncasecmp strnicmp
#endif
#include "os.h"
#include "thread.h"
#include "avl.h"
#include "sock.h"
#include "log.h"
#include "httpp.h"
#include "config.h"
#include "global.h"
#include "util.h"
#include "connection.h"
#include "refbuf.h"
#include "client.h"
#include "stats.h"
#include "format.h"
#include "logging.h"
#include "source.h"
#define CATMODULE "slave"
static void *_slave_thread(void *arg);
long _slave_thread_id;
static int _initialized = 0;
void slave_initialize(void) {
if (_initialized) return;
/* Don't create a slave thread if it isn't configured */
if (config_get_config()->master_server == NULL)
return;
_initialized = 1;
_slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED);
}
void slave_shutdown(void) {
if (!_initialized) return;
_initialized = 0;
thread_join(_slave_thread_id);
}
static void *_slave_thread(void *arg) {
sock_t mastersock, streamsock;
char buf[256];
char header[4096];
connection_t *con;
http_parser_t *parser;
int interval = config_get_config()->master_update_interval;
while (_initialized) {
if (config_get_config()->master_update_interval > ++interval) {
thread_sleep(1000000);
continue;
}
else
interval = 0;
mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
if (mastersock == SOCK_ERROR) {
printf("DEBUG: failed to contact master server\n");
continue;
}
sock_write(mastersock, "GET /allstreams.txt HTTP/1.0\r\nice-password: %s\r\n\r\n", config_get_config()->source_password);
while (sock_read_line(mastersock, buf, sizeof(buf))) {
buf[strlen(buf)] = 0;
avl_tree_rlock(global.source_tree);
if (!source_find_mount(buf)) {
avl_tree_unlock(global.source_tree);
printf("DEBUG: adding source for %s\n", buf);
streamsock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0);
if (streamsock == SOCK_ERROR) {
printf("DEBUG: failed to get stream from master server\n");
continue;
}
con = create_connection(streamsock, NULL);
sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", buf);
memset(header, 0, sizeof(header));
if (util_read_header(con->sock, header, 4096) == 0) {
connection_close(con);
continue;
}
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
if(!httpp_parse_response(parser, header, strlen(header), buf)) {
if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) {
ERROR1("Error parsing relay request: %s",
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
}
else
ERROR0("Error parsing relay request");
connection_close(con);
httpp_destroy(parser);
continue;
}
if (!connection_create_source(con, parser,
httpp_getvar(parser, HTTPP_VAR_URI))) {
connection_close(con);
httpp_destroy(parser);
}
continue;
}
avl_tree_unlock(global.source_tree);
}
sock_close(mastersock);
}
thread_exit(0);
return NULL;
}

7
src/slave.h Normal file
View File

@ -0,0 +1,7 @@
#ifndef __SLAVE_H__
#define __SLAVE_H__
void slave_initialize(void);
void slave_shutdown(void);
#endif /* __SLAVE_H__ */

View File

@ -373,7 +373,7 @@ done:
stats_event_dec(NULL, "sources"); stats_event_dec(NULL, "sources");
stats_event(source->mount, "listeners", NULL); stats_event(source->mount, "listeners", NULL);
printf("DEBUG: souce_main() is now exiting...\n"); printf("DEBUG: source_main() is now exiting...\n");
global_lock(); global_lock();
global.sources--; global.sources--;
@ -394,7 +394,7 @@ done:
static int _compare_clients(void *compare_arg, void *a, void *b) static int _compare_clients(void *compare_arg, void *a, void *b)
{ {
connection_t *cona = (connection_t *)a; connection_t *cona = (connection_t *)a;
connection_t *conb = (connection_t *)b; connection_t *conb = (connection_t *)b;
if (cona->id < conb->id) return -1; if (cona->id < conb->id) return -1;
if (cona->id > conb->id) return 1; if (cona->id > conb->id) return 1;

View File

@ -465,6 +465,18 @@ void thread_cond_broadcast_c(cond_t *cond, int line, char *file)
pthread_cond_broadcast(&cond->sys_cond); pthread_cond_broadcast(&cond->sys_cond);
} }
void thread_cond_timedwait_c(cond_t *cond, int millis, int line, char *file)
{
struct timespec time;
time.tv_sec = millis/1000;
time.tv_nsec = (millis - time.tv_sec*1000)*1000000;
pthread_mutex_lock(&cond->cond_mutex);
pthread_cond_timedwait(&cond->sys_cond, &cond->cond_mutex, &time);
pthread_mutex_unlock(&cond->cond_mutex);
}
void thread_cond_wait_c(cond_t *cond, int line, char *file) void thread_cond_wait_c(cond_t *cond, int line, char *file)
{ {
pthread_mutex_lock(&cond->cond_mutex); pthread_mutex_lock(&cond->cond_mutex);
@ -472,6 +484,8 @@ void thread_cond_wait_c(cond_t *cond, int line, char *file)
pthread_mutex_unlock(&cond->cond_mutex); pthread_mutex_unlock(&cond->cond_mutex);
} }
static int rwlocknum = 0;
void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file) void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file)
{ {
pthread_rwlock_init(&rwlock->sys_rwlock, NULL); pthread_rwlock_init(&rwlock->sys_rwlock, NULL);

View File

@ -86,6 +86,7 @@ typedef struct rwlock_tag {
#define thread_cond_signal(x) thread_cond_signal_c(x,__LINE__,__FILE__) #define thread_cond_signal(x) thread_cond_signal_c(x,__LINE__,__FILE__)
#define thread_cond_broadcast(x) thread_cond_broadcast_c(x,__LINE__,__FILE__) #define thread_cond_broadcast(x) thread_cond_broadcast_c(x,__LINE__,__FILE__)
#define thread_cond_wait(x) thread_cond_wait_c(x,__LINE__,__FILE__) #define thread_cond_wait(x) thread_cond_wait_c(x,__LINE__,__FILE__)
#define thread_cond_timedwait(x,t) thread_cond_wait_c(x,t,__LINE__,__FILE__)
#define thread_rwlock_create(x) thread_rwlock_create_c(x,__LINE__,__FILE__) #define thread_rwlock_create(x) thread_rwlock_create_c(x,__LINE__,__FILE__)
#define thread_rwlock_rlock(x) thread_rwlock_rlock_c(x,__LINE__,__FILE__) #define thread_rwlock_rlock(x) thread_rwlock_rlock_c(x,__LINE__,__FILE__)
#define thread_rwlock_wlock(x) thread_rwlock_wlock_c(x,__LINE__,__FILE__) #define thread_rwlock_wlock(x) thread_rwlock_wlock_c(x,__LINE__,__FILE__)
@ -113,6 +114,7 @@ void thread_cond_create_c(cond_t *cond, int line, char *file);
void thread_cond_signal_c(cond_t *cond, int line, char *file); void thread_cond_signal_c(cond_t *cond, int line, char *file);
void thread_cond_broadcast_c(cond_t *cond, int line, char *file); void thread_cond_broadcast_c(cond_t *cond, int line, char *file);
void thread_cond_wait_c(cond_t *cond, int line, char *file); void thread_cond_wait_c(cond_t *cond, int line, char *file);
void thread_cond_timedwait_c(cond_t *cond, int millis, int line, char *file);
void thread_cond_destroy(cond_t *cond); void thread_cond_destroy(cond_t *cond);
void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file); void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file);
void thread_rwlock_rlock_c(rwlock_t *rwlock, int line, char *file); void thread_rwlock_rlock_c(rwlock_t *rwlock, int line, char *file);