1
0
mirror of https://gitlab.xiph.org/xiph/icecast-server.git synced 2024-06-23 06:25:24 +00:00

merged singleq branch 7177:7591

svn path=/icecast/trunk/icecast/; revision=7592
This commit is contained in:
Karl Heyes 2004-08-20 15:13:59 +00:00
parent 490e64663c
commit 9c44a7d184
16 changed files with 987 additions and 760 deletions

View File

@ -799,11 +799,7 @@ static void command_metadata(client_t *client, source_t *source)
state = source->format->_state;
thread_mutex_lock(&(state->lock));
free(state->metadata);
state->metadata = strdup(value);
state->metadata_age++;
thread_mutex_unlock(&(state->lock));
mp3_set_tag (source->format, "title", value);
DEBUG2("Metadata on mountpoint %s changed to \"%s\"",
source->mount, value);

View File

@ -339,7 +339,8 @@ static void _set_defaults(ice_config_t *configuration)
configuration->num_yp_directories = 0;
configuration->relay_username = NULL;
configuration->relay_password = NULL;
configuration->burst_on_connect = 1;
/* default to a typical prebuffer size used by clients */
configuration->burst_size = 65536;
}
static void _parse_root(xmlDocPtr doc, xmlNodePtr node,
@ -463,7 +464,12 @@ static void _parse_limits(xmlDocPtr doc, xmlNodePtr node,
if (tmp) xmlFree(tmp);
} else if (strcmp(node->name, "burst-on-connect") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
configuration->burst_on_connect = atoi(tmp);
if (atoi(tmp) == 0)
configuration->burst_size = 0;
if (tmp) xmlFree(tmp);
} else if (strcmp(node->name, "burst-size") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
configuration->burst_size = atoi(tmp);
if (tmp) xmlFree(tmp);
}
} while ((node = node->next));
@ -489,7 +495,9 @@ static void _parse_mount(xmlDocPtr doc, xmlNodePtr node,
else
configuration->mounts = mount;
/* default <mount> settings */
mount->max_listeners = -1;
mount->burst_size = -1;
mount->next = NULL;
do {
@ -574,6 +582,10 @@ static void _parse_mount(xmlDocPtr doc, xmlNodePtr node,
mount->source_timeout = atoi (tmp);
xmlFree(tmp);
}
} else if (strcmp(node->name, "burst-size") == 0) {
tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1);
mount->burst_size = atoi(tmp);
if (tmp) xmlFree(tmp);
}
} while ((node = node->next));
}

View File

@ -54,8 +54,10 @@ typedef struct _mount_proxy {
clients from the fallback? */
int no_mount; /* Do we permit direct requests of this mountpoint? (or only
indirect, through fallbacks) */
unsigned queue_size_limit;
unsigned source_timeout; /* source timeout in seconds */
int burst_size; /* amount to send to a new client if possible, -1 take
* from global setting */
unsigned int queue_size_limit;
unsigned int source_timeout; /* source timeout in seconds */
char *auth_type; /* Authentication type */
config_options_t *auth_options; /* Options for this type */
@ -84,8 +86,9 @@ typedef struct ice_config_tag
int client_limit;
int source_limit;
unsigned queue_size_limit;
unsigned int queue_size_limit;
int threadpool_size;
unsigned int burst_size;
int client_timeout;
int header_timeout;
int source_timeout;
@ -133,7 +136,6 @@ typedef struct ice_config_tag
char *yp_url[MAX_YP_DIRECTORIES];
int yp_url_timeout[MAX_YP_DIRECTORIES];
int num_yp_directories;
int burst_on_connect;
} ice_config_t;
typedef struct {

View File

@ -42,17 +42,14 @@ client_t *client_create(connection_t *con, http_parser_t *parser)
client->con = con;
client->parser = parser;
client->queue = NULL;
client->refbuf = NULL;
client->pos = 0;
client->burst_sent = 0;
return client;
}
void client_destroy(client_t *client)
{
refbuf_t *refbuf;
if (client == NULL)
return;
/* write log entry if ip is set (some things don't set it, like outgoing
@ -64,9 +61,9 @@ void client_destroy(client_t *client)
connection_close(client->con);
httpp_destroy(client->parser);
while ((refbuf = refbuf_queue_remove(&client->queue)))
refbuf_release(refbuf);
/* drop ref counts if need be */
if (client->refbuf)
refbuf_release (client->refbuf);
/* we need to free client specific format data (if any) */
if (client->free_client_data)
client->free_client_data (client);
@ -144,3 +141,14 @@ int client_send_bytes (client_t *client, const void *buf, unsigned len)
return ret;
}
void client_set_queue (client_t *client, refbuf_t *refbuf)
{
refbuf_t *to_release = client->refbuf;
client->refbuf = refbuf;
refbuf_addref (client->refbuf);
client->pos = 0;
if (to_release)
refbuf_release (to_release);
}

View File

@ -32,8 +32,9 @@ typedef struct _client_tag
/* http response code for this client */
int respcode;
/* buffer queue */
refbuf_queue_t *queue;
/* where in the queue the client is */
refbuf_t *refbuf;
/* position in first buffer */
unsigned long pos;
@ -45,7 +46,6 @@ typedef struct _client_tag
/* function to call to release format specific resources */
void (*free_client_data)(struct _client_tag *client);
int burst_sent;
} client_t;
client_t *client_create(connection_t *con, http_parser_t *parser);
@ -56,5 +56,6 @@ void client_send_401(client_t *client);
void client_send_403(client_t *client);
void client_send_400(client_t *client, char *message);
int client_send_bytes (client_t *client, const void *buf, unsigned len);
void client_set_queue (client_t *client, refbuf_t *refbuf);
#endif /* __CLIENT_H__ */

View File

@ -465,9 +465,8 @@ int connection_complete_source (source_t *source)
"for icecast 1.x relays. Assuming content is mp3.");
format_type = FORMAT_TYPE_MP3;
}
source->format = format_get_plugin (format_type, source->mount, source->parser);
if (source->format == NULL)
if (format_get_plugin (format_type, source) < 0)
{
global_unlock();
config_release_config();
@ -483,6 +482,7 @@ int connection_complete_source (source_t *source)
/* set global settings first */
source->queue_size_limit = config->queue_size_limit;
source->timeout = config->source_timeout;
source->burst_size = config->burst_size;
/* for relays, we don't yet have a client, however we do require one
* to retrieve the stream from. This is created here, quite late,
@ -932,8 +932,7 @@ static void _handle_get_request(connection_t *con,
global.clients++;
global_unlock();
client->format_data = source->format->create_client_data(
source->format, source, client);
source->format->create_client_data (source, client);
source->format->client_send_headers(source->format, source, client);

View File

@ -75,32 +75,22 @@ char *format_get_mimetype(format_type_t type)
}
}
format_plugin_t *format_get_plugin(format_type_t type, char *mount,
http_parser_t *parser)
int format_get_plugin(format_type_t type, source_t *source)
{
format_plugin_t *plugin;
int ret = -1;
switch (type) {
case FORMAT_TYPE_VORBIS:
plugin = format_vorbis_get_plugin();
if (plugin) plugin->mount = mount;
ret = format_vorbis_get_plugin (source);
break;
case FORMAT_TYPE_MP3:
plugin = format_mp3_get_plugin(parser);
if (plugin) plugin->mount = mount;
ret = format_mp3_get_plugin (source);
break;
default:
plugin = NULL;
break;
}
return plugin;
}
int format_generic_write_buf_to_client(format_plugin_t *format,
client_t *client, unsigned char *buf, int len)
{
return client_send_bytes (client, buf, len);
return ret;
}
void format_send_general_headers(format_plugin_t *format,

View File

@ -45,13 +45,10 @@ typedef struct _format_plugin_tag
*/
int has_predata;
int (*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long
len, refbuf_t **buffer);
refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self);
int (*write_buf_to_client)(struct _format_plugin_tag *format,
client_t *client, unsigned char *buf, int len);
void *(*create_client_data)(struct _format_plugin_tag *format,
struct source_tag *source, client_t *client);
refbuf_t *(*get_buffer)(struct source_tag *);
int (*write_buf_to_client)(struct _format_plugin_tag *format, client_t *client);
void (*write_buf_to_file)(struct source_tag *source, refbuf_t *refbuf);
int (*create_client_data)(struct source_tag *source, client_t *client);
void (*client_send_headers)(struct _format_plugin_tag *format,
struct source_tag *source, client_t *client);
void (*free_plugin)(struct _format_plugin_tag *self);
@ -62,11 +59,8 @@ typedef struct _format_plugin_tag
format_type_t format_get_type(char *contenttype);
char *format_get_mimetype(format_type_t type);
format_plugin_t *format_get_plugin(format_type_t type, char *mount,
http_parser_t *parser);
int format_get_plugin(format_type_t type, struct source_tag *source);
int format_generic_write_buf_to_client(format_plugin_t *format,
client_t *client, unsigned char *buf, int len);
void format_send_general_headers(format_plugin_t *format,
struct source_tag *source, client_t *client);

