1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-09-22 04:15:54 -04:00

No functional change. removed unused code, Use bitmask fields for

refbuf and client structures. add ogg header pages via the associated
pointers for simpler handling.

svn path=/icecast/branches/kh/icecast/; revision=16208
This commit is contained in:
Karl Heyes 2009-07-05 18:45:30 +00:00
parent 53bae7ac43
commit 500f3a761b
23 changed files with 115 additions and 153 deletions

View File

@ -184,7 +184,7 @@ xmlDocPtr admin_build_sourcelist (const char *mount)
}
config_release_config();
if (source->running)
if (source_running (source))
{
if (source->client)
{
@ -290,7 +290,7 @@ void admin_mount_request (client_t *client, const char *uri)
}
else
{
if (source->running == 0 && source->on_demand == 0)
if (source_available (source) == 0)
{
avl_tree_unlock (global.source_tree);
INFO1("Received admin command on unavailable mount \"%s\"", mount);
@ -335,19 +335,19 @@ int admin_handle_request (client_t *client, const char *uri)
uri += 7;
if (connection_check_admin_pass (client->parser))
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
/* special case for slaves requesting a streamlist for authenticated relaying */
if (strcmp (uri, "streams") == 0)
{
client->is_slave = 1;
client->flags |= CLIENT_IS_SLAVE;
auth_add_listener ("/admin/streams", client);
return 0;
}
if (strcmp (uri, "streamlist.txt") == 0)
{
if (connection_check_relay_pass (client->parser))
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
}
if (mount)
@ -365,7 +365,7 @@ int admin_handle_request (client_t *client, const char *uri)
}
/* This is a mount request, but admin user is allowed */
if (client->authenticated == 0)
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
switch (auth_check_source (client, mount))
{
@ -394,7 +394,7 @@ static void admin_handle_general_request(client_t *client, const char *uri)
{
struct admin_command *cmd;
if (client->authenticated == 0)
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
INFO1("Bad or missing password on admin command request (%s)", uri);
client_send_401 (client, NULL);
@ -610,7 +610,7 @@ static void command_manage_relay (client_t *client, int response)
msg = "relay has been changed";
if (relay->enable == 0)
{
if (relay->source && relay->source->running == 0)
if (relay->source && source_running (relay->source) == 0)
relay->source->on_demand = 0;
}
slave_update_all_mounts();

View File

@ -34,6 +34,7 @@
#include "httpp/httpp.h"
#include "fserve.h"
#include "admin.h"
#include "global.h"
#include "logging.h"
#define CATMODULE "auth"
@ -254,7 +255,7 @@ static void auth_remove_listener (auth_client *auth_user)
auth_release (auth_user->auth);
auth_user->auth = NULL;
/* client is going, so auth is not an issue at this point */
auth_user->client->authenticated = 0;
auth_user->client->flags &= ~CLIENT_AUTHENTICATED;
client_send_404 (auth_user->client, "Failed relay");
auth_user->client = NULL;
}
@ -270,7 +271,7 @@ static void stream_auth_callback (auth_client *auth_user)
if (auth_user->auth->stream_auth)
auth_user->auth->stream_auth (auth_user);
if (client->authenticated)
if (client->flags & CLIENT_AUTHENTICATED)
auth_postprocess_source (auth_user);
else
WARN1 ("Failed auth for source \"%s\"", auth_user->mount);
@ -361,7 +362,7 @@ static int add_authenticated_listener (const char *mount, mount_proxy *mountinfo
{
int ret = 0;
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
/* check whether we are processing a streamlist request for slaves */
if (strcmp (mount, "/admin/streams") == 0)
@ -421,9 +422,9 @@ static int auth_postprocess_listener (auth_client *auth_user)
if (client == NULL)
return -1;
if (client->authenticated == 0)
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
/* auth failed so check to placing listeners elsewhere */
/* auth failed so do we place the listener elsewhere */
if (auth_user->rejected_mount)
mount = auth_user->rejected_mount;
else if (auth->rejected_mount)
@ -477,8 +478,7 @@ void auth_add_listener (const char *mount, client_t *client)
if (connection_check_relay_pass (client->parser))
{
client->is_slave = 1;
client->authenticated = 1;
client->flags |= (CLIENT_IS_SLAVE|CLIENT_AUTHENTICATED);
INFO0 ("client connected as slave");
}
config = config_get_config();
@ -489,7 +489,7 @@ void auth_add_listener (const char *mount, client_t *client)
client_send_403 (client, "mountpoint unavailable");
return;
}
if (client->authenticated == 0 && mountinfo && mountinfo->auth && mountinfo->auth->authenticate)
if ((client->flags & CLIENT_AUTHENTICATED) == 0 && mountinfo && mountinfo->auth && mountinfo->auth->authenticate)
{
auth_client *auth_user;
@ -518,7 +518,7 @@ void auth_add_listener (const char *mount, client_t *client)
*/
int auth_release_listener (client_t *client, const char *mount, mount_proxy *mountinfo)
{
if (client->authenticated)
if (client->flags & CLIENT_AUTHENTICATED)
{
/* drop any queue reference here, we do not want a race between the source thread
* and the auth/fserve thread */
@ -531,7 +531,7 @@ int auth_release_listener (client_t *client, const char *mount, mount_proxy *mou
queue_auth_client (auth_user, mountinfo);
return 1;
}
client->authenticated = 0;
client->flags &= ~CLIENT_AUTHENTICATED;
}
client_send_404 (client, NULL);
return 0;

View File

@ -215,7 +215,7 @@ static auth_result htpasswd_auth (auth_client *auth_user)
if (strcmp (found->pass, hashed_pw) == 0)
{
free (hashed_pw);
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
return AUTH_OK;
}
free (hashed_pw);

View File

@ -169,7 +169,7 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s
}
}
if (strncasecmp (ptr, url->auth_header, url->auth_header_len) == 0)
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
if (strncasecmp (ptr, url->timelimit_header, url->timelimit_header_len) == 0)
{
unsigned int limit = 0;
@ -177,7 +177,7 @@ static int handle_returned_header (void *ptr, size_t size, size_t nmemb, void *s
client->con->discon_time = time(NULL) + limit;
}
if (strncasecmp (ptr, "icecast-slave: 1", 16) == 0)
client->is_slave =1;
client->flags |= CLIENT_IS_SLAVE;
if (strncasecmp (ptr, "icecast-auth-message: ", 22) == 0)
{
@ -394,7 +394,7 @@ static auth_result url_add_listener (auth_client *auth_user)
return AUTH_FAILED;
}
/* we received a response, lets see what it is */
if (client->authenticated)
if (client->flags & CLIENT_AUTHENTICATED)
return AUTH_OK;
if (atoi (atd->errormsg) == 403)
{
@ -528,7 +528,7 @@ static void url_stream_auth (auth_client *auth_user)
free (mount);
free (host);
client->authenticated = 0;
client->flags &= ~CLIENT_AUTHENTICATED;
if (curl_easy_perform (atd->curl))
WARN2 ("auth to server %s failed with %s", url->stream_auth, atd->errormsg);
}

View File

@ -28,6 +28,7 @@
#include "refbuf.h"
#include "client.h"
#include "logging.h"
#include "global.h"
#define CATMODULE "cfgfile"
#define CONFIG_DEFAULT_LOCATION "Earth"
@ -36,7 +37,6 @@
#define CONFIG_DEFAULT_SOURCE_LIMIT 16
#define CONFIG_DEFAULT_QUEUE_SIZE_LIMIT (500*1024)
#define CONFIG_DEFAULT_BURST_SIZE (64*1024)
#define CONFIG_DEFAULT_THREADPOOL_SIZE 4
#define CONFIG_DEFAULT_CLIENT_TIMEOUT 30
#define CONFIG_DEFAULT_HEADER_TIMEOUT 15
#define CONFIG_DEFAULT_SOURCE_TIMEOUT 10
@ -469,7 +469,6 @@ static void _set_defaults(ice_config_t *configuration)
configuration->client_limit = CONFIG_DEFAULT_CLIENT_LIMIT;
configuration->source_limit = CONFIG_DEFAULT_SOURCE_LIMIT;
configuration->queue_size_limit = CONFIG_DEFAULT_QUEUE_SIZE_LIMIT;
configuration->threadpool_size = CONFIG_DEFAULT_THREADPOOL_SIZE;
configuration->client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT;
configuration->header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT;
configuration->source_timeout = CONFIG_DEFAULT_SOURCE_TIMEOUT;

View File

@ -28,7 +28,6 @@ typedef struct _listener_t listener_t;
#include "avl/avl.h"
#include "auth.h"
#include "global.h"
typedef struct ice_config_dir_tag
{
@ -205,7 +204,6 @@ typedef struct ice_config_tag
int client_limit;
int source_limit;
unsigned int queue_size_limit;
int threadpool_size;
unsigned int burst_size;
int client_timeout;
int header_timeout;

View File

@ -37,6 +37,7 @@
#include "client.h"
#include "logging.h"
#include "slave.h"
#include "global.h"
#undef CATMODULE
#define CATMODULE "client"
@ -89,7 +90,7 @@ void client_destroy(client_t *client)
client->refbuf = NULL;
}
if (client->authenticated)
if (client->flags & CLIENT_AUTHENTICATED)
DEBUG1 ("client still in auth \"%s\"", httpp_getvar (client->parser, HTTPP_VAR_URI));
/* write log entry if ip is set (some things don't set it, like outgoing
@ -221,7 +222,7 @@ void client_send_404(client_t *client, const char *message)
if (message == NULL)
message = "Not Available";
if (client->refbuf == NULL)
client->refbuf = refbuf_new (4096);
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
"HTTP/1.0 404 Not Available\r\n"
"Content-Type: text/html\r\n\r\n"

View File

@ -18,7 +18,6 @@
#ifndef __CLIENT_H__
#define __CLIENT_H__
struct source_tag;
typedef struct _client_tag client_t;
#include "cfgfile.h"
@ -28,6 +27,12 @@ typedef struct _client_tag client_t;
struct _client_tag
{
/* various states the client could be in */
unsigned int flags;
/* position in first buffer */
unsigned int pos;
/* the client's connection */
connection_t *con;
/* the client's http headers */
@ -36,26 +41,17 @@ struct _client_tag
/* reference to incoming connection details */
listener_t *server_conn;
/* http response code for this client */
int respcode;
/* auth completed, 0 not yet, 1 passed */
int authenticated;
/* is client getting intro data */
long intro_offset;
/* where in the queue the client is */
refbuf_t *refbuf;
/* position in first buffer */
unsigned int pos;
/* byte count in queue */
unsigned int lag;
/* client is a slave server */
int is_slave;
/* http response code for this client */
int respcode;
/* Client username, if authenticated */
char *username;
@ -63,12 +59,6 @@ struct _client_tag
/* Client password, if authenticated */
char *password;
#ifdef HAVE_AIO
/* for handling async IO */
struct aiocb aio;
int pending_io;
#endif
/* Format-handler-specific data for this client */
void *format_data;
@ -81,7 +71,7 @@ struct _client_tag
/* function to check if refbuf needs updating */
int (*check_buffer)(struct source_tag *source, struct _client_tag *client);
struct _client_tag *next;
client_t *next;
};
client_t *client_create (connection_t *con, http_parser_t *parser);
@ -98,4 +88,9 @@ int client_send_bytes (client_t *client, const void *buf, unsigned len);
int client_read_bytes (client_t *client, void *buf, unsigned len);
void client_set_queue (client_t *client, refbuf_t *refbuf);
/* client flags bitmask */
#define CLIENT_AUTHENTICATED (002)
#define CLIENT_IS_SLAVE (004)
#define CLIENT_FORMAT_BIT (01000)
#endif /* __CLIENT_H__ */

View File

@ -113,7 +113,6 @@ cache_file_contents banned_ip, allowed_ip;
cache_file_contents useragents;
int connection_running = 0;
rwlock_t _source_shutdown_rwlock;
static void _handle_connection(void);
@ -152,8 +151,6 @@ void connection_initialize(void)
{
thread_spin_create (&_connection_lock);
thread_mutex_create(&move_clients_mutex);
thread_rwlock_create(&_source_shutdown_rwlock);
thread_cond_create(&global.shutdown_cond);
_req_queue = NULL;
_req_queue_tail = &_req_queue;
_con_queue = NULL;
@ -179,10 +176,6 @@ void connection_shutdown(void)
if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_line);
if (useragents.contents) avl_tree_free (useragents.contents, free_filtered_line);
thread_cond_destroy(&global.shutdown_cond);
thread_rwlock_wlock(&_source_shutdown_rwlock);
thread_rwlock_unlock(&_source_shutdown_rwlock);
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_spin_destroy (&_connection_lock);
thread_mutex_destroy(&move_clients_mutex);
}
@ -860,13 +853,13 @@ int connection_complete_source (source_t *source, int response)
source->running = 1;
mountinfo = config_find_mount (config, source->mount);
thread_mutex_lock (&source->lock);
source_update_settings (config, source, mountinfo);
INFO1 ("source %s is ready to start", source->mount);
thread_mutex_unlock (&source->lock);
config_release_config();
slave_rebuild_mounts();
source->shutdown_rwlock = &_source_shutdown_rwlock;
DEBUG0 ("source is ready to start");
return 0;
}
WARN1("Request to add source when maximum source limit "
@ -1059,7 +1052,7 @@ static void _handle_stats_request (client_t *client, char *uri)
return;
}
client->authenticated = 1;
client->flags |= CLIENT_AUTHENTICATED;
stats_add_listener (client, STATS_ALL);
}

View File

@ -70,7 +70,6 @@ int connection_check_pass (http_parser_t *parser, const char *user, const char *
int connection_check_relay_pass(http_parser_t *parser);
int connection_check_admin_pass(http_parser_t *parser);
extern rwlock_t _source_shutdown_rwlock;
extern int connection_running;
#endif /* __CONNECTION_H__ */

View File

@ -60,13 +60,16 @@ static void format_mp3_apply_settings(client_t *client, format_plugin_t *format,
typedef struct {
unsigned int interval;
int metadata_offset;
unsigned int since_meta_block;
int in_metadata;
refbuf_t *associated;
unsigned short interval;
short metadata_offset;
unsigned short since_meta_block;
} mp3_client_data;
/* client format flags */
#define CLIENT_IN_METADATA CLIENT_FORMAT_BIT
int format_mp3_get_plugin (source_t *source)
{
const char *metadata;
@ -367,7 +370,7 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned in
if (ret < (int)remaining + meta_len)
{
client_mp3->metadata_offset += (ret - remaining);
client_mp3->in_metadata = 1;
client->flags |= CLIENT_IN_METADATA;
}
else
client_mp3->associated = associated;
@ -381,9 +384,8 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned in
client_mp3->since_meta_block += ret;
client->pos += ret;
client->lag -= ret;
return ret;
}
return 0;
return ret > 0 ? ret : 0;
}
ret = client_send_bytes (client, metadata, meta_len);
@ -391,17 +393,15 @@ static int send_stream_metadata (client_t *client, refbuf_t *refbuf, unsigned in
{
client_mp3->associated = associated;
client_mp3->metadata_offset = 0;
client_mp3->in_metadata = 0;
client->flags &= ~CLIENT_IN_METADATA;
client_mp3->since_meta_block = 0;
return ret;
}
if (ret > 0)
client_mp3->metadata_offset += ret;
else
ret = 0;
client_mp3->in_metadata = 1;
client->flags |= CLIENT_IN_METADATA;
return ret;
return ret > 0 ? ret : 0;
}
@ -419,11 +419,11 @@ static int format_mp3_write_buf_to_client (client_t *client)
do
{
/* send any unwritten metadata to the client */
if (client_mp3->in_metadata)
if (client->flags & CLIENT_IN_METADATA)
{
ret = send_stream_metadata (client, refbuf, 0);
if (client_mp3->in_metadata)
if (client->flags & CLIENT_IN_METADATA)
break;
written += ret;
}
@ -437,7 +437,7 @@ static int format_mp3_write_buf_to_client (client_t *client)
if (remaining <= len)
{
ret = send_stream_metadata (client, refbuf, remaining);
if (client_mp3->in_metadata)
if (client->flags & CLIENT_IN_METADATA)
break;
written += ret;
buf += remaining;
@ -509,14 +509,7 @@ static int complete_read (source_t *source)
{
bytes = client_read_bytes (source->client, buf, source_mp3->queue_block_size-source_mp3->read_count);
if (bytes < 0)
{
if (source->client->con->error)
{
refbuf_release (source_mp3->read_data);
source_mp3->read_data = NULL;
}
return 0;
}
rate_add (format->in_bitrate, bytes, time(NULL));
}
source_mp3->read_count += bytes;
@ -525,14 +518,7 @@ static int complete_read (source_t *source)
format->read_bytes += bytes;
if (source_mp3->read_count < source_mp3->queue_block_size)
{
if (source_mp3->read_count == 0)
{
refbuf_release (source_mp3->read_data);
source_mp3->read_data = NULL;
}
return 0;
}
return 1;
}
@ -556,7 +542,7 @@ static refbuf_t *mp3_get_no_meta (source_t *source)
}
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
refbuf->sync_point = 1;
refbuf->flags |= SOURCE_BLOCK_SYNC;
return refbuf;
}
@ -681,7 +667,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
}
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
refbuf->sync_point = 1;
refbuf->flags |= SOURCE_BLOCK_SYNC;
return refbuf;
}
@ -748,6 +734,9 @@ static int format_mp3_create_client_data(source_t *source, client_t *client)
static void free_mp3_client_data (client_t *client)
{
mp3_client_data *client_mp3 = client->format_data;
refbuf_release (client_mp3->associated);
free (client->format_data);
client->format_data = NULL;
}

