1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-23 06:25:24 +00:00
icecast-server/src/yp.c
2022-03-22 15:35:31 +00:00

984 lines
28 KiB
C

/* Icecast
*
* This program is distributed under the GNU General Public License, version 2.
* A copy of this license is included with this source.
*
* Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
* Michael Smith <msmith@xiph.org>,
* oddsock <oddsock@xiph.org>,
* Karl Heyes <karl@xiph.org>
* and others (see AUTHORS for details).
* Copyright 2013-2022, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
*/
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <igloo/prng.h>
#include "common/thread/thread.h"
#include "yp.h"
#include "global.h"
#include "curl.h"
#include "logging.h"
#include "source.h"
#include "cfgfile.h"
#include "stats.h"
#include "listensocket.h"
#ifdef WIN32
#define snprintf _snprintf
#endif
#define CATMODULE "yp"
/* Selects which string field of a ypdata_t add_yp_info() replaces. */
typedef enum {
    YP_SERVER_NAME = 0,     /* ypdata_t.server_name */
    YP_SERVER_DESC,         /* ypdata_t.server_desc */
    YP_SERVER_GENRE,        /* ypdata_t.server_genre */
    YP_SERVER_URL,          /* ypdata_t.url */
    YP_BITRATE,             /* ypdata_t.bitrate */
    YP_AUDIO_INFO,          /* ypdata_t.audio_info */
    YP_SERVER_TYPE,         /* ypdata_t.server_type */
    YP_CURRENT_SONG,        /* ypdata_t.current_song */
    YP_CLUSTER_PASSWORD,    /* ypdata_t.cluster_password */
    YP_SUBTYPE,             /* ypdata_t.subtype */
    YP_AUDIO_SAMPLERATE,    /* ypdata_t.audio_samplerate */
    YP_AUDIO_CHANNELS       /* ypdata_t.audio_channels */
} yp_param_type_t;
/* One configured YP directory server and the mount entries announced to it.
 * Instances are linked into either the active_yps or the pending_yps list.
 */
struct yp_server {
char *url; /* directory URL; strdup'ed in yp_recheck_config(), freed in destroy_yp_server() */
char *server_id; /* copy of the configured server id; freed in destroy_yp_server() */
unsigned url_timeout; /* taken from the config; values outside 1..10 are replaced by 6 */
unsigned touch_interval; /* default touch interval handed to new entries; raised to at least 30 */
int remove; /* set by yp_recheck_config(); check_servers() destroys servers still marked */
char *listen_socket_id; /* pointer copied from the config entry (not duplicated here) */
CURL *curl; /* handle from icecast_curl_new(), used for every request to this server */
struct ypdata_tag *mounts, *pending_mounts; /* processed entries / entries queued by yp_add() */
struct yp_server *next; /* next server in the same list */
char curl_error[CURL_ERROR_SIZE]; /* buffer handed to icecast_curl_new(); logged when a request fails */
};
/* State of one mountpoint as announced to one YP server. */
typedef struct ypdata_tag {
int remove; /* non-zero: delete_marked_yp() unlinks and frees this entry */
int release; /* set by yp_remove(); process_ypdata() then switches to do_yp_remove */
int cmd_ok; /* set by the header callback when "YPResponse: 1" is seen */
char *sid; /* value of the "SID: " header returned for an add request */
char *mount; /* mountpoint name this entry belongs to */
char *listen_url; /* URL-escaped listen URL built in create_yp_entry() */
char *cluster_password; /* from the mount configuration, URL-escaped */
char *current_song; /* do_yp_touch: artist & title */
/* ---[ From stats ]--- */
char *server_type; /* do_yp_add: server_type */
char *server_name; /* do_yp_add: server_name */
char *url; /* do_yp_add: server_url */
char *server_genre; /* do_yp_add: genre */
char *bitrate; /* do_yp_add: audio_bitrate || ice-bitrate */
char *server_desc; /* do_yp_add: server_description */
char *audio_info; /* do_yp_add: audio_info */
char *subtype; /* update_yp_info: subtype */
char *audio_samplerate; /* update_yp_info: audio_samplerate */
char *audio_channels; /* update_yp_info: audio_channels */
/* ---[ END from stats ]--- */
struct yp_server *server; /* server this entry is sent to; dereferenced by send_to_yp() */
time_t next_update; /* process_ypdata() does nothing while the current time is before this */
unsigned touch_interval; /* seconds between touches; updated from the "TouchFreq: " header */
char *error_msg; /* value of the last "YPMessage: " header, or a fallback text */
int (*process)(struct ypdata_tag *yp, char *s, unsigned len); /* next request: do_yp_add, do_yp_touch or do_yp_remove */
struct ypdata_tag *next; /* next entry in the server's list */
} ypdata_t;
/* Guards the server and mount lists: taken for reading by the public entry
 * points and while requests are sent, taken for writing by the YP thread
 * while it restructures the lists. */