View File

@ -54,38 +54,38 @@
#define ICY_METADATA_INTERVAL 16000
static void format_mp3_free_plugin(format_plugin_t *self);
static int format_mp3_get_buffer(format_plugin_t *self, char *data,
unsigned long len, refbuf_t **buffer);
static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self);
static void *format_mp3_create_client_data(format_plugin_t *self,
source_t *source, client_t *client);
static refbuf_t *mp3_get_filter_meta (source_t *source);
static refbuf_t *mp3_get_no_meta (source_t *source);
static int format_mp3_create_client_data (source_t *source, client_t *client);
static void free_mp3_client_data (client_t *client);
static int format_mp3_write_buf_to_client(format_plugin_t *self,
client_t *client, unsigned char *buf, int len);
static int format_mp3_write_buf_to_client(format_plugin_t *self, client_t *client);
static void format_mp3_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf);
typedef struct {
int use_metadata;
int interval;
int offset;
int metadata_age;
int metadata_offset;
unsigned int since_meta_block;
int in_metadata;
refbuf_t *associated;
} mp3_client_data;
format_plugin_t *format_mp3_get_plugin(http_parser_t *parser)
int format_mp3_get_plugin (source_t *source)
{
char *metadata;
format_plugin_t *plugin;
mp3_state *state = calloc(1, sizeof(mp3_state));
refbuf_t *meta;
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));
plugin->type = FORMAT_TYPE_MP3;
plugin->has_predata = 0;
plugin->get_buffer = format_mp3_get_buffer;
plugin->get_predata = format_mp3_get_predata;
plugin->get_buffer = mp3_get_no_meta;
plugin->write_buf_to_client = format_mp3_write_buf_to_client;
plugin->write_buf_to_file = write_mp3_to_file;
plugin->create_client_data = format_mp3_create_client_data;
plugin->client_send_headers = format_mp3_send_headers;
plugin->free_plugin = format_mp3_free_plugin;
@ -93,309 +93,485 @@ format_plugin_t *format_mp3_get_plugin(http_parser_t *parser)
plugin->_state = state;
state->metadata_age = 0;
state->metadata = strdup("");
thread_mutex_create(&(state->lock));
meta = refbuf_new (1);
memcpy (meta->data, "", 1);
state->metadata = meta;
state->interval = ICY_METADATA_INTERVAL;
metadata = httpp_getvar(parser, "icy-metaint");
if(metadata)
state->inline_metadata_interval = atoi(metadata);
return plugin;
}
static int send_metadata(client_t *client, mp3_client_data *client_state,
mp3_state *source_state)
{
int len_byte;
int len;
int ret = -1;
unsigned char *buf;
int source_age;
char *fullmetadata = NULL;
int fullmetadata_size = 0;
const char meta_fmt[] = "StreamTitle='';";
do
metadata = httpp_getvar (source->parser, "icy-metaint");
if (metadata)
{
thread_mutex_lock (&(source_state->lock));
if (source_state->metadata == NULL)
break; /* Shouldn't be possible */
state->inline_metadata_interval = atoi (metadata);
state->offset = 0;
plugin->get_buffer = mp3_get_filter_meta;
}
source->format = plugin;
thread_mutex_create (&state->url_lock);
fullmetadata_size = strlen (source_state->metadata) + sizeof (meta_fmt);
/* Noted without comment: When the metadata string is a multiple of
* 16 bytes, the "reference" shoutcast server increases the length
* byte by one and appends 16 nulls, rather than omitting the
* pointless trailing null. */
if (fullmetadata_size > 4079)
{
fullmetadata_size = 4079;
}
fullmetadata = malloc (fullmetadata_size);
if (fullmetadata == NULL)
break;
fullmetadata_size = snprintf (fullmetadata, fullmetadata_size,
"StreamTitle='%.*s';", fullmetadata_size-(sizeof (meta_fmt)-1), source_state->metadata);
source_age = source_state->metadata_age;
if (fullmetadata_size > 0 && source_age != client_state->metadata_age)
{
len_byte = (fullmetadata_size)/16 + 1; /* to give 1-255 */
client_state->metadata_offset = 0;
}
else
len_byte = 0;
len = 1 + len_byte*16;
buf = calloc (1, len);
if (buf == NULL)
break;
buf[0] = len_byte;
if (len > 1) {
strncpy (buf+1, fullmetadata, len-1);
buf[len-1] = '\0';
}
thread_mutex_unlock (&(source_state->lock));
/* only write what hasn't been written already */
ret = client_send_bytes (client, buf + client_state->metadata_offset,
len - client_state->metadata_offset);
if (ret > 0 && ret < len) {
client_state->metadata_offset += ret;
}
else if (ret == len) {
client_state->metadata_age = source_age;
client_state->offset = 0;
client_state->metadata_offset = 0;
}
free (buf);
free (fullmetadata);
return ret;
} while (0);
thread_mutex_unlock(&(source_state->lock));
free (fullmetadata);
return -1;
return 0;
}
static int format_mp3_write_buf_to_client(format_plugin_t *self,
client_t *client, unsigned char *buf, int len)
void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value)
{
mp3_state *source_mp3 = plugin->_state;
unsigned int len;
const char meta[] = "StreamTitle='";
int size = sizeof (meta) + 1;
if (tag==NULL || value == NULL)
return;
len = strlen (value)+1;
size += len;
/* protect against multiple updaters */
thread_mutex_lock (&source_mp3->url_lock);
if (strcmp (tag, "title") == 0 || strcmp (tag, "song") == 0)
{
char *p = strdup (value);
if (p)
{
free (source_mp3->url_title);
free (source_mp3->url_artist);
source_mp3->url_artist = NULL;
source_mp3->url_title = p;
source_mp3->update_metadata = 1;
}
}
else if (strcmp (tag, "artist") == 0)
{
char *p = strdup (value);
if (p)
{
free (source_mp3->url_artist);
source_mp3->url_artist = p;
source_mp3->update_metadata = 1;
}
}
thread_mutex_unlock (&source_mp3->url_lock);
}
static void filter_shoutcast_metadata (source_t *source, char *metadata, unsigned int meta_len)
{
if (metadata)
{
char *end, *p;
int len;
do
{
metadata++;
if (strncmp (metadata, "StreamTitle='", 13))
break;
if ((end = strstr (metadata, "\';")) == NULL)
break;
len = (end - metadata) - 13;
p = calloc (1, len+1);
if (p)
{
memcpy (p, metadata+13, len);
stats_event (source->mount, "title", p);
yp_touch (source->mount);
free (p);
}
} while (0);
}
}
/* called from the source thread when the metadata has been updated.
* The artist title are checked and made ready for clients to send
*/
void mp3_set_title (source_t *source)
{
const char meta[] = "StreamTitle='";
int size;
unsigned char len_byte;
refbuf_t *p;
unsigned int len = sizeof(meta) + 2; /* the StreamTitle, quotes, ; and null */
mp3_state *source_mp3 = source->format->_state;
/* make sure the url data does not disappear from under us */
thread_mutex_lock (&source_mp3->url_lock);
/* work out message length */
if (source_mp3->url_artist)
len += strlen (source_mp3->url_artist);
if (source_mp3->url_title)
len += strlen (source_mp3->url_title);
if (source_mp3->url_artist && source_mp3->url_title)
len += 3;
#define MAX_META_LEN 255*16
if (len > MAX_META_LEN)
{
thread_mutex_unlock (&source_mp3->url_lock);
WARN1 ("Metadata too long at %d chars", len);
return;
}
/* work out the metadata len byte */
len_byte = (len-1) / 16 + 1;
/* now we know how much space to allocate, +1 for the len byte */
size = len_byte * 16 + 1;
p = refbuf_new (size);
if (p)
{
mp3_state *source_mp3 = source->format->_state;
memset (p->data, '\0', size);
if (source_mp3->url_artist && source_mp3->url_title)
snprintf (p->data, size, "%c%s%s - %s';", len_byte, meta,
source_mp3->url_artist, source_mp3->url_title);
else
snprintf (p->data, size, "%c%s%s';", len_byte, meta,
source_mp3->url_title);
filter_shoutcast_metadata (source, p->data, size);
refbuf_release (source_mp3->metadata);
source_mp3->metadata = p;
}
thread_mutex_unlock (&source_mp3->url_lock);
}
/* send the appropriate metadata, and return the number of bytes written
* which is 0 or greater. Check the client in_metadata value afterwards
* to see if all metadata has been sent
*/
static int send_mp3_metadata (client_t *client, refbuf_t *associated)
{
int ret = 0;
mp3_client_data *mp3data = client->format_data;
if(((mp3_state *)self->_state)->metadata && mp3data->use_metadata)
unsigned char *metadata;
int meta_len;
mp3_client_data *client_mp3 = client->format_data;
/* If there is a change in metadata then send it else
* send a single zero value byte in its place
*/
if (associated == client_mp3->associated)
{
mp3_client_data *state = client->format_data;
int max = state->interval - state->offset;
if(len == 0) /* Shouldn't happen */
return 0;
if(max > len)
max = len;
if(max > 0) {
ret = client_send_bytes (client, buf, max);
if(ret > 0)
state->offset += ret;
}
else {
send_metadata (client, state, self->_state);
}
metadata = "\0";
meta_len = 1;
}
else {
ret = client_send_bytes (client, buf, len);
else
{
metadata = associated->data + client_mp3->metadata_offset;
meta_len = associated->len - client_mp3->metadata_offset;
}
ret = client_send_bytes (client, metadata, meta_len);
if (ret == meta_len)
{
client_mp3->associated = associated;
client_mp3->metadata_offset = 0;
client_mp3->in_metadata = 0;
client_mp3->since_meta_block = 0;
return ret;
}
if (ret > 0)
client_mp3->metadata_offset += ret;
else
ret = 0;
client_mp3->in_metadata = 1;
return ret;
}
/* Handler for writing mp3 data to a client, taking into account whether
* client has requested shoutcast style metadata updates
*/
static int format_mp3_write_buf_to_client (format_plugin_t *self, client_t *client)
{
int ret, written = 0;
mp3_client_data *client_mp3 = client->format_data;
mp3_state *source_mp3 = self->_state;
refbuf_t *refbuf = client->refbuf;
char *buf;
unsigned int len;
if (refbuf->next == NULL && client->pos == refbuf->len)
return 0;
/* move to the next buffer if we have finished with the current one */
if (refbuf->next && client->pos == refbuf->len)
{
client_set_queue (client, refbuf->next);
refbuf = client->refbuf;
}
buf = refbuf->data + client->pos;
len = refbuf->len - client->pos;
do
{
/* send any unwritten metadata to the client */
if (client_mp3->in_metadata)
{
refbuf_t *associated = refbuf->associated;
ret = send_mp3_metadata (client, associated);
if (client_mp3->in_metadata)
break;
written += ret;
}
/* see if we need to send the current metadata to the client */
if (client_mp3->use_metadata)
{
unsigned int remaining = source_mp3->interval -
client_mp3->since_meta_block;
/* sending the metadata block */
if (remaining <= len)
{
/* send any mp3 before the metadata block */
if (remaining)
{
ret = client_send_bytes (client, buf, remaining);
if (ret > 0)
{
client_mp3->since_meta_block += ret;
client->pos += ret;
}
if (ret < (int)remaining)
break;
written += ret;
}
ret = send_mp3_metadata (client, refbuf->associated);
if (client_mp3->in_metadata)
break;
written += ret;
/* change buf and len */
buf += remaining;
len -= remaining;
}
}
/* write any mp3, maybe after the metadata block */
if (len)
{
ret = client_send_bytes (client, buf, len);
if (ret > 0)
{
client_mp3->since_meta_block += ret;
client->pos += ret;
}
if (ret < (int)len)
break;
written += ret;
}
ret = 0;
} while (0);
if (ret > 0)
written += ret;
return written;
}
static void format_mp3_free_plugin(format_plugin_t *self)
{
/* free the plugin instance */
mp3_state *state = self->_state;
thread_mutex_destroy(&(state->lock));
free(state->metadata);
thread_mutex_destroy (&state->url_lock);
free (state->url_artist);
free (state->url_title);
refbuf_release (state->metadata);
free(state);
free(self);
}
static int format_mp3_get_buffer(format_plugin_t *self, char *data,
unsigned long len, refbuf_t **buffer)
/* read an mp3 stream which does not have shoutcast style metadata */
static refbuf_t *mp3_get_no_meta (source_t *source)
{
int bytes;
refbuf_t *refbuf;
mp3_state *state = self->_state;
mp3_state *source_mp3 = source->format->_state;
/* Set this to NULL in case it doesn't get set to a valid buffer later */
*buffer = NULL;
if ((refbuf = refbuf_new (2048)) == NULL)
return NULL;
bytes = sock_read_bytes (source->con->sock, refbuf->data, 2048);
if(!data)
return 0;
if(state->inline_metadata_interval) {
/* Source is sending metadata, handle it... */
while(len > 0) {
int to_read = state->inline_metadata_interval - state->offset;
if(to_read > 0) {
refbuf_t *old_refbuf = *buffer;
if(to_read > len)
to_read = len;
if(old_refbuf) {
refbuf = refbuf_new(to_read + old_refbuf->len);
memcpy(refbuf->data, old_refbuf->data, old_refbuf->len);
memcpy(refbuf->data+old_refbuf->len, data, to_read);
refbuf_release(old_refbuf);
}
else {
refbuf = refbuf_new(to_read);
memcpy(refbuf->data, data, to_read);
}
*buffer = refbuf;
state->offset += to_read;
data += to_read;
len -= to_read;
}
else if(!state->metadata_length) {
/* Next up is the metadata byte... */
unsigned char byte = data[0];
data++;
len--;
/* According to the "spec"... this byte * 16 */
state->metadata_length = byte * 16;
if(state->metadata_length) {
state->metadata_buffer =
calloc(state->metadata_length + 1, 1);
/* Ensure we have a null-terminator even if the source
* stream is invalid.
*/
state->metadata_buffer[state->metadata_length] = 0;
}
else {
state->offset = 0;
}
state->metadata_offset = 0;
}
else {
/* Metadata to read! */
int readable = state->metadata_length - state->metadata_offset;
if(readable > len)
readable = len;
memcpy(state->metadata_buffer + state->metadata_offset,
data, readable);
state->metadata_offset += readable;
data += readable;
len -= readable;
if(state->metadata_offset == state->metadata_length)
{
if(state->metadata_length)
{
thread_mutex_lock(&(state->lock));
free(state->metadata);
/* Now, reformat state->metadata_buffer to strip off
StreamTitle=' and the closing '; (but only if there's
enough data for it to be correctly formatted) */
if(state->metadata_length >= 15) {
/* This is overly complex because the
metadata_length is the length of the actual raw
data, but the (null-terminated) string is going
to be shorter than this, and we can't trust that
the raw data doesn't have other embedded-nulls */
int stringlength;
state->metadata = malloc(state->metadata_length -
12);
memcpy(state->metadata,
state->metadata_buffer + 13,
state->metadata_length - 13);
/* Make sure we've got a null-terminator of some
sort */
state->metadata[state->metadata_length - 13] = 0;
/* Now figure out the _right_ one */
stringlength = strlen(state->metadata);
if(stringlength > 2)
state->metadata[stringlength - 2] = 0;
free(state->metadata_buffer);
}
else
state->metadata = state->metadata_buffer;
stats_event(self->mount, "title", state->metadata);
state->metadata_buffer = NULL;
state->metadata_age++;
thread_mutex_unlock(&(state->lock));
yp_touch (self->mount);
}
state->offset = 0;
state->metadata_length = 0;
}
}
}
/* Either we got a buffer above (in which case it can be used), or
* we set *buffer to NULL in the prologue, so the return value is
* correct anyway...
*/
return 0;
if (bytes == 0)
{
INFO1 ("End of stream %s", source->mount);
source->running = 0;
refbuf_release (refbuf);
return NULL;
}
else {
/* Simple case - no metadata, just dump data directly to a buffer */
refbuf = refbuf_new(len);
memcpy(refbuf->data, data, len);
*buffer = refbuf;
return 0;
if (source_mp3->update_metadata)
{
mp3_set_title (source);
source_mp3->update_metadata = 0;
}
}
if (bytes > 0)
{
refbuf->len = bytes;
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
return refbuf;
}
refbuf_release (refbuf);
if (!sock_recoverable (sock_error()))
source->running = 0;
static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self)
{
return NULL;
}
static void *format_mp3_create_client_data(format_plugin_t *self,
source_t *source, client_t *client)
/* read mp3 data with inlined metadata from the source. Filter out the
* metadata so that the mp3 data itself is store on the queue and the
* metadata is is associated with it
*/
static refbuf_t *mp3_get_filter_meta (source_t *source)
{
refbuf_t *refbuf;
format_plugin_t *plugin = source->format;
mp3_state *source_mp3 = plugin->_state;
unsigned char *src;
unsigned int bytes, mp3_block;
int ret;
refbuf = refbuf_new (2048);
src = refbuf->data;
ret = sock_read_bytes (source->con->sock, refbuf->data, 2048);
if (ret == 0)
{
INFO1 ("End of stream %s", source->mount);
source->running = 0;
refbuf_release (refbuf);
return NULL;
}
if (source_mp3->update_metadata)
{
mp3_set_title (source);
source_mp3->update_metadata = 0;
}
if (ret < 0)
{
refbuf_release (refbuf);
if (sock_recoverable (sock_error()))
return NULL; /* go back to waiting */
INFO0 ("Error on connection from source");
source->running = 0;
return NULL;
}
/* fill the buffer with the read data */
bytes = (unsigned int)ret;
refbuf->len = 0;
while (bytes > 0)
{
unsigned int metadata_remaining;
mp3_block = source_mp3->inline_metadata_interval - source_mp3->offset;
/* is there only enough to account for mp3 data */
if (bytes <= mp3_block)
{
refbuf->len += bytes;
source_mp3->offset += bytes;
break;
}
/* we have enough data to get to the metadata
* block, but only transfer upto it */
if (mp3_block)
{
src += mp3_block;
bytes -= mp3_block;
refbuf->len += mp3_block;
source_mp3->offset += mp3_block;
continue;
}
/* process the inline metadata, len == 0 indicates not seen any yet */
if (source_mp3->build_metadata_len == 0)
{
memset (source_mp3->build_metadata, 0,
sizeof (source_mp3->build_metadata));
source_mp3->build_metadata_offset = 0;
source_mp3->build_metadata_len = 1 + (*src * 16);
}
/* do we have all of the metatdata block */
metadata_remaining = source_mp3->build_metadata_len -
source_mp3->build_metadata_offset;
if (bytes < metadata_remaining)
{
memcpy (source_mp3->build_metadata +
source_mp3->build_metadata_offset, src, bytes);
source_mp3->build_metadata_offset += bytes;
break;
}
/* copy all bytes except the last one, that way we
* know a null byte terminates the message */
memcpy (source_mp3->build_metadata + source_mp3->build_metadata_offset,
src, metadata_remaining-1);
/* overwrite metadata in the buffer */
bytes -= metadata_remaining;
memmove (src, src+metadata_remaining, bytes);
/* assign metadata if it's not 1 byte, as that indicates a change */
if (source_mp3->build_metadata_len > 1)
{
refbuf_t *meta = refbuf_new (source_mp3->build_metadata_len);
memcpy (meta->data, source_mp3->build_metadata,
source_mp3->build_metadata_len);
DEBUG1("shoutcast metadata %.4080s", meta->data+1);
if (strncmp (meta->data+1, "StreamTitle=", 12) == 0)
{
filter_shoutcast_metadata (source, source_mp3->build_metadata,
source_mp3->build_metadata_len);
refbuf_release (source_mp3->metadata);
source_mp3->metadata = meta;
}
else
{
ERROR0 ("Incorrect metadata format, ending stream");
source->running = 0;
refbuf_release (refbuf);
return NULL;
}
}
source_mp3->offset = 0;
source_mp3->build_metadata_len = 0;
}
/* the data we have just read may of just been metadata */
if (refbuf->len == 0)
{
refbuf_release (refbuf);
return NULL;
}
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
return refbuf;
}
static int format_mp3_create_client_data(source_t *source, client_t *client)
{
mp3_client_data *data = calloc(1,sizeof(mp3_client_data));
char *metadata;
data->interval = ICY_METADATA_INTERVAL;
data->offset = 0;
client->free_client_data = free_mp3_client_data;
if (data == NULL)
return -1;
client->format_data = data;
client->free_client_data = free_mp3_client_data;
metadata = httpp_getvar(client->parser, "icy-metadata");
if(metadata)
data->use_metadata = atoi(metadata)>0?1:0;
return data;
return 0;
}
@ -432,3 +608,16 @@ static void format_mp3_send_headers(format_plugin_t *self,
format_send_general_headers(self, source, client);
}
static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf)
{
if (refbuf->len == 0)
return;
if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) < (size_t)refbuf->len)
{
WARN0 ("Write to dump file failed, disabling");
fclose (source->dumpfile);
source->dumpfile = NULL;
}
}

