1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-11-03 04:17:17 -05:00

Added ability to have multiple master servers

We can basically aggregate all streams from any number of master servers
This commit is contained in:
greenbender 2016-02-25 13:43:32 +11:00 committed by Marvin Scholz
parent 4117a2d2e2
commit 79fb9cbbf4
4 changed files with 133 additions and 28 deletions

View File

@ -133,6 +133,7 @@ static void _parse_http_headers(xmlDocPtr doc,
xmlNodePtr node,
ice_config_http_header_t **http_headers);
static void _parse_master(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
static void _parse_relay(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
static void _parse_mount(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
@ -487,6 +488,7 @@ void config_clear(ice_config_t *c)
{
ice_config_dir_t *dirnode,
*nextdirnode;
master_server *master;
relay_server *relay,
*nextrelay;
mount_proxy *mount,
@ -529,6 +531,11 @@ void config_clear(ice_config_t *c)
while ((c->listen_sock = config_clear_listener(c->listen_sock)));
master = c->master;
while (master) {
master = master_free(master);
}
thread_mutex_lock(&(_locks.relay_lock));
relay = c->relay;
while (relay) {
@ -934,6 +941,8 @@ static void _parse_root(xmlDocPtr doc,
_parse_limits(doc, node->xmlChildrenNode, configuration);
} else if (xmlStrcmp(node->name, XMLSTR("http-headers")) == 0) {
_parse_http_headers(doc, node->xmlChildrenNode, &(configuration->http_headers));
} else if (xmlStrcmp(node->name, XMLSTR("master")) == 0) {
_parse_master(doc, node->xmlChildrenNode, configuration);
} else if (xmlStrcmp(node->name, XMLSTR("relay")) == 0) {
_parse_relay(doc, node->xmlChildrenNode, configuration);
} else if (xmlStrcmp(node->name, XMLSTR("mount")) == 0) {
@ -1588,6 +1597,70 @@ static void _parse_http_headers(xmlDocPtr doc,
xmlFree(value);
}
static void _parse_master(xmlDocPtr doc,
xmlNodePtr node,
ice_config_t *configuration)
{
char *tmp;
master_server *master = calloc(1, sizeof(master_server));
master_server *current = configuration->master;
master_server *last = NULL;
while(current) {
last = current;
current = current->next;
}
if (last) {
last->next = master;
} else {
configuration->master = master;
}
master->next = NULL;
master->on_demand = configuration->on_demand;
master->server = (char *) xmlCharStrdup("127.0.0.1");
master->username = (char *) xmlCharStrdup(configuration->master_username);
master->password = (char *) xmlCharStrdup(configuration->master_password);
do {
if (node == NULL)
break;
if (xmlIsBlankNode(node))
continue;
if (xmlStrcmp(node->name, XMLSTR("server")) == 0) {
if (master->server)
xmlFree(master->server);
master->server = (char *)xmlNodeListGetString(doc,
node->xmlChildrenNode, 1);
} else if (xmlStrcmp(node->name, XMLSTR("port")) == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
if (tmp) {
master->port = atoi(tmp);
xmlFree(tmp);
} else {
ICECAST_LOG_WARN("<port> setting must not be empty.");
}
} else if (xmlStrcmp(node->name, XMLSTR("username")) == 0) {
if (master->username)
xmlFree(master->username);
master->username = (char *)xmlNodeListGetString(doc,
node->xmlChildrenNode, 1);
} else if (xmlStrcmp(node->name, XMLSTR("password")) == 0) {
if (master->password)
xmlFree(master->password);
master->password = (char *)xmlNodeListGetString(doc,
node->xmlChildrenNode, 1);
} else if (xmlStrcmp(node->name, XMLSTR("on-demand")) == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
master->on_demand = util_str_to_bool(tmp);
if (tmp)
xmlFree(tmp);
}
} while ((node = node->next));
}
static void _parse_relay(xmlDocPtr doc,
xmlNodePtr node,
ice_config_t *configuration)

View File

@ -218,6 +218,7 @@ typedef struct ice_config_tag {
/* is TLS supported by the server? */
int tls_ok;
master_server *master;
relay_server *relay;
mount_proxy *mounts;

View File

@ -63,6 +63,19 @@ static volatile int update_all_mounts = 0;
static volatile unsigned int max_interval = 0;
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval
master_server *master_free (master_server *master)
{
master_server *next = master->next;
ICECAST_LOG_DEBUG("freeing master %s:%d", master->server, master->port);
xmlFree (master->server);
if (master->username)
xmlFree (master->username);
if (master->password)
xmlFree (master->password);
free (master);
return next;
}
relay_server *relay_free (relay_server *relay)
{
relay_server *next = relay->next;
@ -598,10 +611,8 @@ static void relay_check_streams (relay_server *to_start,
}
static int update_from_master(ice_config_t *config)
static int update_from_master(master_server *master)
{
char *master = NULL, *password = NULL, *username= NULL;
int port;
sock_t mastersock;
int ret = 0;
char buf[256];
@ -610,23 +621,12 @@ static int update_from_master(ice_config_t *config)
char *authheader, *data;
relay_server *new_relays = NULL, *cleanup_relays;
int len, count = 1;
int on_demand;
username = strdup(config->master_username);
if (config->master_password)
password = strdup(config->master_password);
if (config->master_server)
master = strdup(config->master_server);
port = config->master_server_port;
if (password == NULL || master == NULL || port == 0)
if (master->password == NULL || master->server == NULL || master->port == 0)
break;
on_demand = config->on_demand;
ret = 1;
config_release_config();
mastersock = sock_connect_wto(master, port, 10);
mastersock = sock_connect_wto(master->server, master->port, 10);
if (mastersock == SOCK_ERROR)
{
@ -634,9 +634,9 @@ static int update_from_master(ice_config_t *config)
break;
}
len = strlen(username) + strlen(password) + 2;
len = strlen(master->username) + strlen(master->password) + 2;
authheader = malloc(len);
snprintf (authheader, len, "%s:%s", username, password);
snprintf (authheader, len, "%s:%s", master->username, master->password);
data = util_base64_encode(authheader, len);
sock_write (mastersock,
"GET /admin/streamlist.txt HTTP/1.0\r\n"
@ -684,14 +684,14 @@ static int update_from_master(ice_config_t *config)
}
else
{
r->server = (char *)xmlCharStrdup (master);
r->port = port;
r->server = (char *)xmlCharStrdup (master->server);
r->port = master->port;
}
r->mount = strdup(parsed_uri->path);
r->localmount = strdup(parsed_uri->path);
r->mp3metadata = 1;
r->on_demand = on_demand;
r->on_demand = master->on_demand;
r->next = new_relays;
ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", r->server, r->port, r->mount);
new_relays = r;
@ -710,12 +710,24 @@ static int update_from_master(ice_config_t *config)
} while(0);
if (master)
free (master);
if (username)
free (username);
if (password)
free (password);
return ret;
}
static int update_from_master_legacy(ice_config_t *config)
{
master_server *master = calloc (1, sizeof (master_server));
int ret = 0;
if (master) {
master->username = strdup(config->master_username);
if (config->master_password)
master->password = strdup(config->master_password);
if (config->master_server)
master->server = strdup(config->master_server);
master->port = config->master_server_port;
ret = update_from_master(master);
master_free(master);
}
return ret;
}
@ -761,6 +773,7 @@ static void *_slave_thread(void *arg)
thread_mutex_lock(&_slave_mutex);
if (max_interval <= interval)
{
master_server *master;
ICECAST_LOG_DEBUG("checking master stream list");
config = config_get_config();
@ -769,9 +782,16 @@ static void *_slave_thread(void *arg)
interval = 0;
max_interval = config->master_update_interval;
thread_mutex_unlock(&_slave_mutex);
/* update all non-legacy master servers */
master = config->master;
while (master) {
update_from_master(master);
master = master->next;
}
/* the connection could take some time, so the lock can drop */
if (update_from_master (config))
if (update_from_master_legacy (config))
config = config_get_config();
thread_mutex_lock (&(config_locks()->relay_lock));

View File

@ -15,6 +15,15 @@
#include "common/thread/thread.h"
typedef struct _master_server {
char *server;
int port;
char *username;
char *password;
int on_demand;
struct _master_server *next;
} master_server;
typedef struct _relay_server {
char *server;
int port;
@ -34,6 +43,8 @@ typedef struct _relay_server {
} relay_server;
master_server *master_free (master_server *master);
void slave_initialize(void);
void slave_shutdown(void);
void slave_update_all_mounts (void);