diff --git a/TODO b/TODO index 728cda33..cab86b66 100644 --- a/TODO +++ b/TODO @@ -27,3 +27,7 @@ FEATURES - support W3C Extended Logging (http://www.w3.org/TR/WD-logfile.html) toggle between this and Apache Combined Log Format in the config file. default to apache style. + +- allow using get_predata() stuff to send an "intro" to any newly-connected + user? + diff --git a/conf/icecast.xml b/conf/icecast.xml index f98dec37..458c20b1 100644 --- a/conf/icecast.xml +++ b/conf/icecast.xml @@ -3,12 +3,12 @@ jack@icecast.org - 100 + 2000 2 5 30 15 - 10 + 100 hackme @@ -25,8 +25,12 @@ 8000 + + + + - /usr/local/icecast + /home/msmith/icecast/icecast /tmp @@ -37,9 +41,9 @@ 0 - + --> diff --git a/src/Makefile.am b/src/Makefile.am index b3008350..d89e1fb2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,10 +7,10 @@ SUBDIRS = avl thread httpp net log timing bin_PROGRAMS = icecast noinst_HEADERS = config.h os.h logging.h sighandler.h connection.h global.h\ - util.h source.h stats.h refbuf.h client.h format.h format_vorbis.h\ + util.h slave.h source.h stats.h refbuf.h client.h format.h format_vorbis.h\ compat.h format_mp3.h icecast_SOURCES = config.c main.c logging.c sighandler.c connection.c global.c\ - util.c source.c stats.c refbuf.c client.c format.c format_vorbis.c\ + util.c slave.c source.c stats.c refbuf.c client.c format.c format_vorbis.c\ format_mp3.c icecast_LDADD = net/libicenet.la thread/libicethread.la httpp/libicehttpp.la\ diff --git a/src/config.c b/src/config.c index 7a8b7ed6..8f403024 100644 --- a/src/config.c +++ b/src/config.c @@ -23,6 +23,7 @@ #define CONFIG_DEFAULT_CHUID 0 #define CONFIG_DEFAULT_USER NULL #define CONFIG_DEFAULT_GROUP NULL +#define CONFIG_MASTER_UPDATE_INTERVAL 120 #ifndef _WIN32 #define CONFIG_DEFAULT_BASE_DIR "/usr/local/icecast" @@ -141,6 +142,9 @@ static void _set_defaults(void) _configuration.hostname = (char *)strdup(CONFIG_DEFAULT_HOSTNAME); _configuration.port = CONFIG_DEFAULT_PORT; _configuration.bind_address = NULL; + _configuration.master_server = NULL; + _configuration.master_server_port = CONFIG_DEFAULT_PORT; + _configuration.master_update_interval = CONFIG_MASTER_UPDATE_INTERVAL; _configuration.base_dir = (char *)strdup(CONFIG_DEFAULT_BASE_DIR); _configuration.log_dir = (char *)strdup(CONFIG_DEFAULT_LOG_DIR); _configuration.access_log = (char *)strdup(CONFIG_DEFAULT_ACCESS_LOG); @@ -166,8 +170,15 @@ static void _parse_root(xmlDocPtr doc, xmlNodePtr node) if (_configuration.admin) free(_configuration.admin); _configuration.admin = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); } else if (strcmp(node->name, "source-password") == 0) { - if (_configuration.source_password) free(_configuration.source_password); - _configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + char *mount, *pass; + if ((mount = (char *)xmlGetProp(node, "mount")) != NULL) { + pass = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + /* FIXME: This is a placeholder for per-mount passwords */ + } + else { + if (_configuration.source_password) free(_configuration.source_password); + _configuration.source_password = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } } else if (strcmp(node->name, "hostname") == 0) { if (_configuration.hostname) free(_configuration.hostname); _configuration.hostname = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); @@ -178,6 +189,15 @@ static void _parse_root(xmlDocPtr doc, xmlNodePtr node) } else if (strcmp(node->name, "bind-address") == 0) { if (_configuration.bind_address) free(_configuration.bind_address); _configuration.bind_address = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "master-server") == 0) { + if (_configuration.master_server) free(_configuration.master_server); + _configuration.master_server = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + } else if (strcmp(node->name, "master-server-port") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.master_server_port = atoi(tmp); + } else if (strcmp(node->name, "master-update-interval") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + _configuration.master_update_interval = atoi(tmp); } else if (strcmp(node->name, "limits") == 0) { _parse_limits(doc, node->xmlChildrenNode); } else if (strcmp(node->name, "directory") == 0) { diff --git a/src/config.h b/src/config.h index d9b29933..37fd87f3 100644 --- a/src/config.h +++ b/src/config.h @@ -33,6 +33,9 @@ typedef struct ice_config_tag char *hostname; int port; char *bind_address; + char *master_server; + int master_server_port; + int master_update_interval; char *base_dir; char *log_dir; diff --git a/src/connection.c b/src/connection.c index 2bdff0ac..42aeb59d 100644 --- a/src/connection.c +++ b/src/connection.c @@ -57,7 +57,7 @@ static mutex_t _queue_mutex; static thread_queue_t *_conhands = NULL; -static rwlock_t _source_shutdown_rwlock; +rwlock_t _source_shutdown_rwlock; static void *_handle_connection(void *arg); @@ -69,6 +69,7 @@ void connection_initialize(void) thread_mutex_create(&_queue_mutex); thread_rwlock_create(&_source_shutdown_rwlock); thread_cond_create(&_pool_cond); + thread_cond_create(&global.shutdown_cond); _initialized = 1; } @@ -77,6 +78,7 @@ void connection_shutdown(void) { if (!_initialized) return; + thread_cond_destroy(&global.shutdown_cond); thread_cond_destroy(&_pool_cond); thread_rwlock_destroy(&_source_shutdown_rwlock); thread_mutex_destroy(&_queue_mutex); @@ -85,16 +87,6 @@ void connection_shutdown(void) _initialized = 0; } -static connection_t *_create_connection(void) -{ - connection_t *con; - - con = (connection_t *)malloc(sizeof(connection_t)); - memset(con, 0, sizeof(connection_t)); - - return con; -} - static unsigned long _next_connection_id(void) { unsigned long id; @@ -106,6 +98,17 @@ static unsigned long _next_connection_id(void) return id; } +connection_t *create_connection(sock_t sock, char *ip) { + connection_t *con; + con = (connection_t *)malloc(sizeof(connection_t)); + memset(con, 0, sizeof(connection_t)); + con->sock = sock; + con->con_time = time(NULL); + con->id = _next_connection_id(); + con->ip = ip; + return con; +} + static connection_t *_accept_connection(void) { int sock; @@ -121,12 +124,7 @@ static connection_t *_accept_connection(void) sock = sock_accept(global.serversock, ip, 16); if (sock >= 0) { - con = _create_connection(); - - con->sock = sock; - con->con_time = time(NULL); - con->id = _next_connection_id(); - con->ip = ip; + con = create_connection(sock, ip); return con; } @@ -246,6 +244,9 @@ void connection_accept_loop(void) } } + /* Give all the other threads notification to shut down */ + thread_cond_broadcast(&global.shutdown_cond); + _destroy_pool(); /* wait for all the sources to shutdown */ @@ -283,6 +284,44 @@ static connection_t *_get_connection(void) return con; } +int connection_create_source(connection_t *con, http_parser_t *parser, char *mount) { + source_t *source; + char *contenttype; + /* check to make sure this source wouldn't + ** be over the limit + */ + global_lock(); + if (global.sources >= config_get_config()->source_limit) { + printf("TOO MANY SOURCE, KICKING THIS ONE\n"); + INFO1("Source (%s) logged in, but there are too many sources", mount); + global_unlock(); + return 0; + } + global.sources++; + global_unlock(); + + stats_event_inc(NULL, "sources"); + + contenttype = httpp_getvar(parser, "content-type"); + + if (contenttype != NULL) { + format_type_t format = format_get_type(contenttype); + if (format < 0) { + WARN1("Content-type \"%s\" not supported, dropping source", contenttype); + return 0; + } else { + source = source_create(con, parser, mount, format); + } + } else { + WARN0("No content-type header, cannot handle source"); + return 0; + } + source->shutdown_rwlock = &_source_shutdown_rwlock; + sock_set_blocking(con->sock, SOCK_NONBLOCK); + thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED); + return 1; +} + static void *_handle_connection(void *arg) { char header[4096]; @@ -327,8 +366,6 @@ static void *_handle_connection(void *arg) } if (parser->req_type == httpp_req_source) { - char *contenttype; - printf("DEBUG: source logging in\n"); stats_event_inc(NULL, "source_connections"); @@ -355,47 +392,11 @@ static void *_handle_connection(void *arg) } avl_tree_unlock(global.source_tree); - /* check to make sure this source wouldn't - ** be over the limit - */ - global_lock(); - if (global.sources >= config_get_config()->source_limit) { - printf("TOO MANY SOURCE, KICKING THIS ONE\n"); - INFO1("Source (%s) logged in, but there are too many sources", httpp_getvar(parser, HTTPP_VAR_URI)); + if (!connection_create_source(con, parser, httpp_getvar(parser, HTTPP_VAR_URI))) { connection_close(con); httpp_destroy(parser); - global_unlock(); - continue; - } - global.sources++; - global_unlock(); - - stats_event_inc(NULL, "sources"); - - contenttype = httpp_getvar(parser, "content-type"); - - if (contenttype != NULL) { - format_type_t format = format_get_type(contenttype); - if (format < 0) { - WARN1("Content-type \"%s\" not supported, dropping source", contenttype); - connection_close(con); - httpp_destroy(parser); - continue; - } else { - source = source_create(con, parser, httpp_getvar(parser, HTTPP_VAR_URI), format); - } - } else { - WARN0("No content-type header, cannot handle source"); - connection_close(con); - httpp_destroy(parser); - continue; } - source->shutdown_rwlock = &_source_shutdown_rwlock; - - sock_set_blocking(con->sock, SOCK_NONBLOCK); - - thread_create("Source Thread", source_main, (void *)source, THREAD_DETACHED); continue; } else if (parser->req_type == httpp_req_stats) { printf("DEBUG: stats connection...\n"); @@ -438,6 +439,32 @@ static void *_handle_connection(void *arg) stats_sendxml(client); continue; } + + if (strcmp(httpp_getvar(parser, HTTPP_VAR_URI), "/allstreams.txt") == 0) { + if (strcmp((httpp_getvar(parser, "ice-password") != NULL) ? httpp_getvar(parser, "ice-password") : "", (config_get_config()->source_password != NULL) ? config_get_config()->source_password : "") != 0) { + printf("DEBUG: bad password for allstreams.txt\n"); + INFO0("Client attempted to fetch allstreams.txt with bad password"); + if (parser->req_type == httpp_req_get) { + client->respcode = 404; + bytes = sock_write(client->con->sock, "HTTP/1.0 404 Source Not Found\r\nContent-Type: text/html\r\n\r\n"\ + "The source you requested could not be found.\r\n"); + if (bytes > 0) client->con->sent_bytes = bytes; + } + } else { + avl_node *node; + source_t *s; + avl_tree_rlock(global.source_tree); + node = avl_get_first(global.source_tree); + while (node) { + s = (source_t *)node->key; + sock_write(client->con->sock, "%s\r\n", s->mount); + node = avl_get_next(node); + } + avl_tree_unlock(global.source_tree); + } + client_destroy(client); + continue; + } global_lock(); if (global.clients >= config_get_config()->client_limit) { diff --git a/src/connection.h b/src/connection.h index 75e367ef..2d617f9f 100644 --- a/src/connection.h +++ b/src/connection.h @@ -3,6 +3,9 @@ #include #include "compat.h" +#include "httpp.h" +#include "thread.h" +#include "sock.h" typedef struct connection_tag { @@ -22,5 +25,10 @@ void connection_initialize(void); void connection_shutdown(void); void connection_accept_loop(void); void connection_close(connection_t *con); +connection_t *create_connection(sock_t sock, char *ip); +int connection_create_source(connection_t *con, http_parser_t *parser, + char *mount); + +extern rwlock_t _source_shutdown_rwlock; #endif /* __CONNECTION_H__ */ diff --git a/src/global.h b/src/global.h index e15ca8c4..65ba9eec 100644 --- a/src/global.h +++ b/src/global.h @@ -6,6 +6,8 @@ #define ICE_RUNNING 1 #define ICE_HALTING 2 +#include "thread/thread.h" + typedef struct ice_global_tag { int serversock; @@ -16,6 +18,8 @@ typedef struct ice_global_tag int clients; avl_tree *source_tree; + + cond_t shutdown_cond; } ice_global_t; extern ice_global_t global; diff --git a/src/httpp/httpp.c b/src/httpp/httpp.c index aeb93469..cae45c57 100644 --- a/src/httpp/httpp.c +++ b/src/httpp/httpp.c @@ -49,33 +49,12 @@ void httpp_initialize(http_parser_t *parser, http_varlist_t *defaults) } } -int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len) +static int split_headers(char *data, unsigned long len, char **line) { - char *data, *tmp; - char *line[MAX_HEADERS]; /* limited to 32 lines, should be more than enough */ - int i, l, retlen; - int lines; - char *req_type = NULL; - char *uri = NULL; - char *version = NULL; - char *name = NULL; - char *value = NULL; - int whitespace, where; - int slen; - - if (http_data == NULL) - return 0; - - /* make a local copy of the data, including 0 terminator */ - data = (char *)malloc(len+1); - if (data == NULL) return 0; - memcpy(data, http_data, len); - data[len] = 0; - /* first we count how many lines there are ** and set up the line[] array */ - lines = 0; + int i, lines = 0; line[lines] = data; for (i = 0; i < len && lines < MAX_HEADERS; i++) { if (data[i] == '\r') @@ -93,7 +72,134 @@ int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len) i++; while (data[i] == '\n') i++; - retlen = i; + + return lines; +} + +static void parse_headers(http_parser_t *parser, char **line, int lines) +{ + int i,l; + int whitespace, where, slen; + char *name = NULL; + char *value = NULL; + + /* parse the name: value lines. */ + for (l = 1; l < lines; l++) { + where = 0; + whitespace = 0; + name = line[l]; + value = NULL; + slen = strlen(line[l]); + for (i = 0; i < slen; i++) { + if (line[l][i] == ':') { + whitespace = 1; + line[l][i] = '\0'; + } else { + if (whitespace) { + whitespace = 0; + while (i < slen && line[l][i] == ' ') + i++; + + if (i < slen) + value = &line[l][i]; + + break; + } + } + } + + if (name != NULL && value != NULL) { + httpp_setvar(parser, _lowercase(name), value); + name = NULL; + value = NULL; + } + } +} + +int httpp_parse_response(http_parser_t *parser, char *http_data, unsigned long len, char *uri) +{ + char *data; + char *line[MAX_HEADERS]; + int lines, slen,i, whitespace=0, where=0,code; + char *version=NULL, *resp_code=NULL, *message=NULL; + + if(http_data == NULL) + return 0; + + /* make a local copy of the data, including 0 terminator */ + data = (char *)malloc(len+1); + if (data == NULL) return 0; + memcpy(data, http_data, len); + data[len] = 0; + + lines = split_headers(data, len, line); + + /* In this case, the first line contains: + * VERSION RESPONSE_CODE MESSAGE, such as + * HTTP/1.0 200 OK + */ + slen = strlen(line[0]); + version = line[0]; + for(i=0; i < slen; i++) { + if(line[0][i] == ' ') { + line[0][i] = 0; + whitespace = 1; + } + else if(whitespace) { + whitespace = 0; + where++; + if(where == 1) + resp_code = &line[0][i]; + else { + message = &line[0][i]; + break; + } + } + } + + if(version == NULL || resp_code == NULL || message == NULL) { + free(data); + return 0; + } + + code = atoi(resp_code); + if(code < 200 || code >= 300) { + httpp_setvar(parser, HTTPP_VAR_ERROR_MESSAGE, message); + free(data); + return 0; + } + + httpp_setvar(parser, HTTPP_VAR_URI, uri); + httpp_setvar(parser, HTTPP_VAR_REQ_TYPE, "RELAY"); + + parse_headers(parser, line, lines); + + free(data); + + return 1; +} + +int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len) +{ + char *data, *tmp; + char *line[MAX_HEADERS]; /* limited to 32 lines, should be more than enough */ + int i; + int lines; + char *req_type = NULL; + char *uri = NULL; + char *version = NULL; + int whitespace, where, slen; + + if (http_data == NULL) + return 0; + + /* make a local copy of the data, including 0 terminator */ + data = (char *)malloc(len+1); + if (data == NULL) return 0; + memcpy(data, http_data, len); + data[len] = 0; + + lines = split_headers(data, len, line); /* parse the first line special ** the format is: @@ -189,48 +295,18 @@ int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len) return 0; } - if (parser->uri != NULL) { + if (parser->uri != NULL) { httpp_setvar(parser, HTTPP_VAR_URI, parser->uri); } else { free(data); return 0; } - /* parse the name: value lines. */ - for (l = 1; l < lines; l++) { - where = 0; - whitespace = 0; - name = line[l]; - value = NULL; - slen = strlen(line[l]); - for (i = 0; i < slen; i++) { - if (line[l][i] == ':') { - whitespace = 1; - line[l][i] = '\0'; - } else { - if (whitespace) { - whitespace = 0; - while (i < slen && line[l][i] == ' ') - i++; - - if (i < slen) - value = &line[l][i]; - - break; - } - } - } - - if (name != NULL && value != NULL) { - httpp_setvar(parser, _lowercase(name), value); - name = NULL; - value = NULL; - } - } + parse_headers(parser, line, lines); free(data); - return retlen; + return 1; } void httpp_setvar(http_parser_t *parser, char *name, char *value) diff --git a/src/httpp/httpp.h b/src/httpp/httpp.h index 7e03f324..6901fa5e 100644 --- a/src/httpp/httpp.h +++ b/src/httpp/httpp.h @@ -12,6 +12,7 @@ #define HTTPP_VAR_VERSION "__version" #define HTTPP_VAR_URI "__uri" #define HTTPP_VAR_REQ_TYPE "__req_type" +#define HTTPP_VAR_ERROR_MESSAGE "__errormessage" typedef enum httpp_request_type_tag { httpp_req_none, httpp_req_get, httpp_req_post, httpp_req_head, httpp_req_source, httpp_req_play, httpp_req_stats, httpp_req_unknown @@ -30,13 +31,13 @@ typedef struct http_varlist_tag { typedef struct http_parser_tag { httpp_request_type_e req_type; char *uri; - avl_tree *vars; } http_parser_t; http_parser_t *httpp_create_parser(void); void httpp_initialize(http_parser_t *parser, http_varlist_t *defaults); int httpp_parse(http_parser_t *parser, char *http_data, unsigned long len); +int httpp_parse_response(http_parser_t *parser, char *http_data, unsigned long len, char *uri); void httpp_setvar(http_parser_t *parser, char *name, char *value); char *httpp_getvar(http_parser_t *parser, char *name); void httpp_destroy(http_parser_t *parser); diff --git a/src/main.c b/src/main.c index df36caeb..0eb4a61d 100644 --- a/src/main.c +++ b/src/main.c @@ -23,6 +23,7 @@ #include "connection.h" #include "refbuf.h" #include "client.h" +#include "slave.h" #include "stats.h" #include "logging.h" @@ -56,6 +57,7 @@ static void _shutdown_subsystems(void) { refbuf_shutdown(); stats_shutdown(); + slave_shutdown(); global_shutdown(); connection_shutdown(); config_shutdown(); @@ -316,6 +318,9 @@ int main(int argc, char **argv) return 1; } + /* Do this after logging init */ + slave_initialize(); + INFO0("icecast server started"); /* REM 3D Graphics */ diff --git a/src/net/resolver.c b/src/net/resolver.c index cc2b6af3..3711b217 100644 --- a/src/net/resolver.c +++ b/src/net/resolver.c @@ -127,6 +127,7 @@ static char *_lookup(const char *what, char *buff, int len) if (host == NULL) { buff = NULL; } else { + // still need to be locked here? temp = inet_ntoa(*(struct in_addr *)host->h_addr); strncpy(buff, temp, len); } diff --git a/src/slave.c b/src/slave.c new file mode 100644 index 00000000..f3041fb3 --- /dev/null +++ b/src/slave.c @@ -0,0 +1,135 @@ +/* slave.c + * by Ciaran Anscomb + * + * Periodically requests a list of streams from a master server + * and creates source threads for any it doesn't already have. + * */ + +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#else +#include +#define snprintf _snprintf +#define strcasecmp stricmp +#define strncasecmp strnicmp +#endif + +#include "os.h" + +#include "thread.h" +#include "avl.h" +#include "sock.h" +#include "log.h" +#include "httpp.h" + +#include "config.h" +#include "global.h" +#include "util.h" +#include "connection.h" +#include "refbuf.h" +#include "client.h" +#include "stats.h" +#include "format.h" +#include "logging.h" + +#include "source.h" + +#define CATMODULE "slave" + +static void *_slave_thread(void *arg); +long _slave_thread_id; +static int _initialized = 0; + +void slave_initialize(void) { + if (_initialized) return; + /* Don't create a slave thread if it isn't configured */ + if (config_get_config()->master_server == NULL) + return; + + _initialized = 1; + _slave_thread_id = thread_create("Slave Thread", _slave_thread, NULL, THREAD_ATTACHED); +} + +void slave_shutdown(void) { + if (!_initialized) return; + _initialized = 0; + thread_join(_slave_thread_id); +} + +static void *_slave_thread(void *arg) { + sock_t mastersock, streamsock; + char buf[256]; + char header[4096]; + connection_t *con; + http_parser_t *parser; + int interval = config_get_config()->master_update_interval; + + while (_initialized) { + if (config_get_config()->master_update_interval > ++interval) { + thread_sleep(1000000); + continue; + } + else + interval = 0; + + mastersock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0); + if (mastersock == SOCK_ERROR) { + printf("DEBUG: failed to contact master server\n"); + continue; + } + sock_write(mastersock, "GET /allstreams.txt HTTP/1.0\r\nice-password: %s\r\n\r\n", config_get_config()->source_password); + while (sock_read_line(mastersock, buf, sizeof(buf))) { + buf[strlen(buf)] = 0; + avl_tree_rlock(global.source_tree); + if (!source_find_mount(buf)) { + avl_tree_unlock(global.source_tree); + printf("DEBUG: adding source for %s\n", buf); + streamsock = sock_connect_wto(config_get_config()->master_server, config_get_config()->master_server_port, 0); + if (streamsock == SOCK_ERROR) { + printf("DEBUG: failed to get stream from master server\n"); + continue; + } + con = create_connection(streamsock, NULL); + sock_write(streamsock, "GET %s HTTP/1.0\r\n\r\n", buf); + memset(header, 0, sizeof(header)); + if (util_read_header(con->sock, header, 4096) == 0) { + connection_close(con); + continue; + } + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + if(!httpp_parse_response(parser, header, strlen(header), buf)) { + if(httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)) { + ERROR1("Error parsing relay request: %s", + httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); + } + else + ERROR0("Error parsing relay request"); + connection_close(con); + httpp_destroy(parser); + continue; + } + + if (!connection_create_source(con, parser, + httpp_getvar(parser, HTTPP_VAR_URI))) { + connection_close(con); + httpp_destroy(parser); + } + continue; + + } + avl_tree_unlock(global.source_tree); + } + sock_close(mastersock); + } + thread_exit(0); + return NULL; +} diff --git a/src/slave.h b/src/slave.h new file mode 100644 index 00000000..d05dff9d --- /dev/null +++ b/src/slave.h @@ -0,0 +1,7 @@ +#ifndef __SLAVE_H__ +#define __SLAVE_H__ + +void slave_initialize(void); +void slave_shutdown(void); + +#endif /* __SLAVE_H__ */ diff --git a/src/source.c b/src/source.c index 00f6ac6b..339acde7 100644 --- a/src/source.c +++ b/src/source.c @@ -373,7 +373,7 @@ done: stats_event_dec(NULL, "sources"); stats_event(source->mount, "listeners", NULL); - printf("DEBUG: souce_main() is now exiting...\n"); + printf("DEBUG: source_main() is now exiting...\n"); global_lock(); global.sources--; @@ -394,7 +394,7 @@ done: static int _compare_clients(void *compare_arg, void *a, void *b) { connection_t *cona = (connection_t *)a; - connection_t *conb = (connection_t *)b; + connection_t *conb = (connection_t *)b; if (cona->id < conb->id) return -1; if (cona->id > conb->id) return 1; diff --git a/src/thread/thread.c b/src/thread/thread.c index 949c6417..e312e3fb 100644 --- a/src/thread/thread.c +++ b/src/thread/thread.c @@ -465,6 +465,18 @@ void thread_cond_broadcast_c(cond_t *cond, int line, char *file) pthread_cond_broadcast(&cond->sys_cond); } +void thread_cond_timedwait_c(cond_t *cond, int millis, int line, char *file) +{ + struct timespec time; + + time.tv_sec = millis/1000; + time.tv_nsec = (millis - time.tv_sec*1000)*1000000; + + pthread_mutex_lock(&cond->cond_mutex); + pthread_cond_timedwait(&cond->sys_cond, &cond->cond_mutex, &time); + pthread_mutex_unlock(&cond->cond_mutex); +} + void thread_cond_wait_c(cond_t *cond, int line, char *file) { pthread_mutex_lock(&cond->cond_mutex); @@ -472,6 +484,8 @@ void thread_cond_wait_c(cond_t *cond, int line, char *file) pthread_mutex_unlock(&cond->cond_mutex); } +static int rwlocknum = 0; + void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file) { pthread_rwlock_init(&rwlock->sys_rwlock, NULL); diff --git a/src/thread/thread.h b/src/thread/thread.h index a3522657..6ab5c7b5 100644 --- a/src/thread/thread.h +++ b/src/thread/thread.h @@ -86,6 +86,7 @@ typedef struct rwlock_tag { #define thread_cond_signal(x) thread_cond_signal_c(x,__LINE__,__FILE__) #define thread_cond_broadcast(x) thread_cond_broadcast_c(x,__LINE__,__FILE__) #define thread_cond_wait(x) thread_cond_wait_c(x,__LINE__,__FILE__) +#define thread_cond_timedwait(x,t) thread_cond_wait_c(x,t,__LINE__,__FILE__) #define thread_rwlock_create(x) thread_rwlock_create_c(x,__LINE__,__FILE__) #define thread_rwlock_rlock(x) thread_rwlock_rlock_c(x,__LINE__,__FILE__) #define thread_rwlock_wlock(x) thread_rwlock_wlock_c(x,__LINE__,__FILE__) @@ -113,6 +114,7 @@ void thread_cond_create_c(cond_t *cond, int line, char *file); void thread_cond_signal_c(cond_t *cond, int line, char *file); void thread_cond_broadcast_c(cond_t *cond, int line, char *file); void thread_cond_wait_c(cond_t *cond, int line, char *file); +void thread_cond_timedwait_c(cond_t *cond, int millis, int line, char *file); void thread_cond_destroy(cond_t *cond); void thread_rwlock_create_c(rwlock_t *rwlock, int line, char *file); void thread_rwlock_rlock_c(rwlock_t *rwlock, int line, char *file);