View File

@ -19,18 +19,23 @@
#define __FORMAT_MP3_H__
typedef struct {
char *metadata;
int metadata_age;
mutex_t lock;
/* These are for inline metadata */
int inline_metadata_interval;
int offset;
int metadata_length;
char *metadata_buffer;
int metadata_offset;
unsigned interval;
char *url_artist;
char *url_title;
int update_metadata;
refbuf_t *metadata;
mutex_t url_lock;
unsigned build_metadata_len;
unsigned build_metadata_offset;
char build_metadata[4081];
} mp3_state;
format_plugin_t *format_mp3_get_plugin(http_parser_t *parser);
int format_mp3_get_plugin(struct source_tag *src);
void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value);
#endif /* __FORMAT_MP3_H__ */

View File

@ -49,20 +49,31 @@ typedef struct _vstate_tag
ogg_page og;
unsigned long serialno;
int header;
refbuf_t *headbuf[MAX_HEADER_PAGES];
refbuf_t *file_headers;
refbuf_t *header_pages;
refbuf_t *header_pages_tail;
int packets;
} vstate_t;
struct client_vorbis
{
refbuf_t *headers;
refbuf_t *header_page;
unsigned int pos;
int processing_headers;
};
static void format_vorbis_free_plugin(format_plugin_t *self);
static int format_vorbis_get_buffer(format_plugin_t *self, char *data,
unsigned long len, refbuf_t **buffer);
static refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self);
static void *format_vorbis_create_client_data(format_plugin_t *self,
source_t *source, client_t *client);
static refbuf_t *format_vorbis_get_buffer (source_t *source);
static int format_vorbis_create_client_data (source_t *source, client_t *client);
static void format_vorbis_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
static int write_buf_to_client (format_plugin_t *self, client_t *client);
static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf);
format_plugin_t *format_vorbis_get_plugin(void)
int format_vorbis_get_plugin(source_t *source)
{
format_plugin_t *plugin;
vstate_t *state;
@ -70,10 +81,9 @@ format_plugin_t *format_vorbis_get_plugin(void)
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));
plugin->type = FORMAT_TYPE_VORBIS;
plugin->has_predata = 1;
plugin->write_buf_to_file = write_ogg_to_file;
plugin->get_buffer = format_vorbis_get_buffer;
plugin->get_predata = format_vorbis_get_predata;
plugin->write_buf_to_client = format_generic_write_buf_to_client;
plugin->write_buf_to_client = write_buf_to_client;
plugin->create_client_data = format_vorbis_create_client_data;
plugin->client_send_headers = format_vorbis_send_headers;
plugin->free_plugin = format_vorbis_free_plugin;
@ -83,52 +93,67 @@ format_plugin_t *format_vorbis_get_plugin(void)
ogg_sync_init(&state->oy);
plugin->_state = (void *)state;
source->format = plugin;
return plugin;
return 0;
}
void format_vorbis_free_plugin(format_plugin_t *self)
{
int i;
vstate_t *state = (vstate_t *)self->_state;
refbuf_t *header = state->header_pages;
/* free memory associated with this plugin instance */
/* free state memory */
while (header)
{
refbuf_t *to_release = header;
header = header->next;
refbuf_release (to_release);
}
ogg_sync_clear(&state->oy);
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
for (i = 0; i < MAX_HEADER_PAGES; i++) {
if (state->headbuf[i]) {
refbuf_release(state->headbuf[i]);
state->headbuf[i] = NULL;
}
}
free(state);
/* free the plugin instance */
free(self);
}
int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len, refbuf_t **buffer)
static refbuf_t *format_vorbis_get_buffer (source_t *source)
{
char *buf;
int i, result;
int result;
ogg_packet op;
char *tag;
refbuf_t *refbuf, *source_refbuf;
refbuf_t *refbuf, *header;
char *data;
format_plugin_t *self = source->format;
int bytes;
vstate_t *state = (vstate_t *)self->_state;
source_t *source;
if (data) {
/* write the data to the buffer */
buf = ogg_sync_buffer(&state->oy, len);
memcpy(buf, data, len);
ogg_sync_wrote(&state->oy, len);
data = ogg_sync_buffer (&state->oy, 4096);
bytes = sock_read_bytes (source->con->sock, data, 4096);
if (bytes < 0)
{
if (sock_recoverable (sock_error()))
return NULL;
WARN0 ("source connection has died");
ogg_sync_wrote (&state->oy, 0);
source->running = 0;
return NULL;
}
if (bytes == 0)
{
INFO1 ("End of Stream %s", source->mount);
ogg_sync_wrote (&state->oy, 0);
source->running = 0;
return NULL;
}
ogg_sync_wrote (&state->oy, bytes);
refbuf = NULL;
if (ogg_sync_pageout(&state->oy, &state->og) == 1) {
@ -137,21 +162,25 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le
memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len);
if (state->serialno != ogg_page_serialno(&state->og)) {
DEBUG0("new stream");
/* this is a new logical bitstream */
state->header = 0;
state->packets = 0;
/* release old headers, stream state, vorbis data */
for (i = 0; i < MAX_HEADER_PAGES; i++) {
if (state->headbuf[i]) {
refbuf_release(state->headbuf[i]);
state->headbuf[i] = NULL;
}
}
/* Clear old stuff. Rarely but occasionally needed. */
header = state->header_pages;
while (header)
{
refbuf_t *to_release = header;
DEBUG0 ("clearing out header page");
header = header->next;
refbuf_release (to_release);
}
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
state->header_pages = NULL;
state->header_pages_tail = NULL;
state->serialno = ogg_page_serialno(&state->og);
ogg_stream_init(&state->os, state->serialno);
@ -164,52 +193,40 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le
* extras pages beyond the header. We need to collect the pages
* here anyway, but they may have to be discarded later.
*/
DEBUG1 ("header %d", state->header);
if (ogg_page_granulepos(&state->og) <= 0) {
state->header++;
} else {
/* we're done caching headers */
state->header = -1;
DEBUG0 ("doing stats");
/* put known comments in the stats */
tag = vorbis_comment_query(&state->vc, "TITLE", 0);
if (tag) stats_event(self->mount, "title", tag);
else stats_event(self->mount, "title", "unknown");
if (tag) stats_event(source->mount, "title", tag);
else stats_event(source->mount, "title", "unknown");
tag = vorbis_comment_query(&state->vc, "ARTIST", 0);
if (tag) stats_event(self->mount, "artist", tag);
else stats_event(self->mount, "artist", "unknown");
if (tag) stats_event(source->mount, "artist", tag);
else stats_event(source->mount, "artist", "unknown");
/* don't need these now */
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
/* Drain the source queue on metadata update otherwise you
could have a mismatch between what is on the source queue
and what is in the state->headbuf */
avl_tree_rlock(global.source_tree);
source = source_find_mount_raw(self->mount);
avl_tree_unlock(global.source_tree);
thread_mutex_lock(&source->queue_mutex);
while ((source_refbuf = refbuf_queue_remove(&source->queue))) {
refbuf_release(source_refbuf);
}
thread_mutex_unlock(&source->queue_mutex);
yp_touch (self->mount);
yp_touch (source->mount);
}
}
/* cache header pages */
if (state->header > 0 && state->packets < 3) {
if(state->header > MAX_HEADER_PAGES) {
refbuf_release(refbuf);
ERROR1("Bad vorbis input: header is more than %d pages long", MAX_HEADER_PAGES);
/* build a list of headers pages for attaching */
if (state->header_pages_tail)
state->header_pages_tail->next = refbuf;
state->header_pages_tail = refbuf;
return -1;
}
refbuf_addref(refbuf);
state->headbuf[state->header - 1] = refbuf;
if (state->header_pages == NULL)
state->header_pages = refbuf;
if (state->packets >= 0 && state->packets < 3) {
ogg_stream_pagein(&state->os, &state->og);
@ -229,36 +246,40 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le
}
}
}
/* we do not place ogg headers on the main queue */
return NULL;
}
/* increase ref counts on each header page going out */
header = state->header_pages;
while (header)
{
refbuf_addref (header);
header = header->next;
}
refbuf->associated = state->header_pages;
}
*buffer = refbuf;
return 0;
return refbuf;
}
refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self)
static void free_ogg_client_data (client_t *client)
{
refbuf_queue_t *queue;
int i;
vstate_t *state = (vstate_t *)self->_state;
free (client->format_data);
client->format_data = NULL;
}
queue = NULL;
for (i = 0; i < MAX_HEADER_PAGES; i++) {
if (state->headbuf[i]) {
refbuf_addref(state->headbuf[i]);
refbuf_queue_add(&queue, state->headbuf[i]);
} else {
break;
}
static int format_vorbis_create_client_data (source_t *source, client_t *client)
{
struct client_vorbis *client_data = calloc (1, sizeof (struct client_vorbis));
int ret = -1;
if (client_data)
{
client->format_data = client_data;
client->free_client_data = free_ogg_client_data;
ret = 0;
}
return queue;
}
static void *format_vorbis_create_client_data(format_plugin_t *self,
source_t *source, client_t *client)
{
return NULL;
return ret;
}
static void format_vorbis_send_headers(format_plugin_t *self,
@ -277,4 +298,122 @@ static void format_vorbis_send_headers(format_plugin_t *self,
format_send_general_headers(self, source, client);
}
static int send_ogg_headers (client_t *client, refbuf_t *headers)
{
struct client_vorbis *client_data = client->format_data;
refbuf_t *refbuf;
int written = 0;
if (client_data->processing_headers == 0)
{
client_data->header_page = headers;
client_data->pos = 0;
client_data->processing_headers = 1;
}
refbuf = client_data->header_page;
while (refbuf)
{
char *data = refbuf->data + client_data->pos;
unsigned int len = refbuf->len - client_data->pos;
int ret;
ret = client_send_bytes (client, data, len);
if (ret > 0)
{
written += ret;
client_data->pos += ret;
}
if (ret < (int)len)
return written;
if (client_data->pos == refbuf->len)
{
refbuf = refbuf->next;
client_data->header_page = refbuf;
client_data->pos = 0;
}
}
/* update client info on headers sent */
client_data->processing_headers = 0;
client_data->headers = headers;
return written;
}
static int write_buf_to_client (format_plugin_t *self, client_t *client)
{
refbuf_t *refbuf = client->refbuf;
char *buf;
unsigned int len;
struct client_vorbis *client_data = client->format_data;
int ret, written = 0;
if (refbuf->next == NULL && client->pos == refbuf->len)
return 0;
if (refbuf->next && client->pos == refbuf->len)
{
client_set_queue (client, refbuf->next);
refbuf = client->refbuf;
}
do
{
if (client_data->headers != refbuf->associated)
{
/* different headers seen so send the new ones */
ret = send_ogg_headers (client, refbuf->associated);
if (client_data->processing_headers)
break;
written += ret;
}
buf = refbuf->data + client->pos;
len = refbuf->len - client->pos;
ret = client_send_bytes (client, buf, len);
if (ret > 0)
client->pos += ret;
if (ret < (int)len)
break;
written += ret;
/* we have now written the page(s) */
ret = 0;
} while (0);
if (ret > 0)
written += ret;
return written;
}
static int write_ogg_data (struct source_tag *source, refbuf_t *refbuf)
{
int ret = 1;
if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len)
{
WARN0 ("Write to dump file failed, disabling");
fclose (source->dumpfile);
source->dumpfile = NULL;
ret = 0;
}
return ret;
}
static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf)
{
vstate_t *state = (vstate_t *)source->format->_state;
if (state->file_headers != refbuf->associated)
{
refbuf_t *header = refbuf->associated;
while (header)
{
if (write_ogg_data (source, header) == 0)
return;
header = header->next;
}
state->file_headers = refbuf->associated;
}
write_ogg_data (source, refbuf);
}