static rwlock_t yp_lock;
/* Serialises concurrent yp_add() callers, which all hold yp_lock for reading. */
static mutex_t yp_pending_lock;
/* Servers currently processed by the YP thread, and servers created by
 * yp_recheck_config() that check_servers() has not activated yet. */
static volatile struct yp_server *active_yps = NULL, *pending_yps = NULL;
/* Non-zero asks the YP thread to run check_servers() and merge/purge entries. */
static volatile int yp_update = 0;
/* Set by the YP thread on start, cleared by yp_shutdown() to stop it. */
static int yp_running;
/* Timestamp refreshed by yp_process_server() before each entry is handled. */
static time_t now;
static thread_type *yp_thread;
/* Copies of config->client_limit and config->server_id made in yp_recheck_config(). */
static volatile unsigned client_limit = 0;
static volatile char *server_version = NULL;
static void *yp_update_thread(void *arg);
static void update_yp_info(ypdata_t *yp);
static void add_yp_info(ypdata_t *yp, void *info, yp_param_type_t type);
static int do_yp_remove(ypdata_t *yp, char *s, unsigned len);
static int do_yp_add(ypdata_t *yp, char *s, unsigned len);
static int do_yp_touch(ypdata_t *yp, char *s, unsigned len);
static void add_pending_yp (struct yp_server *server);
static void delete_marked_yp(struct yp_server *server);
static void yp_destroy_ypdata(ypdata_t *ypdata);
/* Return non-zero when the header line of `len` bytes begins with `prefix`
 * (case-insensitive). The length test keeps the comparison inside the buffer,
 * which libcurl does not NUL-terminate.
 */
static int yp_header_has_prefix (const char *line, size_t len, const char *prefix)
{
    size_t prefix_len = strlen(prefix);

    return len >= prefix_len && strncasecmp(line, prefix, prefix_len) == 0;
}

/* Duplicate the header value starting `skip` bytes into `line`, stopping at
 * the first CR, LF or NUL, or at the end of the `len` byte buffer.
 * The caller guarantees len >= skip.
 * Returns a NUL-terminated heap string (possibly empty), or NULL when the
 * allocation fails.
 */
static char *yp_header_value_dup (const char *line, size_t len, size_t skip)
{
    const char *value = line + skip;
    size_t avail = len - skip;
    size_t used = 0;
    char *copy;

    while (used < avail && value[used] != '\r' && value[used] != '\n' && value[used] != '\0')
        used++;

    copy = malloc(used + 1);
    if (copy == NULL)
        return NULL;
    memcpy(copy, value, used);
    copy[used] = '\0';
    return copy;
}

/* curl callback used to parse headers coming back from the YP server.
 *
 * ptr/size/nmemb describe one header line (not NUL-terminated); stream is the
 * ypdata_t registered for the request in send_to_yp().
 * Recognised headers:
 *   YPResponse: 1  -> marks the command as accepted (cmd_ok)
 *   YPMessage: ... -> stored as error_msg
 *   SID: ...       -> stored as sid, only while an add request is in flight
 *   TouchFreq: n   -> touch interval in seconds, never less than 30
 * Always returns the number of bytes received so curl continues the transfer.
 */
static size_t handle_returned_header (void *ptr, size_t size, size_t nmemb, void *stream)
{
    ypdata_t *yp = stream;
    const char *line = ptr;
    size_t bytes = size * nmemb;

    /* the raw header bytes are also handed to the igloo PRNG */
    igloo_prng_write(igloo_instance, ptr, bytes, -1, igloo_PRNG_FLAG_NONE);

    /* ICECAST_LOG_DEBUG("header from YP is \"%.*s\"", bytes, ptr); */
    if (yp_header_has_prefix(line, bytes, "YPResponse: 1"))
        yp->cmd_ok = 1;

    if (yp_header_has_prefix(line, bytes, "YPMessage: ")) {
        free(yp->error_msg);
        yp->error_msg = yp_header_value_dup(line, bytes, 11);
    }

    if (yp->process == do_yp_add && yp_header_has_prefix(line, bytes, "SID: ")) {
        free(yp->sid);
        yp->sid = yp_header_value_dup(line, bytes, 5);
    }

    if (yp_header_has_prefix(line, bytes, "TouchFreq: ")) {
        char digits[32];
        size_t count = bytes - 11;
        unsigned secs;

        /* parse from a bounded, terminated copy instead of the raw buffer */
        if (count >= sizeof(digits))
            count = sizeof(digits) - 1;
        memcpy(digits, line + 11, count);
        digits[count] = '\0';

        if (sscanf(digits, "%u", &secs) != 1)
            secs = 0;
        if (secs < 30)
            secs = 30;
        ICECAST_LOG_DEBUG("server touch interval is %u", secs);
        yp->touch_interval = secs;
    }
    return bytes;
}
/* Look up a YP server by URL: the active list is searched first, then the
 * pending list. Returns the matching server or NULL when neither list has it.
 */