View File

@ -103,14 +103,14 @@ void format_ogg_attach_header (ogg_codec_t *codec, ogg_page *page)
DEBUG0 ("attaching BOS page");
if (*ogg_info->bos_end == NULL)
ogg_info->header_pages_tail = refbuf;
refbuf->next = *ogg_info->bos_end;
refbuf->associated = *ogg_info->bos_end;
*ogg_info->bos_end = refbuf;
ogg_info->bos_end = &refbuf->next;
ogg_info->bos_end = &refbuf->associated;
return;
}
DEBUG0 ("attaching header page");
if (ogg_info->header_pages_tail)
ogg_info->header_pages_tail->next = refbuf;
ogg_info->header_pages_tail->associated = refbuf;
ogg_info->header_pages_tail = refbuf;
if (ogg_info->header_pages == NULL)
@ -120,17 +120,9 @@ void format_ogg_attach_header (ogg_codec_t *codec, ogg_page *page)
void format_ogg_free_headers (ogg_state_t *ogg_info)
{
refbuf_t *header;
/* release the header pages first */
DEBUG0 ("releasing header pages");
header = ogg_info->header_pages;
while (header)
{
refbuf_t *to_release = header;
header = header->next;
refbuf_release (to_release);
}
refbuf_release (ogg_info->header_pages);
ogg_info->header_pages = NULL;
ogg_info->header_pages_tail = NULL;
ogg_info->bos_end = &ogg_info->header_pages;
@ -371,14 +363,9 @@ static void update_comments (source_t *source)
static refbuf_t *complete_buffer (source_t *source, refbuf_t *refbuf)
{
ogg_state_t *ogg_info = source->format->_state;
refbuf_t *header = ogg_info->header_pages;
while (header)
{
refbuf_addref (header);
header = header->next;
}
refbuf->associated = ogg_info->header_pages;
refbuf_addref (refbuf->associated);
if (ogg_info->log_metadata)
{
@ -388,7 +375,7 @@ static refbuf_t *complete_buffer (source_t *source, refbuf_t *refbuf)
/* listeners can start anywhere unless the codecs themselves are
* marking starting points */
if (ogg_info->codec_sync == NULL)
refbuf->sync_point = 1;
refbuf->flags |= SOURCE_BLOCK_SYNC;
return refbuf;
}
@ -539,7 +526,7 @@ static int send_ogg_headers (client_t *client, refbuf_t *headers)
client_data->pos += ret;
if (client_data->pos == refbuf->len)
{
refbuf = refbuf->next;
refbuf = refbuf->associated;
client_data->header_page = refbuf;
client_data->pos = 0;
}

View File

@ -21,8 +21,7 @@
#include <ogg/ogg.h>
#include <theora/theora.h>
typedef struct source_tag source_t;
#include "source.h"
#include "refbuf.h"
#include "format_ogg.h"
#include "format_theora.h"
@ -423,7 +422,7 @@ static refbuf_t *process_theora_page (ogg_state_t *ogg_info, ogg_codec_t *codec,
theora->prev_granulepos = granulepos;
if (has_keyframe && codec->possible_start)
{
codec->possible_start->sync_point = 1;
codec->possible_start->flags |= SOURCE_BLOCK_SYNC;
refbuf_release (codec->possible_start);
codec->possible_start = NULL;
}

View File

@ -392,7 +392,7 @@ static void fserve_client_destroy(fserve_t *fclient)
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, fclient->mount);
fclient->client->authenticated = 0;
fclient->client->flags &= ~CLIENT_AUTHENTICATED;
auth_release_listener (fclient->client, fclient->mount, mountinfo);
config_release_config();
}

View File

@ -44,6 +44,7 @@ void global_initialize(void)
global.source_tree = avl_tree_new(source_compare_sources, NULL);
thread_mutex_create(&_global_mutex);
thread_spin_create (&global.spinlock);
thread_rwlock_create (&global.shutdown_lock);
global.out_bitrate = rate_setup (151, 1000);
}
@ -51,6 +52,7 @@ void global_shutdown(void)
{
thread_mutex_destroy(&_global_mutex);
thread_spin_destroy (&global.spinlock);
thread_rwlock_destroy (&global.shutdown_lock);
avl_tree_free(global.source_tree, NULL);
rate_free (global.out_bitrate);
global.out_bitrate = NULL;
@ -76,7 +78,7 @@ void global_add_bitrates (struct rate_calc *rate, unsigned long value)
void global_reduce_bitrate_sampling (struct rate_calc *rate)
{
thread_spin_lock (&global.spinlock);
rate_reduce (rate, 5);
rate_reduce (rate, 0);
thread_spin_unlock (&global.spinlock);
}

View File

@ -38,6 +38,8 @@ typedef struct ice_global_tag
int schedule_config_reread;
avl_tree *source_tree;
rwlock_t shutdown_lock;
/* for locally defined relays */
struct _relay_server *relays;
/* relays retrieved from master */

View File

@ -53,7 +53,6 @@ refbuf_t *refbuf_new (unsigned int size)
abort();
}
refbuf->len = size;
refbuf->sync_point = 0;
refbuf->_count = 1;
refbuf->next = NULL;
refbuf->associated = NULL;
@ -72,11 +71,12 @@ static void refbuf_release_associated (refbuf_t *ref)
{
if (ref == NULL)
return;
while (ref && ref->_count == 1)
while (ref)
{
refbuf_t *to_go = ref;
ref = to_go->next;
to_go->next = NULL;
if (to_go->_count == 1)
to_go->next = NULL;
refbuf_release (to_go);
}
}