View File

@ -18,6 +18,6 @@
#ifndef __FORMAT_VORBIS_H__
#define __FORMAT_VORBIS_H__
format_plugin_t *format_vorbis_get_plugin(void);
int format_vorbis_get_plugin(source_t *source);
#endif /* __FORMAT_VORBIS_H__ */

View File

@ -38,9 +38,22 @@ refbuf_t *refbuf_new(unsigned long size)
refbuf_t *refbuf;
refbuf = (refbuf_t *)malloc(sizeof(refbuf_t));
refbuf->data = (void *)malloc(size);
if (refbuf == NULL)
return NULL;
refbuf->data = NULL;
if (size)
{
refbuf->data = malloc (size);
if (refbuf->data == NULL)
{
free (refbuf);
return NULL;
}
}
refbuf->len = size;
refbuf->_count = 1;
refbuf->next = NULL;
refbuf->associated = NULL;
return refbuf;
}
@ -54,100 +67,15 @@ void refbuf_release(refbuf_t *self)
{
self->_count--;
if (self->_count == 0) {
while (self->associated)
{
refbuf_t *ref = self->associated;
self->associated = ref->next;
refbuf_release (ref);
}
free(self->data);
free(self);
return;
}
}
void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf)
{
refbuf_queue_t *node;
refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t));
item->refbuf = refbuf;
item->next = NULL;
if (*queue == NULL) {
*queue = item;
(*queue)->total_length = item->refbuf->len;
} else {
node = *queue;
while (node->next) node = node->next;
node->next = item;
(*queue)->total_length += item->refbuf->len;
}
}
refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue)
{
refbuf_queue_t *item;
refbuf_t *refbuf;
if (*queue == NULL) return NULL;
item = *queue;
*queue = item->next;
item->next = NULL;
refbuf = item->refbuf;
item->refbuf = NULL;
if(*queue)
(*queue)->total_length = item->total_length - refbuf->len;
free(item);
return refbuf;
}
refbuf_t * refbuf_queue_get(refbuf_queue_t **queue, int item)
{
refbuf_queue_t *node = *queue;
int size = 0;
while (node) {
if (size == item) {
return node->refbuf;
}
node = node->next;
size++;
}
return NULL;
}
void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf)
{
refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t));
item->refbuf = refbuf;
item->next = *queue;
if(item->next)
item->total_length = item->next->total_length + item->refbuf->len;
else
item->total_length = item->refbuf->len;
*queue = item;
}
int refbuf_queue_size(refbuf_queue_t **queue)
{
refbuf_queue_t *node = *queue;
int size = 0;
while (node) {
node = node->next;
size++;
}
return size;
}
int refbuf_queue_length(refbuf_queue_t **queue)
{
if(*queue)
return (*queue)->total_length;
else
return 0;
}