static struct yp_server *find_yp_server (const char *url)
{
    struct yp_server *candidate;

    for (candidate = (struct yp_server *)active_yps; candidate != NULL; candidate = candidate->next) {
        if (strcmp(candidate->url, url) == 0)
            return candidate;
    }

    for (candidate = (struct yp_server *)pending_yps; candidate != NULL; candidate = candidate->next) {
        if (strcmp(candidate->url, url) == 0)
            return candidate;
    }

    return NULL;
}
/* Free a YP server together with every mount entry attached to it.
 * A NULL server is ignored. The entries are released locally; no request is
 * sent to the directory from here.
 */
static void destroy_yp_server (struct yp_server *server)
{
    ypdata_t *entry;

    if (server == NULL)
        return;

    ICECAST_LOG_DEBUG("Removing YP server entry for %s", server->url);

    /* 1. merge the pending entries into the main queue,
     * 2. flag every entry in the main queue for deletion,
     * 3. drop all flagged entries.
     */
    add_pending_yp(server);
    for (entry = server->mounts; entry != NULL; entry = entry->next)
        entry->remove = 1;
    delete_marked_yp(server);

    icecast_curl_free(server->curl);

    if (server->mounts)
        ICECAST_LOG_WARN("active ypdata not freed");
    if (server->pending_mounts)
        ICECAST_LOG_WARN("pending ypdata not freed");

    free(server->url);
    free(server->server_id);
    free(server);
}
/* Return the first entry in the list starting at `mounts` whose mountpoint
 * equals `mount`, or NULL when there is none.
 */
static ypdata_t *find_yp_mount (ypdata_t *mounts, const char *mount)
{
    ypdata_t *entry;

    for (entry = mounts; entry != NULL; entry = entry->next) {
        if (strcmp(entry->mount, mount) == 0)
            return entry;
    }
    return NULL;
}
/* Bring the YP server lists in line with the directories listed in `config`.
 *
 * Every active server is first marked for removal; servers still named in the
 * configuration are unmarked again, and directories that are not known yet get
 * a new server object queued on pending_yps. The YP thread applies the changes
 * once yp_update is set. client_limit and server_version are refreshed from
 * the configuration as well.
 *
 * NOTE(review): the lists are modified while yp_lock is only held for reading;
 * this presumably relies on callers being serialised elsewhere - confirm.
 * NOTE(review): listen_socket_id is stored as the config's own pointer, not a
 * copy; verify that it cannot outlive the configuration it came from.
 */