View File

@ -27,7 +27,7 @@ typedef struct _refbuf_tag
char *data;
struct _refbuf_tag *associated;
struct _refbuf_tag *next;
int sync_point;
unsigned int flags;
} refbuf_t;

View File

@ -1054,7 +1054,6 @@ static void *_slave_thread(void *arg)
global . schedule_config_reread = 0;
}
thread_sleep (1000000);
global_add_bitrates (global.out_bitrate, 0L);
if (global.running != ICE_RUNNING)
@ -1101,6 +1100,7 @@ static void *_slave_thread(void *arg)
/* trigger any YP processing */
yp_thread_startup();
stats_global_calc();
thread_sleep (1000000);
}
connection_thread_shutdown();
INFO0 ("shutting down current relays");
@ -1112,6 +1112,8 @@ static void *_slave_thread(void *arg)
/* send any removals to the YP servers */
yp_thread_startup();
thread_rwlock_wlock (&global.shutdown_lock);
thread_rwlock_unlock (&global.shutdown_lock);
INFO0 ("Slave thread shutdown complete");
return NULL;

View File

@ -207,7 +207,7 @@ void source_clear_source (source_t *source)
/* log bytes read in access log */
if (source->client)
{
source->client->authenticated = 0;
source->client->flags &= ~CLIENT_AUTHENTICATED;
if (source->format)
source->client->con->sent_bytes = source->format->read_bytes;
}
@ -393,7 +393,7 @@ void source_move_clients (source_t *source, source_t *dest)
source->active_clients = client->next;
/* don't move known slave relays to streams which are not timed (fallback file) */
if (dest->client == NULL && client->is_slave)
if (dest->client == NULL && (client->flags & CLIENT_IS_SLAVE))
{
client->next = leave_list;
leave_list = client;
@ -648,8 +648,7 @@ static int locate_start_on_queue (source_t *source, client_t *client)
if (source->stream_data_tail == NULL)
return -1;
refbuf = source->stream_data_tail;
DEBUG0 ("in here");
if (client->intro_offset == -1 && refbuf->sync_point)
if (client->intro_offset == -1 && (refbuf->flags & SOURCE_BLOCK_SYNC))
{
refbuf = source->stream_data_tail;
lag = refbuf->len;
@ -671,7 +670,7 @@ static int locate_start_on_queue (source_t *source, client_t *client)
while (refbuf)
{
if (refbuf->sync_point)
if (refbuf->flags & SOURCE_BLOCK_SYNC)
{
client_set_queue (client, refbuf);
client->check_buffer = format_advance_queue;
@ -731,8 +730,6 @@ static int http_source_listener (source_t *source, client_t *client)
if (client->respcode == 0)
{
DEBUG0("processing pending listener headers");
if (format_prepare_headers (source, client) < 0)
{
ERROR0 ("internal problem, dropping client");
@ -769,7 +766,6 @@ static int send_to_listener (source_t *source, client_t *client, int deletion_ex
int bytes;
int loop = 8; /* max number of iterations in one go */
long total_written = 0;
int ret = 0;
/* check for limited listener time */
if (client->con->discon_time && time(NULL) >= client->con->discon_time)
@ -782,27 +778,20 @@ static int send_to_listener (source_t *source, client_t *client, int deletion_ex
if (source->amount_added_to_queue)
client->lag += source->amount_added_to_queue;
while (1)
while (loop)
{
/* jump out if client connection has died */
if (client->con->error)
break;
if (loop == 0)
{
ret = 0;
break;
}
/* lets not send too much to one client in one go, but don't
sleep for too long if more data can be sent */
if (total_written > source->listener_send_trigger)
{
ret = 1;
loop = 0;
break;
}
loop--;
if (client->check_buffer (source, client) < 0)
break;
@ -811,11 +800,14 @@ static int send_to_listener (source_t *source, client_t *client, int deletion_ex
break; /* can't write any more */
total_written += bytes;
loop--;
}
if (total_written)
{
rate_add (source->format->out_bitrate, total_written, timing_get_time());
global_add_bitrates (global.out_bitrate, total_written);
source->bytes_sent_since_update += total_written;
}
rate_add (source->format->out_bitrate, total_written, timing_get_time());
source->bytes_sent_since_update += total_written;
global_add_bitrates (global.out_bitrate, total_written);
/* the refbuf referenced at head (last in queue) may be marked for deletion
* if so, check to see if this client is still referring to it */
@ -824,9 +816,10 @@ static int send_to_listener (source_t *source, client_t *client, int deletion_ex
INFO2 ("Client %lu (%s) has fallen too far behind, removing",
client->con->id, client->con->ip);
stats_event_inc (source->mount, "slow_listeners");
client_set_queue (client, NULL);
client->con->error = 1;
}
return ret;
return loop ? 0 : 1;
}
@ -937,7 +930,7 @@ static void source_init (source_t *source)
}
/* grab a read lock, to make sure we get a chance to cleanup */
thread_rwlock_rlock (source->shutdown_rwlock);
thread_rwlock_rlock (&global.shutdown_lock);
/* start off the statistics */
stats_event_inc (NULL, "source_total_connections");
@ -1131,7 +1124,7 @@ static void source_shutdown (source_t *source)
global_unlock();
/* release our hold on the lock so the main thread can continue cleaning up */
thread_rwlock_unlock(source->shutdown_rwlock);
thread_rwlock_unlock (&global.shutdown_lock);
}
@ -1173,7 +1166,7 @@ static void _parse_audio_info (source_t *source, const char *s)
esc = util_url_unescape (value);
if (esc)
{
if (source->running)
if (source_running (source))
{
util_dict_set (source->audio_info, name, esc);
stats_event (source->mount, name, esc);
@ -1716,7 +1709,7 @@ static int check_duplicate_logins (source_t *source, client_t *client, auth_t *a
return 1;
/* allow multiple authenticated relays */
if (client->username == NULL || client->is_slave)
if (client->username == NULL || (client->flags & CLIENT_IS_SLAVE))
return 1;
existing = source->active_clients;
@ -1780,7 +1773,7 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *cl
} while (1);
/* ok, we found a source and it is locked */
if (client->is_slave)
if (client->flags & CLIENT_IS_SLAVE)
{
if (source->client == NULL && source->on_demand == 0)
{

View File

@ -37,7 +37,6 @@ typedef struct source_tag
client_t *active_clients;
client_t **fast_clients_p;
rwlock_t *shutdown_rwlock;
util_dict *audio_info;
/* name of a file, whose contents are sent at listener connection */
@ -87,6 +86,7 @@ typedef struct source_tag
} source_t;
#define source_available(x) ((x)->running || (x)->on_demand)
#define source_running(x) ((x)->running)
source_t *source_reserve (const char *mount);
void *source_client_thread (void *arg);
@ -107,6 +107,8 @@ int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *c
extern mutex_t move_clients_mutex;
#define SOURCE_BLOCK_SYNC 01
#endif

View File

@ -84,7 +84,6 @@ typedef struct _stats_source_tag
typedef struct _event_listener_tag
{
client_t *client;
int master;
int hidden_level;
char *source;
@ -197,7 +196,8 @@ void stats_event(const char *source, const char *name, const char *value)
if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
{
WARN2 ("seen non-UTF8 data, probably incorrect metadata (%s, %s)", name, value);
WARN3 ("seen non-UTF8 data (%s), probably incorrect metadata (%s, %s)",
source?source:"global", name, value);
return;
}
build_event (&event, source, name, (char *)value);
@ -435,6 +435,7 @@ static void modify_node_event (stats_node_t *node, stats_event_t *event)
free (node->value);
node->value = strdup (event->value);
}
DEBUG3 ("update \"%s\" %s (%s)", event->source?event->source:"global", node->name, node->value);
}
@ -459,7 +460,6 @@ static void process_global_event (stats_event_t *event)
{
modify_node_event (node, event);
stats_listener_send (node->hidden, "EVENT global %s %s\n", node->name, node->value);
DEBUG2 ("update node on global \"%s\" (%s)", node->name, node->value);
}
else
{
@ -717,7 +717,7 @@ static int _append_to_buffer (refbuf_t *refbuf, int max_len, const char *fmt, ..
static void _add_node_to_stats_client (event_listener_t *listener, refbuf_t *refbuf)
{
if (refbuf->len)
if (refbuf->len)
{
*listener->queue_recent_p = refbuf;
listener->queue_recent_p = &refbuf->next;

View File

@ -30,6 +30,7 @@
#include "source.h"
#include "cfgfile.h"
#include "stats.h"
#include "global.h"
#define CATMODULE "yp"