mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2024-11-03 04:17:17 -05:00
slave update. Make relay threads attached. Allow for rescanning the current
relays so that master server queries are not triggered when not required. Each relay is started independently now. some comment and variable cleanups svn path=/icecast/branches/kh/icecast/; revision=7431
This commit is contained in:
parent
1a5effb4bd
commit
6aa93abc03
218
src/slave.c
218
src/slave.c
@ -62,7 +62,8 @@
|
||||
static void *_slave_thread(void *arg);
|
||||
thread_type *_slave_thread_id;
|
||||
static int slave_running = 0;
|
||||
static unsigned max_interval = 0;
|
||||
static unsigned int max_interval = 0;
|
||||
static int rescan_relays = 0;
|
||||
|
||||
relay_server *relay_free (relay_server *relay)
|
||||
{
|
||||
@ -99,28 +100,21 @@ relay_server *relay_copy (relay_server *r)
|
||||
}
|
||||
|
||||
|
||||
static void *_relay_thread (void *arg)
|
||||
{
|
||||
relay_server *relay = arg;
|
||||
|
||||
relay->running = 1;
|
||||
stats_event_inc(NULL, "source_relay_connections");
|
||||
|
||||
source_main (relay->source);
|
||||
|
||||
relay->running = 0;
|
||||
if (relay->cleanup)
|
||||
relay_free (relay);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* force a recheck of the relays. This will recheck the master server if
|
||||
* a this is a slave.
|
||||
*/
|
||||
void slave_recheck (void)
|
||||
{
|
||||
max_interval = 0;
|
||||
}
|
||||
|
||||
/* rescan the current relays to see if any need starting or if any
|
||||
* relay threads have terminated
|
||||
*/
|
||||
void slave_rescan (void)
|
||||
{
|
||||
rescan_relays = 1;
|
||||
}
|
||||
|
||||
void slave_initialize(void)
|
||||
{
|
||||
@ -134,22 +128,10 @@ void slave_initialize(void)
|
||||
|
||||
void slave_shutdown(void)
|
||||
{
|
||||
relay_server *relay;
|
||||
|
||||
if (!slave_running)
|
||||
return;
|
||||
slave_running = 0;
|
||||
thread_join (_slave_thread_id);
|
||||
|
||||
relay = global.relays;
|
||||
while (relay)
|
||||
relay = relay_free (relay);
|
||||
global.relays = NULL;
|
||||
|
||||
relay = global.master_relays;
|
||||
while (relay)
|
||||
relay = relay_free (relay);
|
||||
global.master_relays = NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -198,23 +180,22 @@ int slave_redirect (char *mountpoint, client_t *client)
|
||||
/* This does the actual connection for a relay. A thread is
|
||||
* started off if a connection can be acquired
|
||||
*/
|
||||
static void start_relay_stream (relay_server *relay)
|
||||
static void *start_relay_stream (void *arg)
|
||||
{
|
||||
relay_server *relay = arg;
|
||||
sock_t streamsock = SOCK_ERROR;
|
||||
source_t *src = relay->source;
|
||||
http_parser_t *parser = NULL;
|
||||
connection_t *con=NULL;
|
||||
char header[4096];
|
||||
|
||||
if (relay->on_demand && src->on_demand_req == 0)
|
||||
return;
|
||||
|
||||
relay->running = 1;
|
||||
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
|
||||
do
|
||||
{
|
||||
char *auth_header;
|
||||
|
||||
streamsock = sock_connect_wto (relay->server, relay->port, 30);
|
||||
streamsock = sock_connect_wto (relay->server, relay->port, 10);
|
||||
if (streamsock == SOCK_ERROR)
|
||||
{
|
||||
WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
|
||||
@ -279,9 +260,15 @@ static void start_relay_stream (relay_server *relay)
|
||||
DEBUG0("Failed to complete source initialisation");
|
||||
break;
|
||||
}
|
||||
thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
|
||||
stats_event_inc(NULL, "source_relay_connections");
|
||||
|
||||
return;
|
||||
source_main (relay->source);
|
||||
|
||||
/* initiate an immediate relay cleanup run */
|
||||
relay->cleanup = 1;
|
||||
slave_rescan();
|
||||
|
||||
return NULL;
|
||||
} while (0);
|
||||
|
||||
if (con == NULL && streamsock != SOCK_ERROR)
|
||||
@ -293,6 +280,12 @@ static void start_relay_stream (relay_server *relay)
|
||||
httpp_destroy (parser);
|
||||
src->parser = NULL;
|
||||
source_clear_source (relay->source);
|
||||
|
||||
/* initiate an immediate relay cleanup run */
|
||||
relay->cleanup = 1;
|
||||
slave_rescan();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -302,18 +295,37 @@ static void check_relay_stream (relay_server *relay)
|
||||
if (relay->source == NULL)
|
||||
{
|
||||
/* new relay, reserve the name */
|
||||
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
|
||||
relay->source = source_reserve (relay->localmount);
|
||||
if (relay->source)
|
||||
{
|
||||
DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
|
||||
if (relay->on_demand)
|
||||
DEBUG0 ("setting on_demand");
|
||||
relay->source->on_demand = relay->on_demand;
|
||||
}
|
||||
else
|
||||
WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
|
||||
}
|
||||
if (relay->source && !relay->running)
|
||||
do
|
||||
{
|
||||
start_relay_stream (relay);
|
||||
if (relay->source == NULL || relay->running)
|
||||
break;
|
||||
if (relay->on_demand && relay->source->on_demand_req == 0)
|
||||
break;
|
||||
|
||||
relay->thread = thread_create ("Relay Thread", start_relay_stream,
|
||||
relay, THREAD_ATTACHED);
|
||||
return;
|
||||
|
||||
} while (0);
|
||||
/* the relay thread may of close down */
|
||||
if (relay->cleanup && relay->thread)
|
||||
{
|
||||
DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
|
||||
thread_join (relay->thread);
|
||||
relay->thread = NULL;
|
||||
relay->cleanup = 0;
|
||||
relay->running = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -336,6 +348,7 @@ update_relay_set (relay_server **current, relay_server *updated)
|
||||
|
||||
while (existing_relay)
|
||||
{
|
||||
/* break out if keeping relay */
|
||||
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
|
||||
break;
|
||||
existing_p = &existing_relay->next;
|
||||
@ -360,33 +373,44 @@ update_relay_set (relay_server **current, relay_server *updated)
|
||||
|
||||
/* update the relay_list with entries from new_relay_list. Any new relays
|
||||
* are added to the list, and any not listed in the provided new_relay_list
|
||||
* get marked for shutting down, just in case they are not shutting down by
|
||||
* themselves
|
||||
* are separated an returned in a separate list
|
||||
*/
|
||||
static void
|
||||
static relay_server *
|
||||
update_relays (relay_server **relay_list, relay_server *new_relay_list)
|
||||
{
|
||||
relay_server *relay, *current;
|
||||
relay_server *active_relays, *cleanup_relays;
|
||||
|
||||
current = update_relay_set (relay_list, new_relay_list);
|
||||
active_relays = update_relay_set (relay_list, new_relay_list);
|
||||
|
||||
/* ok whats left, lets make sure they shut down */
|
||||
relay = *relay_list;
|
||||
cleanup_relays = *relay_list;
|
||||
/* re-assign new set */
|
||||
*relay_list = active_relays;
|
||||
|
||||
return cleanup_relays;
|
||||
}
|
||||
|
||||
|
||||
static void relay_check_streams (relay_server *to_start, relay_server *to_free)
|
||||
{
|
||||
relay_server *relay;
|
||||
|
||||
while (to_free)
|
||||
{
|
||||
if (to_free->running && to_free->source)
|
||||
{
|
||||
DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
|
||||
to_free->source->running = 0;
|
||||
thread_join (to_free->thread);
|
||||
}
|
||||
to_free = relay_free (to_free);
|
||||
}
|
||||
|
||||
relay = to_start;
|
||||
while (relay)
|
||||
{
|
||||
relay->cleanup = 1;
|
||||
if (relay->source)
|
||||
{
|
||||
if (relay->source->running)
|
||||
DEBUG1 ("requested %s to shut down", relay->source->mount);
|
||||
relay->source->running = 0;
|
||||
relay = relay->next;
|
||||
}
|
||||
else
|
||||
relay = relay_free (relay);
|
||||
check_relay_stream (relay);
|
||||
relay = relay->next;
|
||||
}
|
||||
/* re-assign new set */
|
||||
*relay_list = current;
|
||||
}
|
||||
|
||||
|
||||
@ -400,7 +424,7 @@ static int update_from_master(ice_config_t *config)
|
||||
do
|
||||
{
|
||||
char *authheader, *data;
|
||||
relay_server *relays = NULL, *relay;
|
||||
relay_server *new_relays = NULL, *cleanup_relays;
|
||||
int len, count = 1;
|
||||
int on_demand, send_auth;
|
||||
|
||||
@ -474,23 +498,20 @@ static int update_from_master(ice_config_t *config)
|
||||
r->username = xmlStrdup (username);
|
||||
r->password = xmlStrdup (password);
|
||||
}
|
||||
r->next = relays;
|
||||
relays = r;
|
||||
r->next = new_relays;
|
||||
new_relays = r;
|
||||
}
|
||||
}
|
||||
sock_close (mastersock);
|
||||
|
||||
update_relays (&global.master_relays, relays);
|
||||
/* start any inactive relays */
|
||||
relay = global.master_relays;
|
||||
while (relay)
|
||||
{
|
||||
check_relay_stream (relay);
|
||||
relay = relay->next;
|
||||
}
|
||||
relay = relays;
|
||||
while (relay)
|
||||
relay = relay_free (relay);
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
cleanup_relays = update_relays (&global.master_relays, new_relays);
|
||||
|
||||
relay_check_streams (global.master_relays, cleanup_relays);
|
||||
relay_check_streams (NULL, new_relays);
|
||||
|
||||
thread_mutex_unlock (&(config_locks()->relay_lock));
|
||||
|
||||
} while(0);
|
||||
|
||||
if (master)
|
||||
@ -507,39 +528,50 @@ static int update_from_master(ice_config_t *config)
|
||||
static void *_slave_thread(void *arg)
|
||||
{
|
||||
ice_config_t *config;
|
||||
relay_server *relay;
|
||||
unsigned interval = 0;
|
||||
unsigned int interval = 0;
|
||||
|
||||
while (slave_running)
|
||||
{
|
||||
relay_server *cleanup_relays;
|
||||
|
||||
thread_sleep (1000000);
|
||||
if (max_interval > ++interval)
|
||||
if (rescan_relays == 0 && max_interval > ++interval)
|
||||
continue;
|
||||
|
||||
interval = 0;
|
||||
config = config_get_config();
|
||||
|
||||
max_interval = config->master_update_interval;
|
||||
|
||||
/* the connection could time some time, so the lock can drop */
|
||||
if (update_from_master (config))
|
||||
/* only update relays lists when required */
|
||||
if (max_interval <= interval)
|
||||
{
|
||||
config = config_get_config();
|
||||
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
interval = 0;
|
||||
max_interval = config->master_update_interval;
|
||||
|
||||
update_relays (&global.relays, config->relay);
|
||||
/* the connection could take some time, so the lock can drop */
|
||||
if (update_from_master (config))
|
||||
config = config_get_config();
|
||||
|
||||
config_release_config();
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
|
||||
/* start any inactive relays */
|
||||
relay = global.relays;
|
||||
while (relay)
|
||||
{
|
||||
check_relay_stream (relay);
|
||||
relay = relay->next;
|
||||
cleanup_relays = update_relays (&global.relays, config->relay);
|
||||
|
||||
config_release_config();
|
||||
|
||||
relay_check_streams (global.relays, cleanup_relays);
|
||||
thread_mutex_unlock (&(config_locks()->relay_lock));
|
||||
}
|
||||
thread_mutex_unlock (&(config_locks()->relay_lock));
|
||||
else
|
||||
{
|
||||
thread_mutex_lock (&(config_locks()->relay_lock));
|
||||
relay_check_streams (global.master_relays, NULL);
|
||||
relay_check_streams (global.relays, NULL);
|
||||
thread_mutex_unlock (&(config_locks()->relay_lock));
|
||||
}
|
||||
rescan_relays = 0;
|
||||
}
|
||||
DEBUG0 ("shutting down current relays");
|
||||
relay_check_streams (NULL, global.relays);
|
||||
relay_check_streams (NULL, global.master_relays);
|
||||
|
||||
INFO0 ("Slave thread shutdown complete");
|
||||
|
||||
return NULL;
|
||||
|
@ -14,6 +14,7 @@
|
||||
#define __SLAVE_H__
|
||||
|
||||
#include <client.h>
|
||||
#include <thread/thread.h>
|
||||
|
||||
typedef struct _relay_server {
|
||||
char *server;
|
||||
@ -27,6 +28,7 @@ typedef struct _relay_server {
|
||||
int on_demand;
|
||||
int running;
|
||||
int cleanup;
|
||||
thread_type *thread;
|
||||
struct _relay_server *next;
|
||||
} relay_server;
|
||||
|
||||
@ -40,6 +42,7 @@ typedef struct _slave_host
|
||||
void slave_initialize(void);
|
||||
void slave_shutdown(void);
|
||||
void slave_recheck (void);
|
||||
void slave_rescan (void);
|
||||
int slave_redirect (char *mountpoint, client_t *client);
|
||||
relay_server *relay_free (relay_server *relay);
|
||||
|
||||
|
@ -399,7 +399,7 @@ void source_move_clients (source_t *source, source_t *dest)
|
||||
if (dest->running == 0 && dest->on_demand)
|
||||
{
|
||||
dest->on_demand_req = 1;
|
||||
slave_recheck();
|
||||
slave_rescan();
|
||||
}
|
||||
thread_mutex_unlock (&dest->lock);
|
||||
thread_mutex_unlock (&move_clients_mutex);
|
||||
@ -820,7 +820,7 @@ void add_authenticated_client (source_t *source, client_t *client)
|
||||
/* enable on-demand relay to start, wake up the slave thread */
|
||||
DEBUG0("kicking off on-demand relay");
|
||||
source->on_demand_req = 1;
|
||||
slave_recheck();
|
||||
slave_rescan();
|
||||
}
|
||||
DEBUG1 ("Added client to pending on %s", source->mount);
|
||||
source->check_pending = 1;
|
||||
|
Loading…
Reference in New Issue
Block a user