void yp_recheck_config (ice_config_t *config)
{
    struct yp_server *server;

    ICECAST_LOG_DEBUG("Updating YP configuration");
    thread_rwlock_rlock(&yp_lock);

    /* assume each active server is gone until the config names it again */
    for (server = (struct yp_server *)active_yps; server != NULL; server = server->next)
        server->remove = 1;

    client_limit = config->client_limit;
    free((char*)server_version);
    server_version = strdup(config->server_id);

    /* for each yp url in config, check to see if one exists
       if not, then add it. */
    for (yp_directory_t *yp = config->yp_directories; yp; yp = yp->next) {
        server = find_yp_server(yp->url);
        if (server != NULL) {
            server->remove = 0;
            continue;
        }

        server = calloc(1, sizeof(*server));
        if (server == NULL)
            break;

        if (server_version != NULL)
            server->server_id = strdup((char *)server_version);
        server->url = strdup(yp->url);
        if (server->url == NULL) {
            /* the URL is compared with strcmp() later on, so it must exist */
            free(server->server_id);
            free(server);
            break;
        }
        server->url_timeout = yp->timeout;
        server->touch_interval = yp->touch_interval;
        server->listen_socket_id = yp->listen_socket_id;

        server->curl = icecast_curl_new(server->url, &(server->curl_error[0]));
        if (server->curl == NULL) {
            destroy_yp_server(server);
            break;
        }

        if (server->url_timeout > 10 || server->url_timeout < 1)
            server->url_timeout = 6;
        if (server->touch_interval < 30)
            server->touch_interval = 30;

        curl_easy_setopt(server->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
        /* apply the configured timeout; curl expects a long for this option */
        curl_easy_setopt(server->curl, CURLOPT_TIMEOUT, (long)server->url_timeout);

        server->next = (struct yp_server *)pending_yps;
        pending_yps = server;
        ICECAST_LOG_INFO("Adding new YP server \"%s\" (timeout %ds, default interval %ds)",
                server->url, server->url_timeout, server->touch_interval);
    }
    thread_rwlock_unlock(&yp_lock);

    /* tell the YP thread that the server lists need attention */
    thread_rwlock_wlock(&yp_lock);
    yp_update = 1;
    thread_rwlock_unlock(&yp_lock);
}
/* Set up the YP subsystem: create the locks, load the configured directory
 * servers and start the thread that talks to them.
 */
void yp_initialize(void)
{
    ice_config_t *cfg;

    thread_rwlock_create(&yp_lock);
    thread_mutex_create(&yp_pending_lock);

    /* the configuration is held only while the server list is built */
    cfg = config_get_config();
    yp_recheck_config(cfg);
    config_release_config();

    yp_thread = thread_create("YP Touch Thread", yp_update_thread,
            (void *)NULL, THREAD_ATTACHED);
}
/* Send one prepared request to the YP server of `yp` and evaluate the reply.
 *
 * cmd  - name of the action, used for log messages only
 * yp   - entry the request is about; also receives the parsed response headers
 * post - POST body to send
 *
 * Returns 0 for ok, -1 when this entry failed, -2 when the server could not
 * be reached. In both failure cases next_update is pushed back and process is
 * reset to do_yp_add; for -1 the session id is dropped too.
 */
static int send_to_yp (const char *cmd, ypdata_t *yp, char *post)
{
    struct yp_server *server = yp->server;
    CURLcode result;

    /* ICECAST_LOG_DEBUG("send YP (%s):%s", cmd, post); */
    yp->cmd_ok = 0;
    /* forget the message of an earlier response so that it cannot be
     * reported as the reason for this request failing */
    free(yp->error_msg);
    yp->error_msg = NULL;

    curl_easy_setopt(server->curl, CURLOPT_POSTFIELDS, post);
    curl_easy_setopt(server->curl, CURLOPT_HEADERDATA, yp);
    result = curl_easy_perform(server->curl);
    if (result != CURLE_OK) {
        yp->process = do_yp_add;
        yp->next_update = now + 1200;
        ICECAST_LOG_ERROR("connection to %s failed with \"%s\"", server->url, server->curl_error);
        return -2;
    }

    if (yp->cmd_ok != 0) {
        ICECAST_LOG_DEBUG("YP %s at %s succeeded", cmd, server->url);
        return 0;
    }

    if (yp->error_msg == NULL)
        yp->error_msg = strdup("no response from server");

    if (yp->process == do_yp_add) {
        ICECAST_LOG_ERROR("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
        yp->next_update = now + 7200;
    }
    if (yp->process == do_yp_touch) {
        /* At this point the touch request failed, either because they rejected our session
         * or the server isn't accessible. This means we have to wait before doing another
         * add request. We have a minimum delay but we could allow the directory server to
         * give us a wait time using the TouchFreq header. This time could be given in such
         * cases as a firewall block or incorrect listenurl.
         */
        if (yp->touch_interval < 1200)
            yp->next_update = now + 1200;
        else
            yp->next_update = now + yp->touch_interval;
        ICECAST_LOG_INFO("YP %s on %s failed: %s", cmd, server->url, yp->error_msg);
    }

    yp->process = do_yp_add;
    free(yp->sid);
    yp->sid = NULL;
    return -1;
}
/* routines for building and issuing requests to the YP server */

/* Build and send the "remove" request for `yp`, if it has a session id.
 * s/len is the scratch buffer for the POST body. Returns the number of bytes
 * the buffer must have when it is too small (positive), otherwise the result
 * of send_to_yp(), or 0 when there was no session to remove. Unless the
 * buffer was too small the entry is flagged for deletion afterwards.
 */
static int do_yp_remove (ypdata_t *yp, char *s, unsigned len)
{
    int result = 0;

    if (yp->sid != NULL) {
        int needed = snprintf(s, len, "action=remove&sid=%s", yp->sid);

        if (needed >= (signed)len)
            return needed + 1;

        ICECAST_LOG_INFO("clearing up YP entry for %s", yp->mount);
        result = send_to_yp("remove", yp, s);
        free(yp->sid);
        yp->sid = NULL;
    }

    yp->remove = 1;
    yp->process = do_yp_add;
    yp_update = 1;

    return result;
}
static int do_yp_add (ypdata_t *yp, char *s, unsigned len)
{
int ret;
char *value;
ice_config_t *config;
char *admin;
config = config_get_config();
admin = util_url_escape(config->admin);
config_release_config();
value = stats_get_value(yp->mount, "server_type");
add_yp_info(yp, value, YP_SERVER_TYPE);
free(value);
value = stats_get_value(yp->mount, "server_name");
add_yp_info(yp, value, YP_SERVER_NAME);
free(value);
value = stats_get_value(yp->mount, "server_url");
add_yp_info(yp, value, YP_SERVER_URL);
free(value);
value = stats_get_value(yp->mount, "genre");
add_yp_info(yp, value, YP_SERVER_GENRE);
free(value);
value = stats_get_value(yp->mount, "audio_bitrate");
if (value == NULL)
value = stats_get_value(yp->mount, "ice-bitrate");
add_yp_info(yp, value, YP_BITRATE);
free(value);
value = stats_get_value(yp->mount, "server_description");
add_yp_info(yp, value, YP_SERVER_DESC);
free(value);
value = stats_get_value(yp->mount, "audio_info");
add_yp_info(yp, value, YP_AUDIO_INFO);
free(value);
update_yp_info(yp);
ret = snprintf(s, len, "action=add&admin=%s&sn=%s&genre=%s&cpswd=%s&desc="
"%s&url=%s&listenurl=%s&type=%s&stype=%s&b=%s&samplerate=%s&channels=%s&audioinfo=%s",
admin,
yp->server_name, yp->server_genre, yp->cluster_password,
yp->server_desc, yp->url, yp->listen_url,
yp->server_type, yp->subtype, yp->bitrate,
yp->audio_samplerate, yp->audio_channels, yp->audio_info);
free(admin);
if (ret >= (signed)len)
return ret+1;
ret = send_to_yp("add", yp, s);
if (ret == 0) {
yp->process = do_yp_touch;
/* force first touch in 5 secs */
yp->next_update = time(NULL) + 5;
}
return ret;
}
/* Build and send the "touch" request for `yp`.
 * Refreshes the current song ("artist - title", or just the title when no
 * artist is known) and the listener counts from the stats. s/len is the
 * scratch buffer for the POST body. Returns the required buffer size when the
 * body does not fit (positive), 0 on success and -1 when the request failed;
 * on success the next touch is scheduled touch_interval seconds from `now`.
 */
static int do_yp_touch (ypdata_t *yp, char *s, unsigned len)
{
    unsigned listeners = 0, max_listeners = 1;
    char *val, *artist, *title;
    int ret;

    artist = (char *)stats_get_value(yp->mount, "artist");
    title = (char *)stats_get_value(yp->mount, "title");
    if (artist || title) {
        /* use constant placeholders for a missing part, so no allocation
         * can fail before the lengths are computed */
        const char *artist_text = artist ? artist : "";
        const char *title_text = title ? title : "";
        const char *separator = artist ? " - " : "";
        size_t song_size = strlen(artist_text) + strlen(separator) + strlen(title_text) + 1;
        char *song = malloc(song_size);

        if (song) {
            snprintf(song, song_size, "%s%s%s", artist_text, separator, title_text);
            add_yp_info(yp, song, YP_CURRENT_SONG);
            stats_event(yp->mount, "yp_currently_playing", song);
            free(song);
        }
    }
    free(artist);
    free(title);

    val = (char *)stats_get_value(yp->mount, "listeners");
    if (val) {
        listeners = atoi(val);
        free(val);
    }

    /* a missing, "unlimited" or negative per-mount limit means the global
     * client limit applies */
    val = stats_get_value(yp->mount, "max_listeners");
    if (val == NULL || strcmp(val, "unlimited") == 0 || atoi(val) < 0) {
        max_listeners = client_limit;
    } else {
        max_listeners = atoi(val);
    }
    free(val);

    update_yp_info(yp);

    ret = snprintf(s, len, "action=touch&sid=%s&st=%s"
            "&listeners=%u&max_listeners=%u&stype=%s&samplerate=%s&channels=%s",
            yp->sid, yp->current_song, listeners, max_listeners, yp->subtype,
            yp->audio_samplerate, yp->audio_channels);
    if (ret >= (signed)len)
        return ret + 1; /* space required for above text and nul */

    if (send_to_yp("touch", yp, s) == 0) {
        yp->next_update = now + yp->touch_interval;
        return 0;
    }
    return -1;
}
/* Run the pending request of `yp` if it is due.
 * Returns 0 when nothing was due, when the request succeeded or when the
 * scratch buffer could not be allocated; otherwise the negative result of the
 * request handler (-1 entry failed, -2 server unreachable).
 * `server` is not used here.
 */
static int process_ypdata(struct yp_server *server, ypdata_t *yp)
{
    unsigned len = 1024;
    char *s = NULL, *tmp;

    if (now < yp->next_update)
        return 0;

    /* loop just in case the memory area isn't big enough */
    while (1) {
        int ret;

        if ((tmp = realloc(s, len)) == NULL) {
            /* realloc left the old buffer allocated; release it */
            free(s);
            return 0;
        }
        s = tmp;

        if (yp->release) {
            yp->process = do_yp_remove;
            yp->next_update = 0;
        }

        /* a positive result is the buffer size the handler needs */
        ret = yp->process(yp, s, len);
        if (ret <= 0) {
            free(s);
            return ret;
        }
        len = ret;
    }
    return 0;
}
/* Process every mount entry of one YP server. Once an entry reports that the
 * server cannot be contacted (-2), the server is assumed dead for this pass:
 * the remaining entries are not sent, they are reset to "add" and postponed
 * by 900 seconds.
 */
static void yp_process_server (struct yp_server *server)
{
    ypdata_t *entry;
    int state = 0;

    /* ICECAST_LOG_DEBUG("processing yp server %s", server->url); */
    for (entry = server->mounts; entry != NULL; entry = entry->next) {
        now = time(NULL);

        if (state != -2) {
            state = process_ypdata(server, entry);
            continue;
        }

        ICECAST_LOG_DEBUG("skipping %s on %s", entry->mount, server->url);
        entry->process = do_yp_add;
        entry->next_update += 900;
    }
}
/* Allocate and initialise the YP entry for `mount` on `server`.
 * All string fields start out as empty strings, the first action is "add",
 * the listen URL is derived from the server's listen socket (or the default
 * one) and the cluster password is taken from the mount configuration.
 * The caller still has to set yp->server and the touch interval.
 * Returns the new entry, or NULL when `server` is NULL, memory runs out or
 * the listen URL cannot be built.
 */
static ypdata_t *create_yp_entry (struct yp_server *server, const char *mount)
{
    listensocket_t *listen_socket = NULL;
    mount_proxy *mountproxy = NULL;
    ice_config_t *config;
    ypdata_t *yp;
    char *url = NULL;
    unsigned len = 512;
    ssize_t ret;

    if (!server)
        return NULL;

    yp = calloc(1, sizeof(*yp));
    if (yp == NULL)
        return NULL;

    yp->mount = strdup(mount);
    yp->server_name = strdup("");
    yp->server_desc = strdup("");
    yp->server_genre = strdup("");
    yp->bitrate = strdup("");
    yp->server_type = strdup("");
    yp->cluster_password = strdup("");
    yp->url = strdup("");
    yp->current_song = strdup("");
    yp->audio_info = strdup("");
    yp->subtype = strdup("");
    yp->audio_samplerate = strdup("");
    yp->audio_channels = strdup("");
    yp->process = do_yp_add;

    url = malloc(len);
    if (url == NULL)
        goto fail;

    if (server->listen_socket_id) {
        listen_socket = listensocket_container_get_by_id(global.listensockets,
                server->listen_socket_id);
        if (!listen_socket)
            ICECAST_LOG_ERROR("Failure to find listen socket with ID %#H, using default.",
                    server->listen_socket_id);
    } else {
        listen_socket = listensocket_container_get_default(global.listensockets);
        if (!listen_socket) {
            ICECAST_LOG_WARN("There is no listen socket defined for this YP nor is there a default. Listen URLs might be wrong.");
        }
    }

    ret = client_get_baseurl(NULL, listen_socket, url, len, NULL, NULL, NULL, mount, NULL);
    if (ret >= 0 && (size_t)ret >= len) {
        /* Buffer was too small: grow it and retry with the NEW size */
        unsigned needed = (unsigned)ret + 1;
        char *bigger = realloc(url, needed);

        if (!bigger) {
            refobject_unref(listen_socket);
            goto fail;
        }
        url = bigger;
        len = needed;
        ret = client_get_baseurl(NULL, listen_socket, url, len, NULL, NULL, NULL, mount, NULL);
    }
    refobject_unref(listen_socket);
    if (ret < 0 || (size_t)ret >= len)
        goto fail;

    config = config_get_config();
    mountproxy = config_find_mount(config, mount, MOUNT_TYPE_NORMAL);
    if (mountproxy && mountproxy->cluster_password)
        add_yp_info(yp, mountproxy->cluster_password, YP_CLUSTER_PASSWORD);
    config_release_config();

    yp->listen_url = util_url_escape(url);
    free(url);
    url = NULL;
    if (yp->listen_url == NULL)
        goto fail;

    return yp;

fail:
    /* url is NULL here when it has been released already */
    free(url);
    yp_destroy_ypdata(yp);
    return NULL;
}
/* Check for changes in the YP servers configured */
static void check_servers (void)
{
struct yp_server *server = (struct yp_server *)active_yps,
**server_p = (struct yp_server **)&active_yps;
while (server) {
if (server->remove) {
struct yp_server *to_go = server;
ICECAST_LOG_DEBUG("YP server \"%s\" removed", server->url);
*server_p = server->next;
server = server->next;
destroy_yp_server(to_go);
continue;
}
server_p = &server->next;
server = server->next;
}
/* add new server entries */
while (pending_yps) {
avl_node *node;
server = (struct yp_server *)pending_yps;
pending_yps = server->next;
ICECAST_LOG_DEBUG("Add pending yps %s", server->url);
server->next = (struct yp_server *)active_yps;
active_yps = server;
/* new YP server configured, need to populate with existing sources */
avl_tree_rlock(global.source_tree);
node = avl_get_first(global.source_tree);
while (node) {
ypdata_t *yp;
source_t *source = node->key;
if (source->yp_public && (yp = create_yp_entry (server, source->mount)) != NULL) {
ICECAST_LOG_DEBUG("Adding existing mount %s", source->mount);
yp->touch_interval = server->touch_interval;
yp->next = server->mounts;
server->mounts = yp;
}
node = avl_get_next (node);
}
avl_tree_unlock(global.source_tree);
}
}
/* Move the server's pending entries to the front of its main entry list.
 * Does nothing when no entries are pending.
 */
static void add_pending_yp (struct yp_server *server)
{
    ypdata_t *previous_head, *tail;
    unsigned moved = 1;

    if (server->pending_mounts == NULL)
        return;

    previous_head = server->mounts;
    server->mounts = server->pending_mounts;
    server->pending_mounts = NULL;

    /* walk to the last pending entry, counting the entries on the way */
    for (tail = server->mounts; tail->next != NULL; tail = tail->next)
        moved++;
    tail->next = previous_head;

    ICECAST_LOG_DEBUG("%u YP entries added to %s", moved, server->url);
}
static void delete_marked_yp(struct yp_server *server)
{
ypdata_t *yp = server->mounts, **prev = &server->mounts;
while (yp) {
if (yp->remove) {
ypdata_t *to_go = yp;
ICECAST_LOG_DEBUG("removed %s from YP server %s", yp->mount, server->url);
*prev = yp->next;
yp = yp->next;
yp_destroy_ypdata(to_go);
continue;
}
prev = &yp->next;
yp = yp->next;
}
}
static void *yp_update_thread(void *arg)
{
ICECAST_LOG_INFO("YP update thread started");
int running;
yp_running = 1;
running = 1;
while (running) {
struct yp_server *server;
thread_sleep (200000);
/* do the YP communication */
thread_rwlock_rlock (&yp_lock);
server = (struct yp_server *)active_yps;
while (server) {
/* ICECAST_LOG_DEBUG("trying %s", server->url); */
yp_process_server (server);
server = server->next;
}
/* update the local YP structure */
if (yp_update) {
thread_rwlock_unlock(&yp_lock);
thread_rwlock_wlock (&yp_lock);
check_servers();
server = (struct yp_server *)active_yps;
while (server) {
/* ICECAST_LOG_DEBUG("Checking yps %s", server->url); */
add_pending_yp (server);
delete_marked_yp (server);
server = server->next;
}
yp_update = 0;
}
running = yp_running;
thread_rwlock_unlock(&yp_lock);
}
thread_rwlock_destroy (&yp_lock);
thread_mutex_destroy (&yp_pending_lock);
/* free server and ypdata left */
while (active_yps) {
struct yp_server *server = (struct yp_server *)active_yps;
active_yps = server->next;
destroy_yp_server (server);
}
return NULL;
}
/* Free a YP entry and every string it owns. NULL is accepted and ignored. */
static void yp_destroy_ypdata(ypdata_t *ypdata)
{
    if (ypdata == NULL)
        return;

    {
        char *owned[] = {
            ypdata->mount,
            ypdata->url,
            ypdata->sid,
            ypdata->server_name,
            ypdata->server_desc,
            ypdata->server_genre,
            ypdata->cluster_password,
            ypdata->listen_url,
            ypdata->current_song,
            ypdata->bitrate,
            ypdata->server_type,
            ypdata->audio_info,
            ypdata->subtype,
            ypdata->audio_samplerate,
            ypdata->audio_channels,
            ypdata->error_msg
        };
        size_t i;

        for (i = 0; i < sizeof(owned) / sizeof(owned[0]); i++)
            free(owned[i]);
    }
    free(ypdata);
}
/* Refresh subtype, sample rate and channel count of `yp` from the stats of
 * its mount. A stat that is not available leaves the stored value as it is.
 */
static void update_yp_info(ypdata_t *yp)
{
    static const struct {
        const char *stat;
        yp_param_type_t type;
    } fields[] = {
        { "subtype",          YP_SUBTYPE },
        { "audio_samplerate", YP_AUDIO_SAMPLERATE },
        { "audio_channels",   YP_AUDIO_CHANNELS }
    };
    size_t i;

    for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
        char *value = stats_get_value(yp->mount, fields[i].stat);

        if (value == NULL)
            continue;
        add_yp_info(yp, value, fields[i].type);
        free(value);
    }
}
/* Replace the string field of `yp` selected by `type` with `info`, using
 * util_replace_string_url_escape(). A NULL `info` or an unknown `type`
 * changes nothing.
 */
static void add_yp_info(ypdata_t *yp, void *info, yp_param_type_t type)
{
    char **field = NULL;

    if (info == NULL)
        return;

    switch (type) {
        case YP_SERVER_NAME:      field = &(yp->server_name);      break;
        case YP_SERVER_DESC:      field = &(yp->server_desc);      break;
        case YP_SERVER_GENRE:     field = &(yp->server_genre);     break;
        case YP_SERVER_URL:       field = &(yp->url);              break;
        case YP_BITRATE:          field = &(yp->bitrate);          break;
        case YP_AUDIO_INFO:       field = &(yp->audio_info);       break;
        case YP_SERVER_TYPE:      field = &(yp->server_type);      break;
        case YP_CURRENT_SONG:     field = &(yp->current_song);     break;
        case YP_CLUSTER_PASSWORD: field = &(yp->cluster_password); break;
        case YP_SUBTYPE:          field = &(yp->subtype);          break;
        case YP_AUDIO_SAMPLERATE: field = &(yp->audio_samplerate); break;
        case YP_AUDIO_CHANNELS:   field = &(yp->audio_channels);   break;
    }

    if (field != NULL)
        util_replace_string_url_escape(field, info);
}
/* Add YP entries to active servers.
 * For every active server that has no entry for `mount` in its main list, a
 * new entry is created and queued on the server's pending list with its
 * first update 60 seconds from now; yp_update is raised so the YP thread
 * picks the entry up.
 */
void yp_add (const char *mount)
{
    struct yp_server *server;

    /* make sure YP thread is not modifying the lists */
    thread_rwlock_rlock(&yp_lock);
    /* make sure we don't race against another yp_add */
    thread_mutex_lock(&yp_pending_lock);

    for (server = (struct yp_server *)active_yps; server != NULL; server = server->next) {
        ypdata_t *entry;

        /* on-demand relays may already have a YP entry */
        if (find_yp_mount(server->mounts, mount) != NULL) {
            ICECAST_LOG_DEBUG("YP entry %s already exists", mount);
            continue;
        }

        /* add new ypdata to each servers pending yp */
        entry = create_yp_entry(server, mount);
        if (entry == NULL)
            continue;

        ICECAST_LOG_DEBUG("Adding %s to %s", mount, server->url);
        entry->server = server;
        entry->touch_interval = server->touch_interval;
        entry->next = server->pending_mounts;
        entry->next_update = time(NULL) + 60;
        server->pending_mounts = entry;
        yp_update = 1;
    }

    thread_mutex_unlock(&yp_pending_lock);
    thread_rwlock_unlock(&yp_lock);
}
/* Mark the entries of `mount` on every active server for release.
 * Entries that are already released or flagged for deletion are old and are
 * skipped. A released entry is due immediately (next_update = 0), so the YP
 * thread sends its remove request on the next pass.
 */
void yp_remove (const char *mount)
{
    struct yp_server *server;

    thread_rwlock_rlock(&yp_lock);

    /* read the list head only once the lock is held; the YP thread may
     * relink or free servers while it owns the write lock */
    for (server = (struct yp_server *)active_yps; server != NULL; server = server->next) {
        ypdata_t *yp = find_yp_mount(server->mounts, mount);

        while (yp != NULL) {
            if (!(yp->release || yp->remove)) {
                ICECAST_LOG_DEBUG("release %s on YP %s", mount, server->url);
                yp->release = 1;
                yp->next_update = 0;
            }
            /* continue the search behind the entry just looked at */
            yp = find_yp_mount(yp->next, mount);
        }
    }

    thread_rwlock_unlock(&yp_lock);
}
/* This is similar to yp_remove, but we force a touch attempt.
 * On every active server the first live entry for `mount` (one that is
 * neither released nor flagged for deletion) is looked up. If that entry is
 * in touch mode and `now + touch_interval - next_update` exceeds 60, its
 * next update is brought forward to 3 seconds after `now`.
 */
void yp_touch (const char *mount)
{
    struct yp_server *server;

    thread_rwlock_rlock(&yp_lock);

    /* read the list head only once the lock is held; the YP thread may
     * relink or free servers while it owns the write lock */
    for (server = (struct yp_server *)active_yps; server != NULL; server = server->next) {
        ypdata_t *yp = find_yp_mount(server->mounts, mount);

        /* we may have found old entries not purged yet, so skip them */
        while (yp != NULL && (yp->release != 0 || yp->remove != 0))
            yp = find_yp_mount(yp->next, mount);

        if (yp == NULL)
            continue;

        /* don't update the directory if there is a touch scheduled soon */
        if (yp->process == do_yp_touch && now + yp->touch_interval - yp->next_update > 60)
            yp->next_update = now + 3;
    }

    thread_rwlock_unlock(&yp_lock);
}
/* Stop the YP subsystem: ask the YP thread to finish, wait for it when it
 * was started, and release the cached server version string.
 */
void yp_shutdown (void)
{
    /* both flags are changed under the write lock; yp_update makes the
     * thread run its list maintenance on the way out */
    thread_rwlock_wlock(&yp_lock);
    yp_running = 0;
    yp_update = 1;
    thread_rwlock_unlock(&yp_lock);

    if (yp_thread != NULL)
        thread_join (yp_thread);

    free((char*)server_version);
    server_version = NULL;

    ICECAST_LOG_INFO("YP thread down");
}