mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2025-02-02 15:07:36 -05:00
Merge branch 'feature-relay-improvements'
This commit is contained in:
commit
3f76299e3e
204
src/cfgfile.c
204
src/cfgfile.c
@ -75,6 +75,9 @@
|
||||
#define CONFIG_DEFAULT_GROUP NULL
|
||||
#define CONFIG_MASTER_UPDATE_INTERVAL 120
|
||||
#define CONFIG_YP_URL_TIMEOUT 10
|
||||
#define CONFIG_DEFAULT_RELAY_SERVER "127.0.0.1"
|
||||
#define CONFIG_DEFAULT_RELAY_PORT 80
|
||||
#define CONFIG_DEFAULT_RELAY_MOUNT "/"
|
||||
#define CONFIG_DEFAULT_CIPHER_LIST "ECDHE-RSA-AES128-GCM-SHA256:"\
|
||||
"ECDHE-ECDSA-AES128-GCM-SHA256:"\
|
||||
"ECDHE-RSA-AES256-GCM-SHA384:"\
|
||||
@ -142,8 +145,8 @@ static void _parse_http_headers(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
ice_config_http_header_t **http_headers);
|
||||
|
||||
static void _parse_relay(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
|
||||
static void _parse_mount(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c);
|
||||
static void _parse_relay(xmlDocPtr doc, xmlNodePtr node, ice_config_t *c, const char *mount);
|
||||
static void _parse_mount(xmlDocPtr doc, xmlNodePtr parentnode, ice_config_t *c);
|
||||
|
||||
static void _parse_listen_socket(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
@ -622,13 +625,9 @@ void config_clear(ice_config_t *c)
|
||||
{
|
||||
ice_config_dir_t *dirnode,
|
||||
*nextdirnode;
|
||||
relay_server *relay,
|
||||
*nextrelay;
|
||||
mount_proxy *mount,
|
||||
*nextmount;
|
||||
#ifdef USE_YP
|
||||
int i;
|
||||
#endif
|
||||
size_t i;
|
||||
|
||||
free(c->config_filename);
|
||||
|
||||
@ -666,15 +665,10 @@ void config_clear(ice_config_t *c)
|
||||
while ((c->listen_sock = config_clear_listener(c->listen_sock)));
|
||||
|
||||
thread_mutex_lock(&(_locks.relay_lock));
|
||||
relay = c->relay;
|
||||
while (relay) {
|
||||
nextrelay = relay->next;
|
||||
xmlFree(relay->server);
|
||||
xmlFree(relay->mount);
|
||||
xmlFree(relay->localmount);
|
||||
free(relay);
|
||||
relay = nextrelay;
|
||||
for (i = 0; i < c->relay_length; i++) {
|
||||
relay_config_free(c->relay[i]);
|
||||
}
|
||||
free(c->relay);
|
||||
thread_mutex_unlock(&(_locks.relay_lock));
|
||||
|
||||
mount = c->mounts;
|
||||
@ -694,10 +688,8 @@ void config_clear(ice_config_t *c)
|
||||
dirnode = nextdirnode;
|
||||
}
|
||||
#ifdef USE_YP
|
||||
i = 0;
|
||||
while (i < c->num_yp_directories) {
|
||||
for (i = 0; i < c->num_yp_directories; i++) {
|
||||
xmlFree(c->yp_url[i]);
|
||||
i++;
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -1070,7 +1062,7 @@ static void _parse_root(xmlDocPtr doc,
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("http-headers")) == 0) {
|
||||
_parse_http_headers(doc, node->xmlChildrenNode, &(configuration->http_headers));
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("relay")) == 0) {
|
||||
_parse_relay(doc, node->xmlChildrenNode, configuration);
|
||||
_parse_relay(doc, node->xmlChildrenNode, configuration, NULL);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("mount")) == 0) {
|
||||
_parse_mount(doc, node, configuration);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("directory")) == 0) {
|
||||
@ -1366,7 +1358,7 @@ static void _parse_mount_oldstyle_authentication(mount_proxy *mount,
|
||||
}
|
||||
|
||||
static void _parse_mount(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
xmlNodePtr parentnode,
|
||||
ice_config_t *configuration)
|
||||
{
|
||||
char *tmp;
|
||||
@ -1376,6 +1368,7 @@ static void _parse_mount(xmlDocPtr doc,
|
||||
char *username = NULL;
|
||||
char *password = NULL;
|
||||
auth_stack_t *authstack = NULL;
|
||||
xmlNodePtr node;
|
||||
|
||||
/* default <mount> settings */
|
||||
mount->mounttype = MOUNT_TYPE_NORMAL;
|
||||
@ -1386,7 +1379,7 @@ static void _parse_mount(xmlDocPtr doc,
|
||||
mount->max_history = -1;
|
||||
mount->next = NULL;
|
||||
|
||||
tmp = (char *)xmlGetProp(node, XMLSTR("type"));
|
||||
tmp = (char *)xmlGetProp(parentnode, XMLSTR("type"));
|
||||
if (tmp) {
|
||||
if (strcmp(tmp, "normal") == 0) {
|
||||
mount->mounttype = MOUNT_TYPE_NORMAL;
|
||||
@ -1400,7 +1393,7 @@ static void _parse_mount(xmlDocPtr doc,
|
||||
xmlFree(tmp);
|
||||
}
|
||||
|
||||
node = node->xmlChildrenNode;
|
||||
node = parentnode->xmlChildrenNode;
|
||||
|
||||
do {
|
||||
if (node == NULL)
|
||||
@ -1553,6 +1546,25 @@ static void _parse_mount(xmlDocPtr doc,
|
||||
}
|
||||
} while ((node = node->next));
|
||||
|
||||
/* Do a second interation as we need to know mount->mountname, and mount->mounttype first */
|
||||
node = parentnode->xmlChildrenNode;
|
||||
|
||||
do {
|
||||
if (node == NULL)
|
||||
break;
|
||||
|
||||
if (xmlStrcmp(node->name, XMLSTR("relay")) == 0) {
|
||||
if (mount->mounttype != MOUNT_TYPE_NORMAL) {
|
||||
ICECAST_LOG_WARN("<relay> set within <mount> for mountpoint %s%s%s that is not type=\"normal\"",
|
||||
(mount->mountname ? "\"" : ""), (mount->mountname ? mount->mountname : "<no name>"), (mount->mountname ? "\"" : ""));
|
||||
} else if (!mount->mountname || mount->mountname[0] != '/') {
|
||||
ICECAST_LOG_WARN("<relay> set within <mount> with no mountpoint defined.");
|
||||
} else {
|
||||
_parse_relay(doc, node->xmlChildrenNode, configuration, mount->mountname);
|
||||
}
|
||||
}
|
||||
} while ((node = node->next));
|
||||
|
||||
if (password) {
|
||||
auth_stack_t *old_style = NULL;
|
||||
__append_old_style_auth(&old_style, "legacy-mount-source",
|
||||
@ -1685,31 +1697,11 @@ static void _parse_http_headers(xmlDocPtr doc,
|
||||
xmlFree(value);
|
||||
}
|
||||
|
||||
static void _parse_relay(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
ice_config_t *configuration)
|
||||
static void _parse_relay_upstream(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
relay_config_upstream_t *upstream)
|
||||
{
|
||||
char *tmp;
|
||||
relay_server *relay = calloc(1, sizeof(relay_server));
|
||||
relay_server *current = configuration->relay;
|
||||
relay_server *last = NULL;
|
||||
|
||||
while(current) {
|
||||
last = current;
|
||||
current = current->next;
|
||||
}
|
||||
|
||||
if (last) {
|
||||
last->next = relay;
|
||||
} else {
|
||||
configuration->relay = relay;
|
||||
}
|
||||
|
||||
relay->next = NULL;
|
||||
relay->mp3metadata = 1;
|
||||
relay->on_demand = configuration->on_demand;
|
||||
relay->server = (char *) xmlCharStrdup("127.0.0.1");
|
||||
relay->mount = (char *) xmlCharStrdup("/");
|
||||
|
||||
do {
|
||||
if (node == NULL)
|
||||
@ -1718,51 +1710,123 @@ static void _parse_relay(xmlDocPtr doc,
|
||||
continue;
|
||||
|
||||
if (xmlStrcmp(node->name, XMLSTR("server")) == 0) {
|
||||
if (relay->server)
|
||||
xmlFree(relay->server);
|
||||
relay->server = (char *)xmlNodeListGetString(doc,
|
||||
if (upstream->server)
|
||||
xmlFree(upstream->server);
|
||||
upstream->server = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("port")) == 0) {
|
||||
__read_int(doc, node, &relay->port, "<port> setting must not be empty.");
|
||||
__read_int(doc, node, &upstream->port, "<port> setting must not be empty.");
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("mount")) == 0) {
|
||||
if (relay->mount)
|
||||
xmlFree(relay->mount);
|
||||
relay->mount = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("local-mount")) == 0) {
|
||||
if (relay->localmount)
|
||||
xmlFree(relay->localmount);
|
||||
relay->localmount = (char *)xmlNodeListGetString(doc,
|
||||
if (upstream->mount)
|
||||
xmlFree(upstream->mount);
|
||||
upstream->mount = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("relay-shoutcast-metadata")) == 0) {
|
||||
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
|
||||
relay->mp3metadata = util_str_to_bool(tmp);
|
||||
upstream->mp3metadata = util_str_to_bool(tmp);
|
||||
if(tmp)
|
||||
xmlFree(tmp);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("username")) == 0) {
|
||||
if (relay->username)
|
||||
xmlFree(relay->username);
|
||||
relay->username = (char *)xmlNodeListGetString(doc,
|
||||
if (upstream->username)
|
||||
xmlFree(upstream->username);
|
||||
upstream->username = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("password")) == 0) {
|
||||
if (relay->password)
|
||||
xmlFree(relay->password);
|
||||
relay->password = (char *)xmlNodeListGetString(doc,
|
||||
if (upstream->password)
|
||||
xmlFree(upstream->password);
|
||||
upstream->password = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("bind")) == 0) {
|
||||
if (upstream->bind)
|
||||
xmlFree(upstream->bind);
|
||||
upstream->bind = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
}
|
||||
} while ((node = node->next));
|
||||
}
|
||||
|
||||
static void _parse_relay_upstream_apply_defaults(relay_config_upstream_t *upstream)
|
||||
{
|
||||
if (!upstream->server)
|
||||
upstream->server = (char *)xmlCharStrdup(CONFIG_DEFAULT_RELAY_SERVER);
|
||||
if (!upstream->port)
|
||||
upstream->port = CONFIG_DEFAULT_RELAY_PORT;
|
||||
if (!upstream->mount)
|
||||
upstream->mount = (char *)xmlCharStrdup(CONFIG_DEFAULT_RELAY_MOUNT);
|
||||
}
|
||||
|
||||
static void _parse_relay(xmlDocPtr doc,
|
||||
xmlNodePtr node,
|
||||
ice_config_t *configuration,
|
||||
const char *mount)
|
||||
{
|
||||
char *tmp;
|
||||
relay_config_t *relay = calloc(1, sizeof(relay_config_t));
|
||||
relay_config_t **n = realloc(configuration->relay, sizeof(*configuration->relay)*(configuration->relay_length + 1));
|
||||
|
||||
if (!n) {
|
||||
ICECAST_LOG_ERROR("Can not allocate memory for additional relay.");
|
||||
return;
|
||||
}
|
||||
|
||||
configuration->relay = n;
|
||||
configuration->relay[configuration->relay_length++] = relay;
|
||||
|
||||
relay->upstream_default.mp3metadata = 1;
|
||||
relay->on_demand = configuration->on_demand;
|
||||
|
||||
_parse_relay_upstream(doc, node, &(relay->upstream_default));
|
||||
|
||||
do {
|
||||
if (node == NULL)
|
||||
break;
|
||||
if (xmlIsBlankNode(node))
|
||||
continue;
|
||||
|
||||
if (xmlStrcmp(node->name, XMLSTR("local-mount")) == 0) {
|
||||
if (mount) {
|
||||
ICECAST_LOG_WARN("Relay defined within mount \"%s\" defines <local-mount> which is ignored.", mount);
|
||||
} else {
|
||||
if (relay->localmount)
|
||||
xmlFree(relay->localmount);
|
||||
relay->localmount = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
}
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("on-demand")) == 0) {
|
||||
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
|
||||
relay->on_demand = util_str_to_bool(tmp);
|
||||
if (tmp)
|
||||
xmlFree(tmp);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("bind")) == 0) {
|
||||
if (relay->bind)
|
||||
xmlFree(relay->bind);
|
||||
relay->bind = (char *)xmlNodeListGetString(doc,
|
||||
node->xmlChildrenNode, 1);
|
||||
} else if (xmlStrcmp(node->name, XMLSTR("upstream")) == 0) {
|
||||
tmp = (char *)xmlGetProp(node, XMLSTR("type"));
|
||||
|
||||
if (tmp == NULL || strcmp(tmp, "normal") == 0) {
|
||||
relay_config_upstream_t *n = realloc(relay->upstream, sizeof(*n)*(relay->upstreams + 1));
|
||||
if (n) {
|
||||
relay->upstream = n;
|
||||
memset(&(n[relay->upstreams]), 0, sizeof(relay_config_upstream_t));
|
||||
_parse_relay_upstream(doc, node->xmlChildrenNode, &(n[relay->upstreams]));
|
||||
relay->upstreams++;
|
||||
}
|
||||
} else if (strcmp(tmp, "default") == 0) {
|
||||
_parse_relay_upstream(doc, node->xmlChildrenNode, &(relay->upstream_default));
|
||||
} else {
|
||||
ICECAST_LOG_WARN("<upstream> of unknown type is ignored.");
|
||||
}
|
||||
|
||||
if (tmp)
|
||||
xmlFree(tmp);
|
||||
}
|
||||
} while ((node = node->next));
|
||||
|
||||
_parse_relay_upstream_apply_defaults(&(relay->upstream_default));
|
||||
|
||||
if (mount) {
|
||||
relay->localmount = (char *)xmlStrdup(XMLSTR(mount));
|
||||
}
|
||||
|
||||
if (relay->localmount == NULL)
|
||||
relay->localmount = (char *)xmlStrdup(XMLSTR(relay->mount));
|
||||
relay->localmount = (char *)xmlStrdup(XMLSTR(relay->upstream_default.mount));
|
||||
}
|
||||
|
||||
static void _parse_listen_socket(xmlDocPtr doc,
|
||||
|
@ -173,6 +173,24 @@ typedef struct _config_tls_context {
|
||||
char *cipher_list;
|
||||
} config_tls_context_t;
|
||||
|
||||
typedef struct {
|
||||
char *server;
|
||||
int port;
|
||||
char *mount;
|
||||
char *username;
|
||||
char *password;
|
||||
char *bind;
|
||||
int mp3metadata;
|
||||
} relay_config_upstream_t;
|
||||
|
||||
typedef struct {
|
||||
char *localmount;
|
||||
int on_demand;
|
||||
size_t upstreams;
|
||||
relay_config_upstream_t *upstream;
|
||||
relay_config_upstream_t upstream_default;
|
||||
} relay_config_t;
|
||||
|
||||
struct ice_config_tag {
|
||||
char *config_filename;
|
||||
|
||||
@ -219,7 +237,8 @@ struct ice_config_tag {
|
||||
/* is TLS supported by the server? */
|
||||
int tls_ok;
|
||||
|
||||
relay_server *relay;
|
||||
size_t relay_length;
|
||||
relay_config_t **relay;
|
||||
|
||||
mount_proxy *mounts;
|
||||
|
||||
|
@ -36,9 +36,9 @@ typedef struct ice_global_tag
|
||||
|
||||
avl_tree *source_tree;
|
||||
/* for locally defined relays */
|
||||
relay_server *relays;
|
||||
relay_t *relays;
|
||||
/* relays retrieved from master */
|
||||
relay_server *master_relays;
|
||||
relay_t *master_relays;
|
||||
|
||||
module_container_t *modulecontainer;
|
||||
|
||||
|
@ -90,7 +90,7 @@ typedef enum {
|
||||
|
||||
/* ---[ slave.[ch] ]--- */
|
||||
|
||||
typedef struct _relay_server relay_server;
|
||||
typedef struct relay_tag relay_t;
|
||||
|
||||
/* ---[ module.[ch] ]--- */
|
||||
|
||||
|
389
src/slave.c
389
src/slave.c
@ -56,6 +56,16 @@
|
||||
|
||||
#define CATMODULE "slave"
|
||||
|
||||
struct relay_tag {
|
||||
relay_config_t *config;
|
||||
source_t *source;
|
||||
int running;
|
||||
int cleanup;
|
||||
time_t start;
|
||||
thread_type *thread;
|
||||
relay_t *next;
|
||||
};
|
||||
|
||||
static void *_slave_thread(void *arg);
|
||||
static thread_type *_slave_thread_id;
|
||||
static int slave_running = 0;
|
||||
@ -64,44 +74,115 @@ static volatile int update_all_mounts = 0;
|
||||
static volatile unsigned int max_interval = 0;
|
||||
static mutex_t _slave_mutex; // protects update_settings, update_all_mounts, max_interval
|
||||
|
||||
relay_server *relay_free (relay_server *relay)
|
||||
static inline void relay_config_upstream_free (relay_config_upstream_t *upstream)
|
||||
{
|
||||
relay_server *next = relay->next;
|
||||
ICECAST_LOG_DEBUG("freeing relay %s", relay->localmount);
|
||||
if (upstream->server)
|
||||
xmlFree(upstream->server);
|
||||
if (upstream->mount)
|
||||
xmlFree(upstream->mount);
|
||||
if (upstream->username)
|
||||
xmlFree(upstream->username);
|
||||
if (upstream->password)
|
||||
xmlFree(upstream->password);
|
||||
}
|
||||
|
||||
void relay_config_free (relay_config_t *relay)
|
||||
{
|
||||
size_t i;
|
||||
|
||||
ICECAST_LOG_DEBUG("freeing relay config for %s", relay->localmount);
|
||||
|
||||
for (i = 0; i < relay->upstreams; i++) {
|
||||
relay_config_upstream_free(&(relay->upstream[i]));
|
||||
}
|
||||
|
||||
relay_config_upstream_free(&(relay->upstream_default));
|
||||
|
||||
xmlFree(relay->localmount);
|
||||
free(relay->upstream);
|
||||
free(relay);
|
||||
}
|
||||
|
||||
relay_t *relay_free (relay_t *relay)
|
||||
{
|
||||
relay_t *next = relay->next;
|
||||
|
||||
ICECAST_LOG_DEBUG("freeing relay %s", relay->config->localmount);
|
||||
|
||||
if (relay->source)
|
||||
source_free_source (relay->source);
|
||||
xmlFree (relay->server);
|
||||
xmlFree (relay->mount);
|
||||
xmlFree (relay->localmount);
|
||||
if (relay->username)
|
||||
xmlFree (relay->username);
|
||||
if (relay->password)
|
||||
xmlFree (relay->password);
|
||||
free (relay);
|
||||
|
||||
relay_config_free(relay->config);
|
||||
|
||||
free(relay);
|
||||
return next;
|
||||
}
|
||||
|
||||
|
||||
relay_server *relay_copy (relay_server *r)
|
||||
static inline void relay_config_upstream_copy(relay_config_upstream_t *dst, const relay_config_upstream_t *src)
|
||||
{
|
||||
relay_server *copy = calloc (1, sizeof (relay_server));
|
||||
dst->server = (char *)xmlCharStrdup(src->server);
|
||||
dst->mount = (char *)xmlCharStrdup(src->mount);
|
||||
|
||||
if (copy)
|
||||
{
|
||||
copy->server = (char *)xmlCharStrdup (r->server);
|
||||
copy->mount = (char *)xmlCharStrdup (r->mount);
|
||||
copy->localmount = (char *)xmlCharStrdup (r->localmount);
|
||||
if (r->username)
|
||||
copy->username = (char *)xmlCharStrdup (r->username);
|
||||
if (r->password)
|
||||
copy->password = (char *)xmlCharStrdup (r->password);
|
||||
copy->port = r->port;
|
||||
copy->mp3metadata = r->mp3metadata;
|
||||
copy->on_demand = r->on_demand;
|
||||
if (src->username)
|
||||
dst->username = (char *)xmlCharStrdup(src->username);
|
||||
if (src->password)
|
||||
dst->password = (char *)xmlCharStrdup(src->password);
|
||||
|
||||
dst->port = src->port;
|
||||
|
||||
dst->mp3metadata = src->mp3metadata;
|
||||
}
|
||||
|
||||
static inline relay_config_t *relay_config_copy (relay_config_t *r)
|
||||
{
|
||||
relay_config_t *copy = calloc (1, sizeof (relay_config_t));
|
||||
relay_config_upstream_t *u = NULL;
|
||||
size_t i;
|
||||
|
||||
if (r->upstreams) {
|
||||
u = calloc(r->upstreams, sizeof(relay_config_upstream_t));
|
||||
if (!u) {
|
||||
free(copy);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (!copy) {
|
||||
free(u);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
copy->upstream = u;
|
||||
copy->upstreams = r->upstreams;
|
||||
|
||||
copy->localmount = (char *)xmlCharStrdup(r->localmount);
|
||||
copy->on_demand = r->on_demand;
|
||||
|
||||
relay_config_upstream_copy(&(copy->upstream_default), &(r->upstream_default));
|
||||
|
||||
for (i = 0; i < r->upstreams; i++)
|
||||
relay_config_upstream_copy(&(copy->upstream[i]), &(r->upstream[i]));
|
||||
|
||||
|
||||
return copy;
|
||||
}
|
||||
|
||||
static inline relay_t *relay_new(relay_config_t *config)
|
||||
{
|
||||
relay_t *r = calloc(1, sizeof(*r));
|
||||
|
||||
if (!r)
|
||||
return NULL;
|
||||
|
||||
r->config = relay_config_copy(config);
|
||||
if (!r->config) {
|
||||
free(r);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/* force a recheck of the relays. This will recheck the master server if
|
||||
* this is a slave and rebuild all mountpoints in the stats tree
|
||||
@ -152,16 +233,17 @@ void slave_shutdown(void)
|
||||
/* Actually open the connection and do some http parsing, handle any 302
|
||||
* responses within here.
|
||||
*/
|
||||
static client_t *open_relay_connection (relay_server *relay)
|
||||
#define _GET_UPSTREAM_SETTING(n) ((upstream && upstream->n) ? upstream->n : relay->config->upstream_default.n)
|
||||
static client_t *open_relay_connection (relay_t *relay, relay_config_upstream_t *upstream)
|
||||
{
|
||||
int redirects = 0;
|
||||
char *server_id = NULL;
|
||||
ice_config_t *config;
|
||||
http_parser_t *parser = NULL;
|
||||
connection_t *con=NULL;
|
||||
char *server = strdup (relay->server);
|
||||
char *mount = strdup (relay->mount);
|
||||
int port = relay->port;
|
||||
char *server = strdup (_GET_UPSTREAM_SETTING(server));
|
||||
char *mount = strdup (_GET_UPSTREAM_SETTING(mount));
|
||||
int port = _GET_UPSTREAM_SETTING(port);
|
||||
char *auth_header;
|
||||
char header[4096];
|
||||
|
||||
@ -170,13 +252,13 @@ static client_t *open_relay_connection (relay_server *relay)
|
||||
config_release_config ();
|
||||
|
||||
/* build any authentication header before connecting */
|
||||
if (relay->username && relay->password)
|
||||
if (_GET_UPSTREAM_SETTING(username) && _GET_UPSTREAM_SETTING(password))
|
||||
{
|
||||
char *esc_authorisation;
|
||||
unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
|
||||
unsigned len = strlen(_GET_UPSTREAM_SETTING(username)) + strlen(_GET_UPSTREAM_SETTING(password)) + 2;
|
||||
|
||||
auth_header = malloc (len);
|
||||
snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
|
||||
snprintf (auth_header, len, "%s:%s", _GET_UPSTREAM_SETTING(username), _GET_UPSTREAM_SETTING(password));
|
||||
esc_authorisation = util_base64_encode(auth_header, len);
|
||||
free(auth_header);
|
||||
len = strlen (esc_authorisation) + 24;
|
||||
@ -194,7 +276,7 @@ static client_t *open_relay_connection (relay_server *relay)
|
||||
|
||||
ICECAST_LOG_INFO("connecting to %s:%d", server, port);
|
||||
|
||||
streamsock = sock_connect_wto_bind (server, port, relay->bind, 10);
|
||||
streamsock = sock_connect_wto_bind (server, port, _GET_UPSTREAM_SETTING(bind), 10);
|
||||
if (streamsock == SOCK_ERROR)
|
||||
{
|
||||
ICECAST_LOG_WARN("Failed to connect to %s:%d", server, port);
|
||||
@ -216,19 +298,19 @@ static client_t *open_relay_connection (relay_server *relay)
|
||||
mount,
|
||||
server_id,
|
||||
server,
|
||||
relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
|
||||
_GET_UPSTREAM_SETTING(mp3metadata) ? "Icy-MetaData: 1\r\n" : "",
|
||||
auth_header);
|
||||
memset (header, 0, sizeof(header));
|
||||
if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
|
||||
{
|
||||
ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
|
||||
ICECAST_LOG_ERROR("Header read failed for %s (%s:%d%s)", relay->config->localmount, server, port, mount);
|
||||
break;
|
||||
}
|
||||
parser = httpp_create_parser();
|
||||
httpp_initialize (parser, NULL);
|
||||
if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
|
||||
if (! httpp_parse_response (parser, header, strlen(header), relay->config->localmount))
|
||||
{
|
||||
ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
|
||||
ICECAST_LOG_ERROR("Error parsing relay request for %s (%s:%d%s)", relay->config->localmount,
|
||||
server, port, mount);
|
||||
break;
|
||||
}
|
||||
@ -268,7 +350,7 @@ static client_t *open_relay_connection (relay_server *relay)
|
||||
|
||||
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
|
||||
{
|
||||
ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->localmount,
|
||||
ICECAST_LOG_ERROR("Error from relay request: %s (%s)", relay->config->localmount,
|
||||
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
|
||||
break;
|
||||
}
|
||||
@ -313,14 +395,27 @@ static client_t *open_relay_connection (relay_server *relay)
|
||||
*/
|
||||
static void *start_relay_stream (void *arg)
|
||||
{
|
||||
relay_server *relay = arg;
|
||||
relay_t *relay = arg;
|
||||
source_t *src = relay->source;
|
||||
client_t *client;
|
||||
|
||||
ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->localmount);
|
||||
ICECAST_LOG_INFO("Starting relayed source at mountpoint \"%s\"", relay->config->localmount);
|
||||
do
|
||||
{
|
||||
client = open_relay_connection (relay);
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < relay->config->upstreams; i++) {
|
||||
ICECAST_LOG_DEBUG("For relay on mount \"%s\", trying upstream #%zu", relay->config->localmount, i);
|
||||
client = open_relay_connection(relay, &(relay->config->upstream[i]));
|
||||
if (client)
|
||||
break;
|
||||
}
|
||||
|
||||
/* if we have no upstreams defined, use the default upstream */
|
||||
if (!relay->config->upstreams) {
|
||||
ICECAST_LOG_DEBUG("For relay on mount \"%s\" with no upstreams trying upstream default", relay->config->localmount);
|
||||
client = open_relay_connection(relay, NULL);
|
||||
}
|
||||
|
||||
if (client == NULL)
|
||||
continue;
|
||||
@ -337,14 +432,14 @@ static void *start_relay_stream (void *arg)
|
||||
continue;
|
||||
}
|
||||
stats_event_inc(NULL, "source_relay_connections");
|
||||
stats_event (relay->localmount, "source_ip", client->con->ip);
|
||||
stats_event (relay->config->localmount, "source_ip", client->con->ip);
|
||||
|
||||
source_main (relay->source);
|
||||
|
||||
if (relay->on_demand == 0)
|
||||
if (relay->config->on_demand == 0)
|
||||
{
|
||||
/* only keep refreshing YP entries for inactive on-demand relays */
|
||||
yp_remove (relay->localmount);
|
||||
yp_remove (relay->config->localmount);
|
||||
relay->source->yp_public = -1;
|
||||
relay->start = time(NULL) + 10; /* prevent busy looping if failing */
|
||||
slave_update_all_mounts();
|
||||
@ -387,30 +482,30 @@ static void *start_relay_stream (void *arg)
|
||||
|
||||
|
||||
/* wrapper for starting the provided relay stream */
|
||||
static void check_relay_stream (relay_server *relay)
|
||||
static void check_relay_stream (relay_t *relay)
|
||||
{
|
||||
if (relay->source == NULL)
|
||||
{
|
||||
if (relay->localmount[0] != '/')
|
||||
if (relay->config->localmount[0] != '/')
|
||||
{
|
||||
ICECAST_LOG_WARN("relay mountpoint \"%s\" does not start with /, skipping",
|
||||
relay->localmount);
|
||||
relay->config->localmount);
|
||||
return;
|
||||
}
|
||||
/* new relay, reserve the name */
|
||||
relay->source = source_reserve (relay->localmount);
|
||||
relay->source = source_reserve (relay->config->localmount);
|
||||
if (relay->source)
|
||||
{
|
||||
ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->localmount);
|
||||
if (relay->on_demand)
|
||||
ICECAST_LOG_DEBUG("Adding relay source at mountpoint \"%s\"", relay->config->localmount);
|
||||
if (relay->config->on_demand)
|
||||
{
|
||||
ice_config_t *config = config_get_config ();
|
||||
mount_proxy *mountinfo = config_find_mount (config, relay->localmount, MOUNT_TYPE_NORMAL);
|
||||
relay->source->on_demand = relay->on_demand;
|
||||
mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
|
||||
relay->source->on_demand = relay->config->on_demand;
|
||||
if (mountinfo == NULL)
|
||||
source_update_settings (config, relay->source, mountinfo);
|
||||
config_release_config ();
|
||||
stats_event (relay->localmount, "listeners", "0");
|
||||
stats_event (relay->config->localmount, "listeners", "0");
|
||||
slave_update_all_mounts();
|
||||
}
|
||||
}
|
||||
@ -418,7 +513,7 @@ static void check_relay_stream (relay_server *relay)
|
||||
{
|
||||
if (relay->start == 0)
|
||||
{
|
||||
ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->localmount);
|
||||
ICECAST_LOG_WARN("new relay but source \"%s\" already exists", relay->config->localmount);
|
||||
relay->start = 1;
|
||||
}
|
||||
return;
|
||||
@ -431,9 +526,9 @@ static void check_relay_stream (relay_server *relay)
|
||||
if (relay->source == NULL || relay->running || relay->start > time(NULL))
|
||||
break;
|
||||
/* check if an inactive on-demand relay has a fallback that has listeners */
|
||||
if (relay->on_demand && source->on_demand_req == 0)
|
||||
if (relay->config->on_demand && source->on_demand_req == 0)
|
||||
{
|
||||
relay->source->on_demand = relay->on_demand;
|
||||
relay->source->on_demand = relay->config->on_demand;
|
||||
|
||||
if (source->fallback_mount && source->fallback_override)
|
||||
{
|
||||
@ -463,20 +558,20 @@ static void check_relay_stream (relay_server *relay)
|
||||
{
|
||||
if (relay->thread)
|
||||
{
|
||||
ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->localmount);
|
||||
ICECAST_LOG_DEBUG("waiting for relay thread for \"%s\"", relay->config->localmount);
|
||||
thread_join (relay->thread);
|
||||
relay->thread = NULL;
|
||||
}
|
||||
relay->cleanup = 0;
|
||||
relay->running = 0;
|
||||
|
||||
if (relay->on_demand && relay->source)
|
||||
if (relay->config->on_demand && relay->source)
|
||||
{
|
||||
ice_config_t *config = config_get_config ();
|
||||
mount_proxy *mountinfo = config_find_mount (config, relay->localmount, MOUNT_TYPE_NORMAL);
|
||||
mount_proxy *mountinfo = config_find_mount (config, relay->config->localmount, MOUNT_TYPE_NORMAL);
|
||||
source_update_settings (config, relay->source, mountinfo);
|
||||
config_release_config ();
|
||||
stats_event (relay->localmount, "listeners", "0");
|
||||
stats_event (relay->config->localmount, "listeners", "0");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -485,23 +580,52 @@ static void check_relay_stream (relay_server *relay)
|
||||
/* compare the 2 relays to see if there are any changes, return 1 if
|
||||
* the relay needs to be restarted, 0 otherwise
|
||||
*/
|
||||
static int relay_has_changed (relay_server *new, relay_server *old)
|
||||
#define _EQ_STR(a,b) (((a) == (b)) || ((a) != NULL && (b) != NULL && strcmp((a), (b)) == 0))
|
||||
#define _EQ_ATTR(x) (_EQ_STR((new->x), (old->x)))
|
||||
static int relay_has_changed_upstream(const relay_config_upstream_t *new, const relay_config_upstream_t *old)
|
||||
{
|
||||
do
|
||||
{
|
||||
if (strcmp (new->mount, old->mount) != 0)
|
||||
break;
|
||||
if (strcmp (new->server, old->server) != 0)
|
||||
break;
|
||||
if (new->port != old->port)
|
||||
break;
|
||||
if (new->mp3metadata != old->mp3metadata)
|
||||
break;
|
||||
if (new->on_demand != old->on_demand)
|
||||
old->on_demand = new->on_demand;
|
||||
return 0;
|
||||
} while (0);
|
||||
return 1;
|
||||
if (new->mp3metadata != old->mp3metadata)
|
||||
return 1;
|
||||
|
||||
if (!_EQ_ATTR(server) || new->port != old->port)
|
||||
return 1;
|
||||
|
||||
if (!_EQ_ATTR(mount))
|
||||
return 1;
|
||||
|
||||
/* NOTE: We currently do not consider this a relevant change. Why?
|
||||
if (!_EQ_ATTR(username) || !_EQ_ATTR(password))
|
||||
return 1;
|
||||
|
||||
if (!_EQ_ATTR(bind))
|
||||
return 1;
|
||||
*/
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int relay_has_changed (const relay_config_t *new, relay_config_t *old)
|
||||
{
|
||||
size_t i;
|
||||
|
||||
/* This is not fully true: If more upstreams has been added there is no reason
|
||||
* to restart the relay. However for now we ignore this case. TODO: Change this.
|
||||
*/
|
||||
if (new->upstreams != old->upstreams)
|
||||
return 1;
|
||||
|
||||
for (i = 0; i < new->upstreams; i++) {
|
||||
if (relay_has_changed_upstream(&(new->upstream[i]), &(old->upstream[i])))
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (relay_has_changed_upstream(&(new->upstream_default), &(old->upstream_default)))
|
||||
return 1;
|
||||
|
||||
/* Why do we do this here? */
|
||||
old->on_demand = new->on_demand;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -509,31 +633,36 @@ static int relay_has_changed (relay_server *new, relay_server *old)
|
||||
* returned list contains relays that should be kept running, current contains
|
||||
* the list of relays to shutdown
|
||||
*/
|
||||
static relay_server *
|
||||
update_relay_set(relay_server **current, relay_server *updated)
|
||||
static relay_t *
|
||||
update_relay_set(relay_t **current, relay_config_t **updated, size_t updated_length)
|
||||
{
|
||||
relay_server *relay = updated;
|
||||
relay_server *existing_relay, **existing_p;
|
||||
relay_server *new_list = NULL;
|
||||
relay_config_t *relay;
|
||||
relay_t *existing_relay, **existing_p;
|
||||
relay_t *new_list = NULL;
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < updated_length; i++) {
|
||||
relay = updated[i];
|
||||
|
||||
while (relay)
|
||||
{
|
||||
existing_relay = *current;
|
||||
existing_p = current;
|
||||
|
||||
while (existing_relay)
|
||||
{
|
||||
/* break out if keeping relay */
|
||||
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
|
||||
if (relay_has_changed (relay, existing_relay) == 0)
|
||||
if (strcmp(relay->localmount, existing_relay->config->localmount) == 0)
|
||||
if (relay_has_changed(relay, existing_relay->config) == 0)
|
||||
break;
|
||||
existing_p = &existing_relay->next;
|
||||
|
||||
existing_relay = existing_relay->next;
|
||||
}
|
||||
|
||||
|
||||
if (existing_relay == NULL)
|
||||
{
|
||||
/* new one, copy and insert */
|
||||
existing_relay = relay_copy (relay);
|
||||
existing_relay = relay_new(relay);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -541,8 +670,8 @@ update_relay_set(relay_server **current, relay_server *updated)
|
||||
}
|
||||
existing_relay->next = new_list;
|
||||
new_list = existing_relay;
|
||||
relay = relay->next;
|
||||
}
|
||||
|
||||
return new_list;
|
||||
}
|
||||
|
||||
@ -551,12 +680,12 @@ update_relay_set(relay_server **current, relay_server *updated)
|
||||
* are added to the list, and any not listed in the provided new_relay_list
|
||||
* are separated and returned in a separate list
|
||||
*/
|
||||
static relay_server *
|
||||
update_relays (relay_server **relay_list, relay_server *new_relay_list)
|
||||
static relay_t *
|
||||
update_relays (relay_t **relay_list, relay_config_t **new_relay_list, size_t new_relay_list_length)
|
||||
{
|
||||
relay_server *active_relays, *cleanup_relays;
|
||||
relay_t *active_relays, *cleanup_relays;
|
||||
|
||||
active_relays = update_relay_set(relay_list, new_relay_list);
|
||||
active_relays = update_relay_set(relay_list, new_relay_list, new_relay_list_length);
|
||||
|
||||
cleanup_relays = *relay_list;
|
||||
/* re-assign new set */
|
||||
@ -566,10 +695,10 @@ update_relays (relay_server **relay_list, relay_server *new_relay_list)
|
||||
}
|
||||
|
||||
|
||||
static void relay_check_streams (relay_server *to_start,
|
||||
relay_server *to_free, int skip_timer)
|
||||
static void relay_check_streams (relay_t *to_start,
|
||||
relay_t *to_free, int skip_timer)
|
||||
{
|
||||
relay_server *relay;
|
||||
relay_t *relay;
|
||||
|
||||
while (to_free)
|
||||
{
|
||||
@ -578,13 +707,13 @@ static void relay_check_streams (relay_server *to_start,
|
||||
if (to_free->running)
|
||||
{
|
||||
/* relay has been removed from xml, shut down active relay */
|
||||
ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->localmount);
|
||||
ICECAST_LOG_DEBUG("source shutdown request on \"%s\"", to_free->config->localmount);
|
||||
to_free->running = 0;
|
||||
to_free->source->running = 0;
|
||||
thread_join (to_free->thread);
|
||||
}
|
||||
else
|
||||
stats_event (to_free->localmount, NULL, NULL);
|
||||
stats_event (to_free->config->localmount, NULL, NULL);
|
||||
}
|
||||
to_free = relay_free (to_free);
|
||||
}
|
||||
@ -610,9 +739,12 @@ static int update_from_master(ice_config_t *config)
|
||||
do
|
||||
{
|
||||
char *authheader, *data;
|
||||
relay_server *new_relays = NULL, *cleanup_relays;
|
||||
relay_t *cleanup_relays;
|
||||
relay_config_t **new_relays = NULL;
|
||||
size_t new_relays_length = 0;
|
||||
int len, count = 1;
|
||||
int on_demand;
|
||||
size_t i;
|
||||
|
||||
username = strdup(config->master_username);
|
||||
if (config->master_password)
|
||||
@ -664,7 +796,9 @@ static int update_from_master(ice_config_t *config)
|
||||
}
|
||||
while (sock_read_line(mastersock, buf, sizeof(buf)))
|
||||
{
|
||||
relay_server *r;
|
||||
relay_config_t *c = NULL;
|
||||
relay_config_t **n;
|
||||
|
||||
if (!strlen(buf))
|
||||
continue;
|
||||
ICECAST_LOG_DEBUG("read %d from master \"%s\"", count++, buf);
|
||||
@ -673,40 +807,47 @@ static int update_from_master(ice_config_t *config)
|
||||
ICECAST_LOG_DEBUG("Error while parsing line from master. Ignoring line.");
|
||||
continue;
|
||||
}
|
||||
r = calloc (1, sizeof (relay_server));
|
||||
if (r)
|
||||
{
|
||||
if (parsed_uri->server != NULL)
|
||||
{
|
||||
r->server = strdup(parsed_uri->server);
|
||||
if (parsed_uri->port == 0)
|
||||
r->port = 80;
|
||||
else
|
||||
r->port = parsed_uri->port;
|
||||
}
|
||||
else
|
||||
{
|
||||
r->server = (char *)xmlCharStrdup (master);
|
||||
r->port = port;
|
||||
|
||||
n = realloc(new_relays, sizeof(*new_relays)*(new_relays_length + 1));
|
||||
if (n) {
|
||||
new_relays = n;
|
||||
|
||||
c = calloc(1, sizeof(*c));
|
||||
new_relays[new_relays_length++] = c;
|
||||
}
|
||||
|
||||
if (c) {
|
||||
if (parsed_uri->server != NULL) {
|
||||
c->upstream_default.server = strdup(parsed_uri->server);
|
||||
if (parsed_uri->port == 0) {
|
||||
c->upstream_default.port = 80;
|
||||
} else {
|
||||
c->upstream_default.port = parsed_uri->port;
|
||||
}
|
||||
} else {
|
||||
c->upstream_default.server = (char *)xmlCharStrdup (master);
|
||||
c->upstream_default.port = port;
|
||||
}
|
||||
|
||||
r->mount = strdup(parsed_uri->path);
|
||||
r->localmount = strdup(parsed_uri->path);
|
||||
r->mp3metadata = 1;
|
||||
r->on_demand = on_demand;
|
||||
r->next = new_relays;
|
||||
ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", r->server, r->port, r->mount);
|
||||
new_relays = r;
|
||||
c->upstream_default.mount = strdup(parsed_uri->path);
|
||||
c->localmount = strdup(parsed_uri->path);
|
||||
c->upstream_default.mp3metadata = 1;
|
||||
c->on_demand = on_demand;
|
||||
ICECAST_LOG_DEBUG("Added relay host=\"%s\", port=%d, mount=\"%s\"", c->upstream_default.server, c->upstream_default.port, c->upstream_default.mount);
|
||||
}
|
||||
xmlFreeURI(parsed_uri);
|
||||
}
|
||||
sock_close (mastersock);
|
||||
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
cleanup_relays = update_relays (&global.master_relays, new_relays);
|
||||
cleanup_relays = update_relays (&global.master_relays, new_relays, new_relays_length);
|
||||
|
||||
relay_check_streams (global.master_relays, cleanup_relays, 0);
|
||||
relay_check_streams (NULL, new_relays, 0);
|
||||
|
||||
for (i = 0; i < new_relays_length; i++) {
|
||||
relay_config_free(new_relays[i]);
|
||||
}
|
||||
free(new_relays);
|
||||
|
||||
thread_mutex_unlock (&(config_locks()->relay_lock));
|
||||
|
||||
@ -742,7 +883,7 @@ static void *_slave_thread(void *arg)
|
||||
|
||||
while (1)
|
||||
{
|
||||
relay_server *cleanup_relays = NULL;
|
||||
relay_t *cleanup_relays = NULL;
|
||||
int skip_timer = 0;
|
||||
|
||||
/* re-read xml file if requested */
|
||||
@ -778,7 +919,7 @@ static void *_slave_thread(void *arg)
|
||||
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
|
||||
cleanup_relays = update_relays (&global.relays, config->relay);
|
||||
cleanup_relays = update_relays(&global.relays, config->relay, config->relay_length);
|
||||
|
||||
config_release_config();
|
||||
}
|
||||
|
22
src/slave.h
22
src/slave.h
@ -15,29 +15,13 @@
|
||||
|
||||
#include "common/thread/thread.h"
|
||||
#include "icecasttypes.h"
|
||||
|
||||
struct _relay_server {
|
||||
char *server;
|
||||
int port;
|
||||
char *mount;
|
||||
char *username;
|
||||
char *password;
|
||||
char *localmount;
|
||||
char *bind;
|
||||
source_t *source;
|
||||
int mp3metadata;
|
||||
int on_demand;
|
||||
int running;
|
||||
int cleanup;
|
||||
time_t start;
|
||||
thread_type *thread;
|
||||
relay_server *next;
|
||||
};
|
||||
#include "cfgfile.h"
|
||||
|
||||
void slave_initialize(void);
|
||||
void slave_shutdown(void);
|
||||
void slave_update_all_mounts (void);
|
||||
void slave_rebuild_mounts (void);
|
||||
relay_server *relay_free (relay_server *relay);
|
||||
void relay_config_free (relay_config_t *relay);
|
||||
relay_t *relay_free (relay_t *relay);
|
||||
|
||||
#endif /* __SLAVE_H__ */
|
||||
|
Loading…
x
Reference in New Issue
Block a user