View File

@ -22,18 +22,12 @@ typedef struct _refbuf_tag
{
char *data;
long len;
struct _refbuf_tag *associated;
struct _refbuf_tag *next;
unsigned long _count;
} refbuf_t;
typedef struct _refbuf_queue_tag
{
refbuf_t *refbuf;
long total_length;
struct _refbuf_queue_tag *next;
} refbuf_queue_t;
void refbuf_initialize(void);
void refbuf_shutdown(void);
@ -41,22 +35,5 @@ refbuf_t *refbuf_new(unsigned long size);
void refbuf_addref(refbuf_t *self);
void refbuf_release(refbuf_t *self);
void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf);
refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue);
void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf);
/* Size in buffers */
int refbuf_queue_size(refbuf_queue_t **queue);
/* Size in bytes */
int refbuf_queue_length(refbuf_queue_t **queue);
refbuf_t * refbuf_queue_get(refbuf_queue_t **queue, int item);
#endif /* __REFBUF_H__ */

View File

@ -60,6 +60,7 @@ mutex_t move_clients_mutex;
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
static void _parse_audio_info (source_t *source, const char *s);
static void source_shutdown (source_t *source);
/* Allocate a new source with the stated mountpoint, if one already
* exists with that mountpoint in the global source tree then return
@ -189,7 +190,6 @@ int source_compare_sources(void *arg, void *a, void *b)
void source_clear_source (source_t *source)
{
refbuf_t *refbuf;
DEBUG1 ("clearing source \"%s\"", source->mount);
client_destroy(source->client);
source->client = NULL;
@ -228,6 +228,10 @@ void source_clear_source (source_t *source)
if (source->yp_public)
yp_remove (source->mount);
source->burst_point = NULL;
source->burst_size = 0;
source->burst_offset = 0;
source->queue_size = 0;
source->queue_size_limit = 0;
source->listeners = 0;
source->no_mount = 0;
@ -239,12 +243,18 @@ void source_clear_source (source_t *source)
free(source->dumpfilename);
source->dumpfilename = NULL;
/* Lets clear out the source queue too */
while ((refbuf = refbuf_queue_remove(&source->queue)))
refbuf_release(refbuf);
source->queue = NULL;
source->burst_on_connect = 1;
thread_mutex_destroy(&source->queue_mutex);
while (source->stream_data)
{
refbuf_t *p = source->stream_data;
source->stream_data = p->next;
/* can be referenced by burst handler as well */
while (p->_count > 1)
refbuf_release (p);
refbuf_release (p);
}
source->stream_data_tail = NULL;
}
@ -332,7 +342,8 @@ void source_move_clients (source_t *source, source_t *dest)
client = (client_t *)(node->key);
avl_delete (source->pending_tree, client, NULL);
/* TODO: reset client local format data? */
/* switch client to different queue */
client_set_queue (client, dest->stream_data_tail);
avl_insert (dest->pending_tree, (void *)client);
}
@ -346,7 +357,8 @@ void source_move_clients (source_t *source, source_t *dest)
client = (client_t *)(node->key);
avl_delete (source->client_tree, client, NULL);
/* TODO: reset client local format data? */
/* switch client to different queue */
client_set_queue (client, dest->stream_data_tail);
avl_insert (dest->pending_tree, (void *)client);
}
source->listeners = 0;
@ -360,6 +372,105 @@ void source_move_clients (source_t *source, source_t *dest)
thread_mutex_unlock (&move_clients_mutex);
}
/* get some data from the source. The stream data is placed in a refbuf
* and sent back, however NULL is also valid as in the case of a short
* timeout and there's no data pending.
*/
static refbuf_t *get_next_buffer (source_t *source)
{
refbuf_t *refbuf = NULL;
int delay = 250;
if (source->short_delay)
delay = 0;
while (global.running == ICE_RUNNING && source->running)
{
int fds;
time_t current = time (NULL);
fds = util_timed_wait_for_fd (source->con->sock, delay);
if (fds < 0)
{
if (! sock_recoverable (sock_error()))
{
WARN0 ("Error while waiting on socket, Disconnecting source");
source->running = 0;
}
break;
}
if (fds == 0)
{
if (source->last_read + (time_t)source->timeout < current)
{
DEBUG3 ("last %ld, timeout %ld, now %ld", source->last_read, source->timeout, current);
WARN0 ("Disconnecting source due to socket timeout");
source->running = 0;
}
break;
}
source->last_read = current;
refbuf = source->format->get_buffer (source);
if (refbuf)
break;
}
return refbuf;
}
/* general send routine per listener. The deletion_expected tells us whether
* the last in the queue is about to disappear, so if this client is still
* referring to it after writing then drop the client as it's fallen too far
* behind
*/
static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
{
int bytes;
int loop = 10; /* max number of iterations in one go */
int total_written = 0;
/* new users need somewhere to start from */
if (client->refbuf == NULL)
{
/* make clients start at the per source burst point on the queue */
client_set_queue (client, source->burst_point);
if (client->refbuf == NULL)
return;
}
while (1)
{
/* jump out if client connection has died */
if (client->con->error)
break;
/* lets not send too much to one client in one go, but don't
sleep for too long if more data can be sent */
if (total_written > 20000 || loop == 0)
{
source->short_delay = 1;
break;
}
loop--;
bytes = source->format->write_buf_to_client (source->format, client);
if (bytes <= 0)
break; /* can't write any more */
total_written += bytes;
}
/* the refbuf referenced at head (last in queue) may be marked for deletion
* if so, check to see if this client is still referring to it */
if (deletion_expected && client->refbuf == source->stream_data)
{
DEBUG0("Client has fallen too far behind, removing");
client->con->error = 1;
}
}
static void source_init (source_t *source)
{
@ -375,7 +486,6 @@ static void source_init (source_t *source)
memset (listenurl, '\000', listen_url_size);
snprintf (listenurl, listen_url_size, "http://%s:%d%s",
config->hostname, config->port, source->mount);
source->burst_on_connect = config->burst_on_connect;
config_release_config();
/* maybe better in connection.c */
@ -423,9 +533,8 @@ static void source_init (source_t *source)
sock_set_blocking (source->con->sock, SOCK_NONBLOCK);
thread_mutex_create(&source->queue_mutex);
DEBUG0("Source creation complete");
source->last_read = time (NULL);
source->running = 1;
/*
@ -454,212 +563,70 @@ static void source_init (source_t *source)
void source_main (source_t *source)
{
char buffer[4096];
long bytes, sbytes;
int ret, i;
unsigned int listeners;
refbuf_t *refbuf;
client_t *client;
avl_node *client_node;
refbuf_t *refbuf, *abuf, *stale_refbuf;
int data_done;
source_init (source);
while (global.running == ICE_RUNNING && source->running) {
ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
if(ret < 0) {
WARN0("Bad data from source");
break;
}
if (source->burst_on_connect) {
thread_mutex_lock(&source->queue_mutex);
/* Add to the source buffer */
if (refbuf) {
refbuf_addref(refbuf);
refbuf_queue_add(&(source->queue), refbuf);
/* We derive the size of the source buffer queue based off the
setting for queue_size_limit (client buffer queue size).
This is because the source buffer queue size should be a
percentage of the client buffer size (definately cannot
be larger). Why 50% ? Because > 75% does not give the
client enough leeway for lagging on initial connection
and < 25% does not provide a good enough burst on connect. */
if (refbuf_queue_length(&(source->queue)) >
source->queue_size_limit/2) {
stale_refbuf = refbuf_queue_remove(&(source->queue));
refbuf_release(stale_refbuf);
}
}
thread_mutex_unlock(&source->queue_mutex);
}
bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
while (refbuf == NULL) {
bytes = 0;
while (bytes <= 0) {
ret = util_timed_wait_for_fd(source->con->sock, source->timeout*1000);
int remove_from_q;
if (ret < 0 && sock_recoverable (sock_error()))
continue;
if (ret <= 0) { /* timeout expired */
WARN1("Disconnecting source: socket timeout (%d s) expired",
source->timeout);
bytes = 0;
break;
}
refbuf = get_next_buffer (source);
bytes = sock_read_bytes(source->con->sock, buffer, 4096);
if (bytes == 0 ||
(bytes < 0 && !sock_recoverable(sock_error())))
{
DEBUG1("Disconnecting source due to socket read error: %s",
strerror(sock_error()));
break;
}
}
if (bytes <= 0) break;
source->client->con->sent_bytes += bytes;
ret = source->format->get_buffer(source->format, buffer, bytes,
&refbuf);
if(ret < 0) {
WARN0("Bad data from source");
goto done;
}
if (source->burst_on_connect) {
/* Add to the source buffer */
thread_mutex_lock(&source->queue_mutex);
if (refbuf) {
refbuf_addref(refbuf);
refbuf_queue_add(&(source->queue), refbuf);
if (refbuf_queue_length(&(source->queue)) >
source->queue_size_limit/2) {
stale_refbuf = refbuf_queue_remove(&(source->queue));
refbuf_release(stale_refbuf);
}
}
thread_mutex_unlock(&source->queue_mutex);
}
}
remove_from_q = 0;
source->short_delay = 0;
if (bytes <= 0) {
INFO0("Removing source following disconnection");
break;
}
/* we have a refbuf buffer, which a data block to be sent to
** all clients. if a client is not able to send the buffer
** immediately, it should store it on its queue for the next
** go around.
**
** instead of sending the current block, a client should send
** all data in the queue, plus the current block, until either
** it runs out of data, or it hits a recoverable error like
** EAGAIN. this will allow a client that got slightly lagged
** to catch back up if it can
*/
/* First, stream dumping, if enabled */
if(source->dumpfile) {
if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
refbuf->len)
if (refbuf)
{
/* append buffer to the in-flight data queue, */
if (source->stream_data == NULL)
{
WARN1("Write to dump file failed, disabling: %s",
strerror(errno));
fclose(source->dumpfile);
source->dumpfile = NULL;
source->stream_data = refbuf;
source->burst_point = refbuf;
}
}
if (source->stream_data_tail)
source->stream_data_tail->next = refbuf;
source->stream_data_tail = refbuf;
source->queue_size += refbuf->len;
/* new buffer is referenced for burst */
refbuf_addref (refbuf);
/* acquire read lock on client_tree */
avl_tree_rlock(source->client_tree);
client_node = avl_get_first(source->client_tree);
while (client_node) {
/* acquire read lock on node */
avl_node_wlock(client_node);
client = (client_t *)client_node->key;
data_done = 0;
/* do we have any old buffers? */
abuf = refbuf_queue_remove(&client->queue);
while (abuf) {
bytes = abuf->len - client->pos;
sbytes = source->format->write_buf_to_client(source->format,
client, &abuf->data[client->pos], bytes);
if (sbytes < bytes) {
if (client->con->error) {
refbuf_release (abuf);
}
else {
/* We didn't send the entire buffer. Leave it for
* the moment, handle it in the next iteration.
*/
client->pos += sbytes<0?0:sbytes;
refbuf_queue_insert (&client->queue, abuf);
}
data_done = 1;
break;
}
/* we're done with that refbuf, release it and reset the pos */
refbuf_release(abuf);
client->pos = 0;
abuf = refbuf_queue_remove(&client->queue);
}
/* now send or queue the new data */
if (data_done) {
refbuf_addref(refbuf);
refbuf_queue_add(&client->queue, refbuf);
} else {
sbytes = source->format->write_buf_to_client(source->format,
client, refbuf->data, refbuf->len);
if (client->con->error == 0 && sbytes < refbuf->len) {
/* Didn't send the entire buffer, queue it */
client->pos = sbytes<0?0:sbytes;
refbuf_addref(refbuf);
refbuf_queue_insert(&client->queue, refbuf);
/* new data on queue, so check the burst point */
source->burst_offset += refbuf->len;
if (source->burst_offset > source->burst_size)
{
if (source->burst_point->next)
{
refbuf_release (source->burst_point);
source->burst_point = source->burst_point->next;
source->burst_offset -= source->burst_point->len;
}
}
/* if the client is too slow, its queue will slowly build up.
** we need to make sure the client is keeping up with the
** data, so we'll kick any client who's queue gets to large.
*/
if (refbuf_queue_length(&client->queue) > source->queue_size_limit) {
DEBUG0("Client has fallen too far behind, removing");
client->con->error = 1;
}
/* release read lock on node */
avl_node_unlock(client_node);
/* get the next node */
client_node = avl_get_next(client_node);
}
/* release read lock on client_tree */
avl_tree_unlock(source->client_tree);
/* Only release the refbuf if we didn't add it to the source queue */
if (!source->burst_on_connect) {
refbuf_release(refbuf);
/* save stream to file */
if (source->dumpfile && source->format->write_buf_to_file)
source->format->write_buf_to_file (source, refbuf);
}
/* lets see if we have too much data in the queue, but don't remove it until later */
if (source->queue_size > source->queue_size_limit)
remove_from_q = 1;
/* acquire write lock on client_tree */
avl_tree_wlock(source->client_tree);
/** delete bad clients **/
listeners = source->listeners;
client_node = avl_get_first(source->client_tree);
while (client_node) {
client = (client_t *)client_node->key;
send_to_listener (source, client, remove_from_q);
if (client->con->error) {
client_node = avl_get_next(client_node);
avl_delete(source->client_tree, (void *)client, _free_client);
source->listeners--;
stats_event_args(source->mount, "listeners", "%d",
source->listeners);
DEBUG0("Client removed");
continue;
}
@ -696,33 +663,6 @@ void source_main (source_t *source)
DEBUG0("Client added");
stats_event_inc(NULL, "clients");
stats_event_inc(source->mount, "connections");
stats_event_args(source->mount, "listeners", "%d",
source->listeners);
/* we have to send cached headers for some data formats
** this is where we queue up the buffers to send
*/
client = (client_t *)client_node->key;
if (source->format->has_predata) {
client->queue = source->format->get_predata(source->format);
}
if (source->burst_on_connect) {
/* here is where we fill up the new client with refbufs from
the source buffer. this will allow an initial burst of
audio data to be sent to the client, and allow for a faster
startup time (from listener perspective) for the stream */
if (!client->burst_sent) {
thread_mutex_lock(&source->queue_mutex);
for (i=0;i<refbuf_queue_size(&(source->queue));i++) {
refbuf_queue_add(&(client->queue),
refbuf_queue_get(&(source->queue), i));
}
thread_mutex_unlock(&source->queue_mutex);
client->burst_sent = 1;
DEBUG1("Added %d buffers to initial client queue",
refbuf_queue_length(&(source->queue)));
}
}
client_node = avl_get_next(client_node);
}
@ -737,12 +677,47 @@ void source_main (source_t *source)
/* release write lock on pending_tree */
avl_tree_unlock(source->pending_tree);
/* update the stats if need be */
if (source->listeners != listeners)
{
INFO2("listener count on %s now %d", source->mount, source->listeners);
stats_event_args (source->mount, "listeners", "%d", source->listeners);
}
/* lets reduce the queue, any lagging clients should of been
* terminated by now
*/
if (source->stream_data)
{
/* normal unreferenced queue data will have a refcount 1, but
* burst queue data will be at least 2, active clients will also
* increase refcount */
while (source->stream_data->_count == 1)
{
refbuf_t *to_go = source->stream_data;
if (to_go->next == NULL || source->burst_point == to_go)
{
/* this should not happen */
ERROR0 ("queue state is unexpected");
source->running = 0;
break;
}
source->stream_data = to_go->next;
source->queue_size -= to_go->len;
refbuf_release (to_go);
}
}
/* release write lock on client_tree */
avl_tree_unlock(source->client_tree);
}
source_shutdown (source);
}
done:
static void source_shutdown (source_t *source)
{
source->running = 0;
INFO1("Source \"%s\" exiting", source->mount);
@ -776,8 +751,6 @@ done:
/* release our hold on the lock so the main thread can continue cleaning up */
thread_rwlock_unlock(source->shutdown_rwlock);
return;
}
static int _compare_clients(void *compare_arg, void *a, void *b)
@ -816,7 +789,7 @@ static int _free_client(void *key)
static void _parse_audio_info (source_t *source, const char *s)
{
const char *start = s;
unsigned len;
unsigned int len;
while (start != NULL && *start != '\0')
{
@ -878,6 +851,9 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo)
source->timeout = mountinfo->source_timeout;
DEBUG1 ("source timeout to %u", source->timeout);
}
if (mountinfo->burst_size > -1)
source->burst_size = mountinfo->burst_size;
DEBUG1 ("amount to burst on client connect set to %u", source->burst_size);
}

View File

@ -55,11 +55,22 @@ typedef struct source_tag
struct auth_tag *authenticator;
int fallback_override;
int no_mount;
unsigned queue_size_limit;
/* per source burst handling for connecting clients */
unsigned int burst_size; /* trigger level for burst on connect */
unsigned int burst_offset;
refbuf_t *burst_point;
unsigned int queue_size;
unsigned int queue_size_limit;
unsigned timeout; /* source timeout in seconds */
refbuf_queue_t *queue;
mutex_t queue_mutex;
int burst_on_connect;
time_t last_read;
int short_delay;
refbuf_t *stream_data;
refbuf_t *stream_data_tail;
} source_t;
source_t *source_reserve (const char *mount);