diff --git a/src/admin.c b/src/admin.c index dd76a871..8a7362eb 100644 --- a/src/admin.c +++ b/src/admin.c @@ -799,11 +799,7 @@ static void command_metadata(client_t *client, source_t *source) state = source->format->_state; - thread_mutex_lock(&(state->lock)); - free(state->metadata); - state->metadata = strdup(value); - state->metadata_age++; - thread_mutex_unlock(&(state->lock)); + mp3_set_tag (source->format, "title", value); DEBUG2("Metadata on mountpoint %s changed to \"%s\"", source->mount, value); diff --git a/src/cfgfile.c b/src/cfgfile.c index bbd51166..5c237dd4 100644 --- a/src/cfgfile.c +++ b/src/cfgfile.c @@ -339,7 +339,8 @@ static void _set_defaults(ice_config_t *configuration) configuration->num_yp_directories = 0; configuration->relay_username = NULL; configuration->relay_password = NULL; - configuration->burst_on_connect = 1; + /* default to a typical prebuffer size used by clients */ + configuration->burst_size = 65536; } static void _parse_root(xmlDocPtr doc, xmlNodePtr node, @@ -463,7 +464,12 @@ static void _parse_limits(xmlDocPtr doc, xmlNodePtr node, if (tmp) xmlFree(tmp); } else if (strcmp(node->name, "burst-on-connect") == 0) { tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); - configuration->burst_on_connect = atoi(tmp); + if (atoi(tmp) == 0) + configuration->burst_size = 0; + if (tmp) xmlFree(tmp); + } else if (strcmp(node->name, "burst-size") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + configuration->burst_size = atoi(tmp); if (tmp) xmlFree(tmp); } } while ((node = node->next)); @@ -489,7 +495,9 @@ static void _parse_mount(xmlDocPtr doc, xmlNodePtr node, else configuration->mounts = mount; + /* default settings */ mount->max_listeners = -1; + mount->burst_size = -1; mount->next = NULL; do { @@ -574,6 +582,10 @@ static void _parse_mount(xmlDocPtr doc, xmlNodePtr node, mount->source_timeout = atoi (tmp); xmlFree(tmp); } + } else if (strcmp(node->name, "burst-size") == 0) { + tmp = (char *)xmlNodeListGetString(doc, node->xmlChildrenNode, 1); + mount->burst_size = atoi(tmp); + if (tmp) xmlFree(tmp); } } while ((node = node->next)); } diff --git a/src/cfgfile.h b/src/cfgfile.h index 3e9ee747..e4ff21fb 100644 --- a/src/cfgfile.h +++ b/src/cfgfile.h @@ -54,8 +54,10 @@ typedef struct _mount_proxy { clients from the fallback? */ int no_mount; /* Do we permit direct requests of this mountpoint? (or only indirect, through fallbacks) */ - unsigned queue_size_limit; - unsigned source_timeout; /* source timeout in seconds */ + int burst_size; /* amount to send to a new client if possible, -1 take + * from global setting */ + unsigned int queue_size_limit; + unsigned int source_timeout; /* source timeout in seconds */ char *auth_type; /* Authentication type */ config_options_t *auth_options; /* Options for this type */ @@ -84,8 +86,9 @@ typedef struct ice_config_tag int client_limit; int source_limit; - unsigned queue_size_limit; + unsigned int queue_size_limit; int threadpool_size; + unsigned int burst_size; int client_timeout; int header_timeout; int source_timeout; @@ -133,7 +136,6 @@ typedef struct ice_config_tag char *yp_url[MAX_YP_DIRECTORIES]; int yp_url_timeout[MAX_YP_DIRECTORIES]; int num_yp_directories; - int burst_on_connect; } ice_config_t; typedef struct { diff --git a/src/client.c b/src/client.c index 2ab424df..8d5b1670 100644 --- a/src/client.c +++ b/src/client.c @@ -42,17 +42,14 @@ client_t *client_create(connection_t *con, http_parser_t *parser) client->con = con; client->parser = parser; - client->queue = NULL; + client->refbuf = NULL; client->pos = 0; - client->burst_sent = 0; return client; } void client_destroy(client_t *client) { - refbuf_t *refbuf; - if (client == NULL) return; /* write log entry if ip is set (some things don't set it, like outgoing @@ -64,9 +61,9 @@ void client_destroy(client_t *client) connection_close(client->con); httpp_destroy(client->parser); - while ((refbuf = refbuf_queue_remove(&client->queue))) - refbuf_release(refbuf); - + /* drop ref counts if need be */ + if (client->refbuf) + refbuf_release (client->refbuf); /* we need to free client specific format data (if any) */ if (client->free_client_data) client->free_client_data (client); @@ -144,3 +141,14 @@ int client_send_bytes (client_t *client, const void *buf, unsigned len) return ret; } +void client_set_queue (client_t *client, refbuf_t *refbuf) +{ + refbuf_t *to_release = client->refbuf; + + client->refbuf = refbuf; + refbuf_addref (client->refbuf); + client->pos = 0; + if (to_release) + refbuf_release (to_release); +} + diff --git a/src/client.h b/src/client.h index dcb09057..4d52eee2 100644 --- a/src/client.h +++ b/src/client.h @@ -32,8 +32,9 @@ typedef struct _client_tag /* http response code for this client */ int respcode; - /* buffer queue */ - refbuf_queue_t *queue; + /* where in the queue the client is */ + refbuf_t *refbuf; + /* position in first buffer */ unsigned long pos; @@ -45,7 +46,6 @@ typedef struct _client_tag /* function to call to release format specific resources */ void (*free_client_data)(struct _client_tag *client); - int burst_sent; } client_t; client_t *client_create(connection_t *con, http_parser_t *parser); @@ -56,5 +56,6 @@ void client_send_401(client_t *client); void client_send_403(client_t *client); void client_send_400(client_t *client, char *message); int client_send_bytes (client_t *client, const void *buf, unsigned len); +void client_set_queue (client_t *client, refbuf_t *refbuf); #endif /* __CLIENT_H__ */ diff --git a/src/connection.c b/src/connection.c index 2708f006..ca7bd6cb 100644 --- a/src/connection.c +++ b/src/connection.c @@ -465,9 +465,8 @@ int connection_complete_source (source_t *source) "for icecast 1.x relays. Assuming content is mp3."); format_type = FORMAT_TYPE_MP3; } - source->format = format_get_plugin (format_type, source->mount, source->parser); - if (source->format == NULL) + if (format_get_plugin (format_type, source) < 0) { global_unlock(); config_release_config(); @@ -483,6 +482,7 @@ int connection_complete_source (source_t *source) /* set global settings first */ source->queue_size_limit = config->queue_size_limit; source->timeout = config->source_timeout; + source->burst_size = config->burst_size; /* for relays, we don't yet have a client, however we do require one * to retrieve the stream from. This is created here, quite late, @@ -932,8 +932,7 @@ static void _handle_get_request(connection_t *con, global.clients++; global_unlock(); - client->format_data = source->format->create_client_data( - source->format, source, client); + source->format->create_client_data (source, client); source->format->client_send_headers(source->format, source, client); diff --git a/src/format.c b/src/format.c index 5e920f8a..8280dc2c 100644 --- a/src/format.c +++ b/src/format.c @@ -75,32 +75,22 @@ char *format_get_mimetype(format_type_t type) } } -format_plugin_t *format_get_plugin(format_type_t type, char *mount, - http_parser_t *parser) +int format_get_plugin(format_type_t type, source_t *source) { - format_plugin_t *plugin; + int ret = -1; switch (type) { case FORMAT_TYPE_VORBIS: - plugin = format_vorbis_get_plugin(); - if (plugin) plugin->mount = mount; + ret = format_vorbis_get_plugin (source); break; case FORMAT_TYPE_MP3: - plugin = format_mp3_get_plugin(parser); - if (plugin) plugin->mount = mount; + ret = format_mp3_get_plugin (source); break; default: - plugin = NULL; break; } - return plugin; -} - -int format_generic_write_buf_to_client(format_plugin_t *format, - client_t *client, unsigned char *buf, int len) -{ - return client_send_bytes (client, buf, len); + return ret; } void format_send_general_headers(format_plugin_t *format, diff --git a/src/format.h b/src/format.h index 40c960f7..81a92421 100644 --- a/src/format.h +++ b/src/format.h @@ -45,13 +45,10 @@ typedef struct _format_plugin_tag */ int has_predata; - int (*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long - len, refbuf_t **buffer); - refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self); - int (*write_buf_to_client)(struct _format_plugin_tag *format, - client_t *client, unsigned char *buf, int len); - void *(*create_client_data)(struct _format_plugin_tag *format, - struct source_tag *source, client_t *client); + refbuf_t *(*get_buffer)(struct source_tag *); + int (*write_buf_to_client)(struct _format_plugin_tag *format, client_t *client); + void (*write_buf_to_file)(struct source_tag *source, refbuf_t *refbuf); + int (*create_client_data)(struct source_tag *source, client_t *client); void (*client_send_headers)(struct _format_plugin_tag *format, struct source_tag *source, client_t *client); void (*free_plugin)(struct _format_plugin_tag *self); @@ -62,11 +59,8 @@ typedef struct _format_plugin_tag format_type_t format_get_type(char *contenttype); char *format_get_mimetype(format_type_t type); -format_plugin_t *format_get_plugin(format_type_t type, char *mount, - http_parser_t *parser); +int format_get_plugin(format_type_t type, struct source_tag *source); -int format_generic_write_buf_to_client(format_plugin_t *format, - client_t *client, unsigned char *buf, int len); void format_send_general_headers(format_plugin_t *format, struct source_tag *source, client_t *client); diff --git a/src/format_mp3.c b/src/format_mp3.c index 1003749b..8eab1995 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -54,38 +54,38 @@ #define ICY_METADATA_INTERVAL 16000 static void format_mp3_free_plugin(format_plugin_t *self); -static int format_mp3_get_buffer(format_plugin_t *self, char *data, - unsigned long len, refbuf_t **buffer); -static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self); -static void *format_mp3_create_client_data(format_plugin_t *self, - source_t *source, client_t *client); +static refbuf_t *mp3_get_filter_meta (source_t *source); +static refbuf_t *mp3_get_no_meta (source_t *source); + +static int format_mp3_create_client_data (source_t *source, client_t *client); static void free_mp3_client_data (client_t *client); -static int format_mp3_write_buf_to_client(format_plugin_t *self, - client_t *client, unsigned char *buf, int len); +static int format_mp3_write_buf_to_client(format_plugin_t *self, client_t *client); static void format_mp3_send_headers(format_plugin_t *self, source_t *source, client_t *client); +static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf); + typedef struct { int use_metadata; - int interval; - int offset; - int metadata_age; int metadata_offset; + unsigned int since_meta_block; + int in_metadata; + refbuf_t *associated; } mp3_client_data; -format_plugin_t *format_mp3_get_plugin(http_parser_t *parser) +int format_mp3_get_plugin (source_t *source) { char *metadata; format_plugin_t *plugin; mp3_state *state = calloc(1, sizeof(mp3_state)); + refbuf_t *meta; plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t)); plugin->type = FORMAT_TYPE_MP3; - plugin->has_predata = 0; - plugin->get_buffer = format_mp3_get_buffer; - plugin->get_predata = format_mp3_get_predata; + plugin->get_buffer = mp3_get_no_meta; plugin->write_buf_to_client = format_mp3_write_buf_to_client; + plugin->write_buf_to_file = write_mp3_to_file; plugin->create_client_data = format_mp3_create_client_data; plugin->client_send_headers = format_mp3_send_headers; plugin->free_plugin = format_mp3_free_plugin; @@ -93,309 +93,485 @@ format_plugin_t *format_mp3_get_plugin(http_parser_t *parser) plugin->_state = state; - state->metadata_age = 0; - state->metadata = strdup(""); - thread_mutex_create(&(state->lock)); + meta = refbuf_new (1); + memcpy (meta->data, "", 1); + state->metadata = meta; + state->interval = ICY_METADATA_INTERVAL; - metadata = httpp_getvar(parser, "icy-metaint"); - if(metadata) - state->inline_metadata_interval = atoi(metadata); - - return plugin; -} - - -static int send_metadata(client_t *client, mp3_client_data *client_state, - mp3_state *source_state) -{ - int len_byte; - int len; - int ret = -1; - unsigned char *buf; - int source_age; - char *fullmetadata = NULL; - int fullmetadata_size = 0; - const char meta_fmt[] = "StreamTitle='';"; - - do + metadata = httpp_getvar (source->parser, "icy-metaint"); + if (metadata) { - thread_mutex_lock (&(source_state->lock)); - if (source_state->metadata == NULL) - break; /* Shouldn't be possible */ + state->inline_metadata_interval = atoi (metadata); + state->offset = 0; + plugin->get_buffer = mp3_get_filter_meta; + } + source->format = plugin; + thread_mutex_create (&state->url_lock); - fullmetadata_size = strlen (source_state->metadata) + sizeof (meta_fmt); - - /* Noted without comment: When the metadata string is a multiple of - * 16 bytes, the "reference" shoutcast server increases the length - * byte by one and appends 16 nulls, rather than omitting the - * pointless trailing null. */ - if (fullmetadata_size > 4079) - { - fullmetadata_size = 4079; - } - fullmetadata = malloc (fullmetadata_size); - if (fullmetadata == NULL) - break; - - fullmetadata_size = snprintf (fullmetadata, fullmetadata_size, - "StreamTitle='%.*s';", fullmetadata_size-(sizeof (meta_fmt)-1), source_state->metadata); - - source_age = source_state->metadata_age; - - if (fullmetadata_size > 0 && source_age != client_state->metadata_age) - { - len_byte = (fullmetadata_size)/16 + 1; /* to give 1-255 */ - client_state->metadata_offset = 0; - } - else - len_byte = 0; - len = 1 + len_byte*16; - buf = calloc (1, len); - if (buf == NULL) - break; - - buf[0] = len_byte; - - if (len > 1) { - strncpy (buf+1, fullmetadata, len-1); - buf[len-1] = '\0'; - } - - thread_mutex_unlock (&(source_state->lock)); - - /* only write what hasn't been written already */ - ret = client_send_bytes (client, buf + client_state->metadata_offset, - len - client_state->metadata_offset); - - if (ret > 0 && ret < len) { - client_state->metadata_offset += ret; - } - else if (ret == len) { - client_state->metadata_age = source_age; - client_state->offset = 0; - client_state->metadata_offset = 0; - } - free (buf); - free (fullmetadata); - return ret; - - } while (0); - - thread_mutex_unlock(&(source_state->lock)); - free (fullmetadata); - return -1; + return 0; } -static int format_mp3_write_buf_to_client(format_plugin_t *self, - client_t *client, unsigned char *buf, int len) + +void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value) +{ + mp3_state *source_mp3 = plugin->_state; + unsigned int len; + const char meta[] = "StreamTitle='"; + int size = sizeof (meta) + 1; + + if (tag==NULL || value == NULL) + return; + + len = strlen (value)+1; + size += len; + /* protect against multiple updaters */ + thread_mutex_lock (&source_mp3->url_lock); + if (strcmp (tag, "title") == 0 || strcmp (tag, "song") == 0) + { + char *p = strdup (value); + if (p) + { + free (source_mp3->url_title); + free (source_mp3->url_artist); + source_mp3->url_artist = NULL; + source_mp3->url_title = p; + source_mp3->update_metadata = 1; + } + } + else if (strcmp (tag, "artist") == 0) + { + char *p = strdup (value); + if (p) + { + free (source_mp3->url_artist); + source_mp3->url_artist = p; + source_mp3->update_metadata = 1; + } + } + thread_mutex_unlock (&source_mp3->url_lock); +} + + +static void filter_shoutcast_metadata (source_t *source, char *metadata, unsigned int meta_len) +{ + if (metadata) + { + char *end, *p; + int len; + + do + { + metadata++; + if (strncmp (metadata, "StreamTitle='", 13)) + break; + if ((end = strstr (metadata, "\';")) == NULL) + break; + len = (end - metadata) - 13; + p = calloc (1, len+1); + if (p) + { + memcpy (p, metadata+13, len); + stats_event (source->mount, "title", p); + yp_touch (source->mount); + free (p); + } + } while (0); + } +} + + +/* called from the source thread when the metadata has been updated. + * The artist title are checked and made ready for clients to send + */ +void mp3_set_title (source_t *source) +{ + const char meta[] = "StreamTitle='"; + int size; + unsigned char len_byte; + refbuf_t *p; + unsigned int len = sizeof(meta) + 2; /* the StreamTitle, quotes, ; and null */ + mp3_state *source_mp3 = source->format->_state; + + /* make sure the url data does not disappear from under us */ + thread_mutex_lock (&source_mp3->url_lock); + + /* work out message length */ + if (source_mp3->url_artist) + len += strlen (source_mp3->url_artist); + if (source_mp3->url_title) + len += strlen (source_mp3->url_title); + if (source_mp3->url_artist && source_mp3->url_title) + len += 3; +#define MAX_META_LEN 255*16 + if (len > MAX_META_LEN) + { + thread_mutex_unlock (&source_mp3->url_lock); + WARN1 ("Metadata too long at %d chars", len); + return; + } + /* work out the metadata len byte */ + len_byte = (len-1) / 16 + 1; + + /* now we know how much space to allocate, +1 for the len byte */ + size = len_byte * 16 + 1; + + p = refbuf_new (size); + if (p) + { + mp3_state *source_mp3 = source->format->_state; + + memset (p->data, '\0', size); + if (source_mp3->url_artist && source_mp3->url_title) + snprintf (p->data, size, "%c%s%s - %s';", len_byte, meta, + source_mp3->url_artist, source_mp3->url_title); + else + snprintf (p->data, size, "%c%s%s';", len_byte, meta, + source_mp3->url_title); + filter_shoutcast_metadata (source, p->data, size); + + refbuf_release (source_mp3->metadata); + source_mp3->metadata = p; + } + thread_mutex_unlock (&source_mp3->url_lock); +} + + +/* send the appropriate metadata, and return the number of bytes written + * which is 0 or greater. Check the client in_metadata value afterwards + * to see if all metadata has been sent + */ +static int send_mp3_metadata (client_t *client, refbuf_t *associated) { int ret = 0; - mp3_client_data *mp3data = client->format_data; - - if(((mp3_state *)self->_state)->metadata && mp3data->use_metadata) + unsigned char *metadata; + int meta_len; + mp3_client_data *client_mp3 = client->format_data; + + /* If there is a change in metadata then send it else + * send a single zero value byte in its place + */ + if (associated == client_mp3->associated) { - mp3_client_data *state = client->format_data; - int max = state->interval - state->offset; - - if(len == 0) /* Shouldn't happen */ - return 0; - - if(max > len) - max = len; - - if(max > 0) { - ret = client_send_bytes (client, buf, max); - if(ret > 0) - state->offset += ret; - } - else { - send_metadata (client, state, self->_state); - } - + metadata = "\0"; + meta_len = 1; } - else { - ret = client_send_bytes (client, buf, len); + else + { + metadata = associated->data + client_mp3->metadata_offset; + meta_len = associated->len - client_mp3->metadata_offset; } + ret = client_send_bytes (client, metadata, meta_len); + + if (ret == meta_len) + { + client_mp3->associated = associated; + client_mp3->metadata_offset = 0; + client_mp3->in_metadata = 0; + client_mp3->since_meta_block = 0; + return ret; + } + if (ret > 0) + client_mp3->metadata_offset += ret; + else + ret = 0; + client_mp3->in_metadata = 1; return ret; } + +/* Handler for writing mp3 data to a client, taking into account whether + * client has requested shoutcast style metadata updates + */ +static int format_mp3_write_buf_to_client (format_plugin_t *self, client_t *client) +{ + int ret, written = 0; + mp3_client_data *client_mp3 = client->format_data; + mp3_state *source_mp3 = self->_state; + refbuf_t *refbuf = client->refbuf; + char *buf; + unsigned int len; + + if (refbuf->next == NULL && client->pos == refbuf->len) + return 0; + + /* move to the next buffer if we have finished with the current one */ + if (refbuf->next && client->pos == refbuf->len) + { + client_set_queue (client, refbuf->next); + refbuf = client->refbuf; + } + + buf = refbuf->data + client->pos; + len = refbuf->len - client->pos; + + do + { + /* send any unwritten metadata to the client */ + if (client_mp3->in_metadata) + { + refbuf_t *associated = refbuf->associated; + ret = send_mp3_metadata (client, associated); + + if (client_mp3->in_metadata) + break; + written += ret; + } + /* see if we need to send the current metadata to the client */ + if (client_mp3->use_metadata) + { + unsigned int remaining = source_mp3->interval - + client_mp3->since_meta_block; + + /* sending the metadata block */ + if (remaining <= len) + { + /* send any mp3 before the metadata block */ + if (remaining) + { + ret = client_send_bytes (client, buf, remaining); + + if (ret > 0) + { + client_mp3->since_meta_block += ret; + client->pos += ret; + } + if (ret < (int)remaining) + break; + written += ret; + } + ret = send_mp3_metadata (client, refbuf->associated); + if (client_mp3->in_metadata) + break; + written += ret; + /* change buf and len */ + buf += remaining; + len -= remaining; + } + } + /* write any mp3, maybe after the metadata block */ + if (len) + { + ret = client_send_bytes (client, buf, len); + + if (ret > 0) + { + client_mp3->since_meta_block += ret; + client->pos += ret; + } + if (ret < (int)len) + break; + written += ret; + } + ret = 0; + } while (0); + + if (ret > 0) + written += ret; + return written; +} + static void format_mp3_free_plugin(format_plugin_t *self) { /* free the plugin instance */ mp3_state *state = self->_state; - thread_mutex_destroy(&(state->lock)); - free(state->metadata); + thread_mutex_destroy (&state->url_lock); + free (state->url_artist); + free (state->url_title); + refbuf_release (state->metadata); free(state); free(self); } -static int format_mp3_get_buffer(format_plugin_t *self, char *data, - unsigned long len, refbuf_t **buffer) + +/* read an mp3 stream which does not have shoutcast style metadata */ +static refbuf_t *mp3_get_no_meta (source_t *source) { + int bytes; refbuf_t *refbuf; - mp3_state *state = self->_state; + mp3_state *source_mp3 = source->format->_state; - /* Set this to NULL in case it doesn't get set to a valid buffer later */ - *buffer = NULL; + if ((refbuf = refbuf_new (2048)) == NULL) + return NULL; + bytes = sock_read_bytes (source->con->sock, refbuf->data, 2048); - if(!data) - return 0; - - if(state->inline_metadata_interval) { - /* Source is sending metadata, handle it... */ - - while(len > 0) { - int to_read = state->inline_metadata_interval - state->offset; - if(to_read > 0) { - refbuf_t *old_refbuf = *buffer; - - if(to_read > len) - to_read = len; - - if(old_refbuf) { - refbuf = refbuf_new(to_read + old_refbuf->len); - memcpy(refbuf->data, old_refbuf->data, old_refbuf->len); - memcpy(refbuf->data+old_refbuf->len, data, to_read); - - refbuf_release(old_refbuf); - } - else { - refbuf = refbuf_new(to_read); - memcpy(refbuf->data, data, to_read); - } - - *buffer = refbuf; - - state->offset += to_read; - data += to_read; - len -= to_read; - } - else if(!state->metadata_length) { - /* Next up is the metadata byte... */ - unsigned char byte = data[0]; - data++; - len--; - - /* According to the "spec"... this byte * 16 */ - state->metadata_length = byte * 16; - - if(state->metadata_length) { - state->metadata_buffer = - calloc(state->metadata_length + 1, 1); - - /* Ensure we have a null-terminator even if the source - * stream is invalid. - */ - state->metadata_buffer[state->metadata_length] = 0; - } - else { - state->offset = 0; - } - - state->metadata_offset = 0; - } - else { - /* Metadata to read! */ - int readable = state->metadata_length - state->metadata_offset; - - if(readable > len) - readable = len; - - memcpy(state->metadata_buffer + state->metadata_offset, - data, readable); - - state->metadata_offset += readable; - - data += readable; - len -= readable; - - if(state->metadata_offset == state->metadata_length) - { - if(state->metadata_length) - { - thread_mutex_lock(&(state->lock)); - free(state->metadata); - /* Now, reformat state->metadata_buffer to strip off - StreamTitle=' and the closing '; (but only if there's - enough data for it to be correctly formatted) */ - if(state->metadata_length >= 15) { - /* This is overly complex because the - metadata_length is the length of the actual raw - data, but the (null-terminated) string is going - to be shorter than this, and we can't trust that - the raw data doesn't have other embedded-nulls */ - int stringlength; - - state->metadata = malloc(state->metadata_length - - 12); - memcpy(state->metadata, - state->metadata_buffer + 13, - state->metadata_length - 13); - /* Make sure we've got a null-terminator of some - sort */ - state->metadata[state->metadata_length - 13] = 0; - - /* Now figure out the _right_ one */ - stringlength = strlen(state->metadata); - if(stringlength > 2) - state->metadata[stringlength - 2] = 0; - free(state->metadata_buffer); - } - else - state->metadata = state->metadata_buffer; - - stats_event(self->mount, "title", state->metadata); - state->metadata_buffer = NULL; - state->metadata_age++; - thread_mutex_unlock(&(state->lock)); - yp_touch (self->mount); - } - - state->offset = 0; - state->metadata_length = 0; - } - } - } - - /* Either we got a buffer above (in which case it can be used), or - * we set *buffer to NULL in the prologue, so the return value is - * correct anyway... - */ - return 0; + if (bytes == 0) + { + INFO1 ("End of stream %s", source->mount); + source->running = 0; + refbuf_release (refbuf); + return NULL; } - else { - /* Simple case - no metadata, just dump data directly to a buffer */ - refbuf = refbuf_new(len); - - memcpy(refbuf->data, data, len); - - *buffer = refbuf; - return 0; + if (source_mp3->update_metadata) + { + mp3_set_title (source); + source_mp3->update_metadata = 0; } -} + if (bytes > 0) + { + refbuf->len = bytes; + refbuf->associated = source_mp3->metadata; + refbuf_addref (source_mp3->metadata); + return refbuf; + } + refbuf_release (refbuf); + + if (!sock_recoverable (sock_error())) + source->running = 0; -static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self) -{ return NULL; } -static void *format_mp3_create_client_data(format_plugin_t *self, - source_t *source, client_t *client) + +/* read mp3 data with inlined metadata from the source. Filter out the + * metadata so that the mp3 data itself is store on the queue and the + * metadata is is associated with it + */ +static refbuf_t *mp3_get_filter_meta (source_t *source) +{ + refbuf_t *refbuf; + format_plugin_t *plugin = source->format; + mp3_state *source_mp3 = plugin->_state; + unsigned char *src; + unsigned int bytes, mp3_block; + int ret; + + refbuf = refbuf_new (2048); + src = refbuf->data; + + ret = sock_read_bytes (source->con->sock, refbuf->data, 2048); + + if (ret == 0) + { + INFO1 ("End of stream %s", source->mount); + source->running = 0; + refbuf_release (refbuf); + return NULL; + } + if (source_mp3->update_metadata) + { + mp3_set_title (source); + source_mp3->update_metadata = 0; + } + if (ret < 0) + { + refbuf_release (refbuf); + if (sock_recoverable (sock_error())) + return NULL; /* go back to waiting */ + INFO0 ("Error on connection from source"); + source->running = 0; + return NULL; + } + /* fill the buffer with the read data */ + bytes = (unsigned int)ret; + refbuf->len = 0; + while (bytes > 0) + { + unsigned int metadata_remaining; + + mp3_block = source_mp3->inline_metadata_interval - source_mp3->offset; + + /* is there only enough to account for mp3 data */ + if (bytes <= mp3_block) + { + refbuf->len += bytes; + source_mp3->offset += bytes; + break; + } + /* we have enough data to get to the metadata + * block, but only transfer upto it */ + if (mp3_block) + { + src += mp3_block; + bytes -= mp3_block; + refbuf->len += mp3_block; + source_mp3->offset += mp3_block; + continue; + } + + /* process the inline metadata, len == 0 indicates not seen any yet */ + if (source_mp3->build_metadata_len == 0) + { + memset (source_mp3->build_metadata, 0, + sizeof (source_mp3->build_metadata)); + source_mp3->build_metadata_offset = 0; + source_mp3->build_metadata_len = 1 + (*src * 16); + } + + /* do we have all of the metatdata block */ + metadata_remaining = source_mp3->build_metadata_len - + source_mp3->build_metadata_offset; + if (bytes < metadata_remaining) + { + memcpy (source_mp3->build_metadata + + source_mp3->build_metadata_offset, src, bytes); + source_mp3->build_metadata_offset += bytes; + break; + } + /* copy all bytes except the last one, that way we + * know a null byte terminates the message */ + memcpy (source_mp3->build_metadata + source_mp3->build_metadata_offset, + src, metadata_remaining-1); + + /* overwrite metadata in the buffer */ + bytes -= metadata_remaining; + memmove (src, src+metadata_remaining, bytes); + + /* assign metadata if it's not 1 byte, as that indicates a change */ + if (source_mp3->build_metadata_len > 1) + { + refbuf_t *meta = refbuf_new (source_mp3->build_metadata_len); + memcpy (meta->data, source_mp3->build_metadata, + source_mp3->build_metadata_len); + + DEBUG1("shoutcast metadata %.4080s", meta->data+1); + if (strncmp (meta->data+1, "StreamTitle=", 12) == 0) + { + filter_shoutcast_metadata (source, source_mp3->build_metadata, + source_mp3->build_metadata_len); + refbuf_release (source_mp3->metadata); + source_mp3->metadata = meta; + } + else + { + ERROR0 ("Incorrect metadata format, ending stream"); + source->running = 0; + refbuf_release (refbuf); + return NULL; + } + } + source_mp3->offset = 0; + source_mp3->build_metadata_len = 0; + } + /* the data we have just read may of just been metadata */ + if (refbuf->len == 0) + { + refbuf_release (refbuf); + return NULL; + } + refbuf->associated = source_mp3->metadata; + refbuf_addref (source_mp3->metadata); + + return refbuf; +} + + +static int format_mp3_create_client_data(source_t *source, client_t *client) { mp3_client_data *data = calloc(1,sizeof(mp3_client_data)); char *metadata; - data->interval = ICY_METADATA_INTERVAL; - data->offset = 0; - client->free_client_data = free_mp3_client_data; + if (data == NULL) + return -1; + client->format_data = data; + client->free_client_data = free_mp3_client_data; metadata = httpp_getvar(client->parser, "icy-metadata"); if(metadata) data->use_metadata = atoi(metadata)>0?1:0; - return data; + return 0; } @@ -432,3 +608,16 @@ static void format_mp3_send_headers(format_plugin_t *self, format_send_general_headers(self, source, client); } + +static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf) +{ + if (refbuf->len == 0) + return; + if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) < (size_t)refbuf->len) + { + WARN0 ("Write to dump file failed, disabling"); + fclose (source->dumpfile); + source->dumpfile = NULL; + } +} + diff --git a/src/format_mp3.h b/src/format_mp3.h index 0b132a7d..3eafa8db 100644 --- a/src/format_mp3.h +++ b/src/format_mp3.h @@ -19,18 +19,23 @@ #define __FORMAT_MP3_H__ typedef struct { - char *metadata; - int metadata_age; - mutex_t lock; - /* These are for inline metadata */ int inline_metadata_interval; int offset; - int metadata_length; - char *metadata_buffer; - int metadata_offset; + unsigned interval; + char *url_artist; + char *url_title; + int update_metadata; + + refbuf_t *metadata; + mutex_t url_lock; + + unsigned build_metadata_len; + unsigned build_metadata_offset; + char build_metadata[4081]; } mp3_state; -format_plugin_t *format_mp3_get_plugin(http_parser_t *parser); +int format_mp3_get_plugin(struct source_tag *src); +void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value); #endif /* __FORMAT_MP3_H__ */ diff --git a/src/format_vorbis.c b/src/format_vorbis.c index 3d4be8a2..0e469265 100644 --- a/src/format_vorbis.c +++ b/src/format_vorbis.c @@ -49,20 +49,31 @@ typedef struct _vstate_tag ogg_page og; unsigned long serialno; int header; - refbuf_t *headbuf[MAX_HEADER_PAGES]; + refbuf_t *file_headers; + refbuf_t *header_pages; + refbuf_t *header_pages_tail; int packets; } vstate_t; +struct client_vorbis +{ + refbuf_t *headers; + refbuf_t *header_page; + unsigned int pos; + int processing_headers; +}; + + static void format_vorbis_free_plugin(format_plugin_t *self); -static int format_vorbis_get_buffer(format_plugin_t *self, char *data, - unsigned long len, refbuf_t **buffer); -static refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self); -static void *format_vorbis_create_client_data(format_plugin_t *self, - source_t *source, client_t *client); +static refbuf_t *format_vorbis_get_buffer (source_t *source); +static int format_vorbis_create_client_data (source_t *source, client_t *client); static void format_vorbis_send_headers(format_plugin_t *self, source_t *source, client_t *client); +static int write_buf_to_client (format_plugin_t *self, client_t *client); +static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf); -format_plugin_t *format_vorbis_get_plugin(void) + +int format_vorbis_get_plugin(source_t *source) { format_plugin_t *plugin; vstate_t *state; @@ -70,10 +81,9 @@ format_plugin_t *format_vorbis_get_plugin(void) plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t)); plugin->type = FORMAT_TYPE_VORBIS; - plugin->has_predata = 1; + plugin->write_buf_to_file = write_ogg_to_file; plugin->get_buffer = format_vorbis_get_buffer; - plugin->get_predata = format_vorbis_get_predata; - plugin->write_buf_to_client = format_generic_write_buf_to_client; + plugin->write_buf_to_client = write_buf_to_client; plugin->create_client_data = format_vorbis_create_client_data; plugin->client_send_headers = format_vorbis_send_headers; plugin->free_plugin = format_vorbis_free_plugin; @@ -83,52 +93,67 @@ format_plugin_t *format_vorbis_get_plugin(void) ogg_sync_init(&state->oy); plugin->_state = (void *)state; + source->format = plugin; - return plugin; + return 0; } void format_vorbis_free_plugin(format_plugin_t *self) { - int i; vstate_t *state = (vstate_t *)self->_state; + refbuf_t *header = state->header_pages; /* free memory associated with this plugin instance */ /* free state memory */ + while (header) + { + refbuf_t *to_release = header; + header = header->next; + refbuf_release (to_release); + } ogg_sync_clear(&state->oy); ogg_stream_clear(&state->os); vorbis_comment_clear(&state->vc); vorbis_info_clear(&state->vi); - for (i = 0; i < MAX_HEADER_PAGES; i++) { - if (state->headbuf[i]) { - refbuf_release(state->headbuf[i]); - state->headbuf[i] = NULL; - } - } - free(state); /* free the plugin instance */ free(self); } -int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len, refbuf_t **buffer) +static refbuf_t *format_vorbis_get_buffer (source_t *source) { - char *buf; - int i, result; + int result; ogg_packet op; char *tag; - refbuf_t *refbuf, *source_refbuf; + refbuf_t *refbuf, *header; + char *data; + format_plugin_t *self = source->format; + int bytes; vstate_t *state = (vstate_t *)self->_state; - source_t *source; - if (data) { - /* write the data to the buffer */ - buf = ogg_sync_buffer(&state->oy, len); - memcpy(buf, data, len); - ogg_sync_wrote(&state->oy, len); + data = ogg_sync_buffer (&state->oy, 4096); + + bytes = sock_read_bytes (source->con->sock, data, 4096); + if (bytes < 0) + { + if (sock_recoverable (sock_error())) + return NULL; + WARN0 ("source connection has died"); + ogg_sync_wrote (&state->oy, 0); + source->running = 0; + return NULL; } + if (bytes == 0) + { + INFO1 ("End of Stream %s", source->mount); + ogg_sync_wrote (&state->oy, 0); + source->running = 0; + return NULL; + } + ogg_sync_wrote (&state->oy, bytes); refbuf = NULL; if (ogg_sync_pageout(&state->oy, &state->og) == 1) { @@ -137,21 +162,25 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len); if (state->serialno != ogg_page_serialno(&state->og)) { + DEBUG0("new stream"); /* this is a new logical bitstream */ state->header = 0; state->packets = 0; - /* release old headers, stream state, vorbis data */ - for (i = 0; i < MAX_HEADER_PAGES; i++) { - if (state->headbuf[i]) { - refbuf_release(state->headbuf[i]); - state->headbuf[i] = NULL; - } - } /* Clear old stuff. Rarely but occasionally needed. */ + header = state->header_pages; + while (header) + { + refbuf_t *to_release = header; + DEBUG0 ("clearing out header page"); + header = header->next; + refbuf_release (to_release); + } ogg_stream_clear(&state->os); vorbis_comment_clear(&state->vc); vorbis_info_clear(&state->vi); + state->header_pages = NULL; + state->header_pages_tail = NULL; state->serialno = ogg_page_serialno(&state->og); ogg_stream_init(&state->os, state->serialno); @@ -164,52 +193,40 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le * extras pages beyond the header. We need to collect the pages * here anyway, but they may have to be discarded later. */ + DEBUG1 ("header %d", state->header); if (ogg_page_granulepos(&state->og) <= 0) { state->header++; } else { /* we're done caching headers */ state->header = -1; + DEBUG0 ("doing stats"); /* put known comments in the stats */ tag = vorbis_comment_query(&state->vc, "TITLE", 0); - if (tag) stats_event(self->mount, "title", tag); - else stats_event(self->mount, "title", "unknown"); + if (tag) stats_event(source->mount, "title", tag); + else stats_event(source->mount, "title", "unknown"); tag = vorbis_comment_query(&state->vc, "ARTIST", 0); - if (tag) stats_event(self->mount, "artist", tag); - else stats_event(self->mount, "artist", "unknown"); + if (tag) stats_event(source->mount, "artist", tag); + else stats_event(source->mount, "artist", "unknown"); /* don't need these now */ ogg_stream_clear(&state->os); vorbis_comment_clear(&state->vc); vorbis_info_clear(&state->vi); - /* Drain the source queue on metadata update otherwise you - could have a mismatch between what is on the source queue - and what is in the state->headbuf */ - avl_tree_rlock(global.source_tree); - source = source_find_mount_raw(self->mount); - avl_tree_unlock(global.source_tree); - - thread_mutex_lock(&source->queue_mutex); - while ((source_refbuf = refbuf_queue_remove(&source->queue))) { - refbuf_release(source_refbuf); - } - thread_mutex_unlock(&source->queue_mutex); - - yp_touch (self->mount); + yp_touch (source->mount); } } /* cache header pages */ if (state->header > 0 && state->packets < 3) { - if(state->header > MAX_HEADER_PAGES) { - refbuf_release(refbuf); - ERROR1("Bad vorbis input: header is more than %d pages long", MAX_HEADER_PAGES); + /* build a list of headers pages for attaching */ + if (state->header_pages_tail) + state->header_pages_tail->next = refbuf; + state->header_pages_tail = refbuf; - return -1; - } - refbuf_addref(refbuf); - state->headbuf[state->header - 1] = refbuf; + if (state->header_pages == NULL) + state->header_pages = refbuf; if (state->packets >= 0 && state->packets < 3) { ogg_stream_pagein(&state->os, &state->og); @@ -229,36 +246,40 @@ int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long le } } } + /* we do not place ogg headers on the main queue */ + return NULL; } + /* increase ref counts on each header page going out */ + header = state->header_pages; + while (header) + { + refbuf_addref (header); + header = header->next; + } + refbuf->associated = state->header_pages; } - *buffer = refbuf; - return 0; + return refbuf; } -refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self) +static void free_ogg_client_data (client_t *client) { - refbuf_queue_t *queue; - int i; - vstate_t *state = (vstate_t *)self->_state; + free (client->format_data); + client->format_data = NULL; +} - queue = NULL; - for (i = 0; i < MAX_HEADER_PAGES; i++) { - if (state->headbuf[i]) { - refbuf_addref(state->headbuf[i]); - refbuf_queue_add(&queue, state->headbuf[i]); - } else { - break; - } +static int format_vorbis_create_client_data (source_t *source, client_t *client) +{ + struct client_vorbis *client_data = calloc (1, sizeof (struct client_vorbis)); + int ret = -1; + + if (client_data) + { + client->format_data = client_data; + client->free_client_data = free_ogg_client_data; + ret = 0; } - - return queue; -} - -static void *format_vorbis_create_client_data(format_plugin_t *self, - source_t *source, client_t *client) -{ - return NULL; + return ret; } static void format_vorbis_send_headers(format_plugin_t *self, @@ -277,4 +298,122 @@ static void format_vorbis_send_headers(format_plugin_t *self, format_send_general_headers(self, source, client); } +static int send_ogg_headers (client_t *client, refbuf_t *headers) +{ + struct client_vorbis *client_data = client->format_data; + refbuf_t *refbuf; + int written = 0; + + if (client_data->processing_headers == 0) + { + client_data->header_page = headers; + client_data->pos = 0; + client_data->processing_headers = 1; + } + refbuf = client_data->header_page; + while (refbuf) + { + char *data = refbuf->data + client_data->pos; + unsigned int len = refbuf->len - client_data->pos; + int ret; + + ret = client_send_bytes (client, data, len); + if (ret > 0) + { + written += ret; + client_data->pos += ret; + } + if (ret < (int)len) + return written; + if (client_data->pos == refbuf->len) + { + refbuf = refbuf->next; + client_data->header_page = refbuf; + client_data->pos = 0; + } + } + /* update client info on headers sent */ + client_data->processing_headers = 0; + client_data->headers = headers; + return written; +} + +static int write_buf_to_client (format_plugin_t *self, client_t *client) +{ + refbuf_t *refbuf = client->refbuf; + char *buf; + unsigned int len; + struct client_vorbis *client_data = client->format_data; + int ret, written = 0; + + if (refbuf->next == NULL && client->pos == refbuf->len) + return 0; + + if (refbuf->next && client->pos == refbuf->len) + { + client_set_queue (client, refbuf->next); + refbuf = client->refbuf; + } + do + { + if (client_data->headers != refbuf->associated) + { + /* different headers seen so send the new ones */ + ret = send_ogg_headers (client, refbuf->associated); + if (client_data->processing_headers) + break; + written += ret; + } + buf = refbuf->data + client->pos; + len = refbuf->len - client->pos; + ret = client_send_bytes (client, buf, len); + + if (ret > 0) + client->pos += ret; + + if (ret < (int)len) + break; + written += ret; + /* we have now written the page(s) */ + ret = 0; + } while (0); + + if (ret > 0) + written += ret; + return written; +} + +static int write_ogg_data (struct source_tag *source, refbuf_t *refbuf) +{ + int ret = 1; + + if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len) + { + WARN0 ("Write to dump file failed, disabling"); + fclose (source->dumpfile); + source->dumpfile = NULL; + ret = 0; + } + return ret; +} + + +static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf) +{ + vstate_t *state = (vstate_t *)source->format->_state; + + + if (state->file_headers != refbuf->associated) + { + refbuf_t *header = refbuf->associated; + while (header) + { + if (write_ogg_data (source, header) == 0) + return; + header = header->next; + } + state->file_headers = refbuf->associated; + } + write_ogg_data (source, refbuf); +} diff --git a/src/format_vorbis.h b/src/format_vorbis.h index 315fed79..22e87665 100644 --- a/src/format_vorbis.h +++ b/src/format_vorbis.h @@ -18,6 +18,6 @@ #ifndef __FORMAT_VORBIS_H__ #define __FORMAT_VORBIS_H__ -format_plugin_t *format_vorbis_get_plugin(void); +int format_vorbis_get_plugin(source_t *source); #endif /* __FORMAT_VORBIS_H__ */ diff --git a/src/refbuf.c b/src/refbuf.c index 2ff168fc..dbd6dd51 100644 --- a/src/refbuf.c +++ b/src/refbuf.c @@ -38,9 +38,22 @@ refbuf_t *refbuf_new(unsigned long size) refbuf_t *refbuf; refbuf = (refbuf_t *)malloc(sizeof(refbuf_t)); - refbuf->data = (void *)malloc(size); + if (refbuf == NULL) + return NULL; + refbuf->data = NULL; + if (size) + { + refbuf->data = malloc (size); + if (refbuf->data == NULL) + { + free (refbuf); + return NULL; + } + } refbuf->len = size; refbuf->_count = 1; + refbuf->next = NULL; + refbuf->associated = NULL; return refbuf; } @@ -54,100 +67,15 @@ void refbuf_release(refbuf_t *self) { self->_count--; if (self->_count == 0) { + while (self->associated) + { + refbuf_t *ref = self->associated; + self->associated = ref->next; + refbuf_release (ref); + } free(self->data); free(self); return; } } -void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf) -{ - refbuf_queue_t *node; - refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t)); - - item->refbuf = refbuf; - item->next = NULL; - - if (*queue == NULL) { - *queue = item; - (*queue)->total_length = item->refbuf->len; - } else { - node = *queue; - while (node->next) node = node->next; - node->next = item; - (*queue)->total_length += item->refbuf->len; - } -} - -refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue) -{ - refbuf_queue_t *item; - refbuf_t *refbuf; - - if (*queue == NULL) return NULL; - - item = *queue; - *queue = item->next; - item->next = NULL; - - refbuf = item->refbuf; - item->refbuf = NULL; - - if(*queue) - (*queue)->total_length = item->total_length - refbuf->len; - - free(item); - - - return refbuf; -} -refbuf_t * refbuf_queue_get(refbuf_queue_t **queue, int item) -{ - refbuf_queue_t *node = *queue; - int size = 0; - while (node) { - if (size == item) { - return node->refbuf; - } - node = node->next; - size++; - } - return NULL; -} - - -void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf) -{ - refbuf_queue_t *item = (refbuf_queue_t *)malloc(sizeof(refbuf_queue_t)); - - item->refbuf = refbuf; - item->next = *queue; - if(item->next) - item->total_length = item->next->total_length + item->refbuf->len; - else - item->total_length = item->refbuf->len; - *queue = item; -} - -int refbuf_queue_size(refbuf_queue_t **queue) -{ - refbuf_queue_t *node = *queue; - int size = 0; - - while (node) { - node = node->next; - size++; - } - - return size; -} - -int refbuf_queue_length(refbuf_queue_t **queue) -{ - if(*queue) - return (*queue)->total_length; - else - return 0; -} - - diff --git a/src/refbuf.h b/src/refbuf.h index fe54b08e..df182842 100644 --- a/src/refbuf.h +++ b/src/refbuf.h @@ -22,18 +22,12 @@ typedef struct _refbuf_tag { char *data; long len; + struct _refbuf_tag *associated; + struct _refbuf_tag *next; unsigned long _count; } refbuf_t; -typedef struct _refbuf_queue_tag -{ - refbuf_t *refbuf; - long total_length; - - struct _refbuf_queue_tag *next; -} refbuf_queue_t; - void refbuf_initialize(void); void refbuf_shutdown(void); @@ -41,22 +35,5 @@ refbuf_t *refbuf_new(unsigned long size); void refbuf_addref(refbuf_t *self); void refbuf_release(refbuf_t *self); -void refbuf_queue_add(refbuf_queue_t **queue, refbuf_t *refbuf); -refbuf_t *refbuf_queue_remove(refbuf_queue_t **queue); -void refbuf_queue_insert(refbuf_queue_t **queue, refbuf_t *refbuf); - -/* Size in buffers */ -int refbuf_queue_size(refbuf_queue_t **queue); -/* Size in bytes */ -int refbuf_queue_length(refbuf_queue_t **queue); -refbuf_t * refbuf_queue_get(refbuf_queue_t **queue, int item); - #endif /* __REFBUF_H__ */ - - - - - - - diff --git a/src/source.c b/src/source.c index 82d9c139..254de17b 100644 --- a/src/source.c +++ b/src/source.c @@ -60,6 +60,7 @@ mutex_t move_clients_mutex; static int _compare_clients(void *compare_arg, void *a, void *b); static int _free_client(void *key); static void _parse_audio_info (source_t *source, const char *s); +static void source_shutdown (source_t *source); /* Allocate a new source with the stated mountpoint, if one already * exists with that mountpoint in the global source tree then return @@ -189,7 +190,6 @@ int source_compare_sources(void *arg, void *a, void *b) void source_clear_source (source_t *source) { - refbuf_t *refbuf; DEBUG1 ("clearing source \"%s\"", source->mount); client_destroy(source->client); source->client = NULL; @@ -228,6 +228,10 @@ void source_clear_source (source_t *source) if (source->yp_public) yp_remove (source->mount); + source->burst_point = NULL; + source->burst_size = 0; + source->burst_offset = 0; + source->queue_size = 0; source->queue_size_limit = 0; source->listeners = 0; source->no_mount = 0; @@ -239,12 +243,18 @@ void source_clear_source (source_t *source) free(source->dumpfilename); source->dumpfilename = NULL; + /* Lets clear out the source queue too */ - while ((refbuf = refbuf_queue_remove(&source->queue))) - refbuf_release(refbuf); - source->queue = NULL; - source->burst_on_connect = 1; - thread_mutex_destroy(&source->queue_mutex); + while (source->stream_data) + { + refbuf_t *p = source->stream_data; + source->stream_data = p->next; + /* can be referenced by burst handler as well */ + while (p->_count > 1) + refbuf_release (p); + refbuf_release (p); + } + source->stream_data_tail = NULL; } @@ -332,7 +342,8 @@ void source_move_clients (source_t *source, source_t *dest) client = (client_t *)(node->key); avl_delete (source->pending_tree, client, NULL); - /* TODO: reset client local format data? */ + /* switch client to different queue */ + client_set_queue (client, dest->stream_data_tail); avl_insert (dest->pending_tree, (void *)client); } @@ -346,7 +357,8 @@ void source_move_clients (source_t *source, source_t *dest) client = (client_t *)(node->key); avl_delete (source->client_tree, client, NULL); - /* TODO: reset client local format data? */ + /* switch client to different queue */ + client_set_queue (client, dest->stream_data_tail); avl_insert (dest->pending_tree, (void *)client); } source->listeners = 0; @@ -360,6 +372,105 @@ void source_move_clients (source_t *source, source_t *dest) thread_mutex_unlock (&move_clients_mutex); } +/* get some data from the source. The stream data is placed in a refbuf + * and sent back, however NULL is also valid as in the case of a short + * timeout and there's no data pending. + */ +static refbuf_t *get_next_buffer (source_t *source) +{ + refbuf_t *refbuf = NULL; + int delay = 250; + + if (source->short_delay) + delay = 0; + while (global.running == ICE_RUNNING && source->running) + { + int fds; + time_t current = time (NULL); + + fds = util_timed_wait_for_fd (source->con->sock, delay); + + if (fds < 0) + { + if (! sock_recoverable (sock_error())) + { + WARN0 ("Error while waiting on socket, Disconnecting source"); + source->running = 0; + } + break; + } + if (fds == 0) + { + if (source->last_read + (time_t)source->timeout < current) + { + DEBUG3 ("last %ld, timeout %ld, now %ld", source->last_read, source->timeout, current); + WARN0 ("Disconnecting source due to socket timeout"); + source->running = 0; + } + break; + } + source->last_read = current; + refbuf = source->format->get_buffer (source); + if (refbuf) + break; + } + + return refbuf; +} + + +/* general send routine per listener. The deletion_expected tells us whether + * the last in the queue is about to disappear, so if this client is still + * referring to it after writing then drop the client as it's fallen too far + * behind + */ +static void send_to_listener (source_t *source, client_t *client, int deletion_expected) +{ + int bytes; + int loop = 10; /* max number of iterations in one go */ + int total_written = 0; + + /* new users need somewhere to start from */ + if (client->refbuf == NULL) + { + /* make clients start at the per source burst point on the queue */ + client_set_queue (client, source->burst_point); + if (client->refbuf == NULL) + return; + } + + while (1) + { + /* jump out if client connection has died */ + if (client->con->error) + break; + + /* lets not send too much to one client in one go, but don't + sleep for too long if more data can be sent */ + if (total_written > 20000 || loop == 0) + { + source->short_delay = 1; + break; + } + + loop--; + + bytes = source->format->write_buf_to_client (source->format, client); + if (bytes <= 0) + break; /* can't write any more */ + + total_written += bytes; + } + + /* the refbuf referenced at head (last in queue) may be marked for deletion + * if so, check to see if this client is still referring to it */ + if (deletion_expected && client->refbuf == source->stream_data) + { + DEBUG0("Client has fallen too far behind, removing"); + client->con->error = 1; + } +} + static void source_init (source_t *source) { @@ -375,7 +486,6 @@ static void source_init (source_t *source) memset (listenurl, '\000', listen_url_size); snprintf (listenurl, listen_url_size, "http://%s:%d%s", config->hostname, config->port, source->mount); - source->burst_on_connect = config->burst_on_connect; config_release_config(); /* maybe better in connection.c */ @@ -423,9 +533,8 @@ static void source_init (source_t *source) sock_set_blocking (source->con->sock, SOCK_NONBLOCK); - thread_mutex_create(&source->queue_mutex); - DEBUG0("Source creation complete"); + source->last_read = time (NULL); source->running = 1; /* @@ -454,212 +563,70 @@ static void source_init (source_t *source) void source_main (source_t *source) { - char buffer[4096]; - long bytes, sbytes; - int ret, i; + unsigned int listeners; + refbuf_t *refbuf; client_t *client; avl_node *client_node; - refbuf_t *refbuf, *abuf, *stale_refbuf; - int data_done; - source_init (source); while (global.running == ICE_RUNNING && source->running) { - ret = source->format->get_buffer(source->format, NULL, 0, &refbuf); - if(ret < 0) { - WARN0("Bad data from source"); - break; - } - if (source->burst_on_connect) { - thread_mutex_lock(&source->queue_mutex); - /* Add to the source buffer */ - if (refbuf) { - refbuf_addref(refbuf); - refbuf_queue_add(&(source->queue), refbuf); - /* We derive the size of the source buffer queue based off the - setting for queue_size_limit (client buffer queue size). - This is because the source buffer queue size should be a - percentage of the client buffer size (definately cannot - be larger). Why 50% ? Because > 75% does not give the - client enough leeway for lagging on initial connection - and < 25% does not provide a good enough burst on connect. */ - if (refbuf_queue_length(&(source->queue)) > - source->queue_size_limit/2) { - stale_refbuf = refbuf_queue_remove(&(source->queue)); - refbuf_release(stale_refbuf); - } - } - thread_mutex_unlock(&source->queue_mutex); - } - bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */ - while (refbuf == NULL) { - bytes = 0; - while (bytes <= 0) { - ret = util_timed_wait_for_fd(source->con->sock, source->timeout*1000); + int remove_from_q; - if (ret < 0 && sock_recoverable (sock_error())) - continue; - if (ret <= 0) { /* timeout expired */ - WARN1("Disconnecting source: socket timeout (%d s) expired", - source->timeout); - bytes = 0; - break; - } + refbuf = get_next_buffer (source); - bytes = sock_read_bytes(source->con->sock, buffer, 4096); - if (bytes == 0 || - (bytes < 0 && !sock_recoverable(sock_error()))) - { - DEBUG1("Disconnecting source due to socket read error: %s", - strerror(sock_error())); - break; - } - } - if (bytes <= 0) break; - source->client->con->sent_bytes += bytes; - ret = source->format->get_buffer(source->format, buffer, bytes, - &refbuf); - if(ret < 0) { - WARN0("Bad data from source"); - goto done; - } - if (source->burst_on_connect) { - /* Add to the source buffer */ - thread_mutex_lock(&source->queue_mutex); - if (refbuf) { - refbuf_addref(refbuf); - refbuf_queue_add(&(source->queue), refbuf); - if (refbuf_queue_length(&(source->queue)) > - source->queue_size_limit/2) { - stale_refbuf = refbuf_queue_remove(&(source->queue)); - refbuf_release(stale_refbuf); - } - } - thread_mutex_unlock(&source->queue_mutex); - } - } + remove_from_q = 0; + source->short_delay = 0; - if (bytes <= 0) { - INFO0("Removing source following disconnection"); - break; - } - - /* we have a refbuf buffer, which a data block to be sent to - ** all clients. if a client is not able to send the buffer - ** immediately, it should store it on its queue for the next - ** go around. - ** - ** instead of sending the current block, a client should send - ** all data in the queue, plus the current block, until either - ** it runs out of data, or it hits a recoverable error like - ** EAGAIN. this will allow a client that got slightly lagged - ** to catch back up if it can - */ - - /* First, stream dumping, if enabled */ - if(source->dumpfile) { - if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) != - refbuf->len) + if (refbuf) + { + /* append buffer to the in-flight data queue, */ + if (source->stream_data == NULL) { - WARN1("Write to dump file failed, disabling: %s", - strerror(errno)); - fclose(source->dumpfile); - source->dumpfile = NULL; + source->stream_data = refbuf; + source->burst_point = refbuf; } - } + if (source->stream_data_tail) + source->stream_data_tail->next = refbuf; + source->stream_data_tail = refbuf; + source->queue_size += refbuf->len; + /* new buffer is referenced for burst */ + refbuf_addref (refbuf); - /* acquire read lock on client_tree */ - avl_tree_rlock(source->client_tree); - - client_node = avl_get_first(source->client_tree); - while (client_node) { - /* acquire read lock on node */ - avl_node_wlock(client_node); - - client = (client_t *)client_node->key; - - data_done = 0; - - /* do we have any old buffers? */ - abuf = refbuf_queue_remove(&client->queue); - while (abuf) { - bytes = abuf->len - client->pos; - - sbytes = source->format->write_buf_to_client(source->format, - client, &abuf->data[client->pos], bytes); - if (sbytes < bytes) { - if (client->con->error) { - refbuf_release (abuf); - } - else { - /* We didn't send the entire buffer. Leave it for - * the moment, handle it in the next iteration. - */ - client->pos += sbytes<0?0:sbytes; - refbuf_queue_insert (&client->queue, abuf); - } - data_done = 1; - break; - } - /* we're done with that refbuf, release it and reset the pos */ - refbuf_release(abuf); - client->pos = 0; - - abuf = refbuf_queue_remove(&client->queue); - } - - /* now send or queue the new data */ - if (data_done) { - refbuf_addref(refbuf); - refbuf_queue_add(&client->queue, refbuf); - } else { - sbytes = source->format->write_buf_to_client(source->format, - client, refbuf->data, refbuf->len); - if (client->con->error == 0 && sbytes < refbuf->len) { - /* Didn't send the entire buffer, queue it */ - client->pos = sbytes<0?0:sbytes; - refbuf_addref(refbuf); - refbuf_queue_insert(&client->queue, refbuf); + /* new data on queue, so check the burst point */ + source->burst_offset += refbuf->len; + if (source->burst_offset > source->burst_size) + { + if (source->burst_point->next) + { + refbuf_release (source->burst_point); + source->burst_point = source->burst_point->next; + source->burst_offset -= source->burst_point->len; } } - /* if the client is too slow, its queue will slowly build up. - ** we need to make sure the client is keeping up with the - ** data, so we'll kick any client who's queue gets to large. - */ - if (refbuf_queue_length(&client->queue) > source->queue_size_limit) { - DEBUG0("Client has fallen too far behind, removing"); - client->con->error = 1; - } - - /* release read lock on node */ - avl_node_unlock(client_node); - - /* get the next node */ - client_node = avl_get_next(client_node); - } - /* release read lock on client_tree */ - avl_tree_unlock(source->client_tree); - - /* Only release the refbuf if we didn't add it to the source queue */ - if (!source->burst_on_connect) { - refbuf_release(refbuf); + /* save stream to file */ + if (source->dumpfile && source->format->write_buf_to_file) + source->format->write_buf_to_file (source, refbuf); } + /* lets see if we have too much data in the queue, but don't remove it until later */ + if (source->queue_size > source->queue_size_limit) + remove_from_q = 1; /* acquire write lock on client_tree */ avl_tree_wlock(source->client_tree); - /** delete bad clients **/ + listeners = source->listeners; client_node = avl_get_first(source->client_tree); while (client_node) { client = (client_t *)client_node->key; + + send_to_listener (source, client, remove_from_q); + if (client->con->error) { client_node = avl_get_next(client_node); avl_delete(source->client_tree, (void *)client, _free_client); source->listeners--; - stats_event_args(source->mount, "listeners", "%d", - source->listeners); DEBUG0("Client removed"); continue; } @@ -696,33 +663,6 @@ void source_main (source_t *source) DEBUG0("Client added"); stats_event_inc(NULL, "clients"); stats_event_inc(source->mount, "connections"); - stats_event_args(source->mount, "listeners", "%d", - source->listeners); - - /* we have to send cached headers for some data formats - ** this is where we queue up the buffers to send - */ - client = (client_t *)client_node->key; - if (source->format->has_predata) { - client->queue = source->format->get_predata(source->format); - } - if (source->burst_on_connect) { - /* here is where we fill up the new client with refbufs from - the source buffer. this will allow an initial burst of - audio data to be sent to the client, and allow for a faster - startup time (from listener perspective) for the stream */ - if (!client->burst_sent) { - thread_mutex_lock(&source->queue_mutex); - for (i=0;iqueue));i++) { - refbuf_queue_add(&(client->queue), - refbuf_queue_get(&(source->queue), i)); - } - thread_mutex_unlock(&source->queue_mutex); - client->burst_sent = 1; - DEBUG1("Added %d buffers to initial client queue", - refbuf_queue_length(&(source->queue))); - } - } client_node = avl_get_next(client_node); } @@ -737,12 +677,47 @@ void source_main (source_t *source) /* release write lock on pending_tree */ avl_tree_unlock(source->pending_tree); + /* update the stats if need be */ + if (source->listeners != listeners) + { + INFO2("listener count on %s now %d", source->mount, source->listeners); + stats_event_args (source->mount, "listeners", "%d", source->listeners); + } + + /* lets reduce the queue, any lagging clients should of been + * terminated by now + */ + if (source->stream_data) + { + /* normal unreferenced queue data will have a refcount 1, but + * burst queue data will be at least 2, active clients will also + * increase refcount */ + while (source->stream_data->_count == 1) + { + refbuf_t *to_go = source->stream_data; + + if (to_go->next == NULL || source->burst_point == to_go) + { + /* this should not happen */ + ERROR0 ("queue state is unexpected"); + source->running = 0; + break; + } + source->stream_data = to_go->next; + source->queue_size -= to_go->len; + refbuf_release (to_go); + } + } + /* release write lock on client_tree */ avl_tree_unlock(source->client_tree); } + source_shutdown (source); +} -done: +static void source_shutdown (source_t *source) +{ source->running = 0; INFO1("Source \"%s\" exiting", source->mount); @@ -776,8 +751,6 @@ done: /* release our hold on the lock so the main thread can continue cleaning up */ thread_rwlock_unlock(source->shutdown_rwlock); - - return; } static int _compare_clients(void *compare_arg, void *a, void *b) @@ -816,7 +789,7 @@ static int _free_client(void *key) static void _parse_audio_info (source_t *source, const char *s) { const char *start = s; - unsigned len; + unsigned int len; while (start != NULL && *start != '\0') { @@ -878,6 +851,9 @@ void source_apply_mount (source_t *source, mount_proxy *mountinfo) source->timeout = mountinfo->source_timeout; DEBUG1 ("source timeout to %u", source->timeout); } + if (mountinfo->burst_size > -1) + source->burst_size = mountinfo->burst_size; + DEBUG1 ("amount to burst on client connect set to %u", source->burst_size); } diff --git a/src/source.h b/src/source.h index 314cf22d..ec93ec84 100644 --- a/src/source.h +++ b/src/source.h @@ -55,11 +55,22 @@ typedef struct source_tag struct auth_tag *authenticator; int fallback_override; int no_mount; - unsigned queue_size_limit; + + /* per source burst handling for connecting clients */ + unsigned int burst_size; /* trigger level for burst on connect */ + unsigned int burst_offset; + refbuf_t *burst_point; + + unsigned int queue_size; + unsigned int queue_size_limit; + unsigned timeout; /* source timeout in seconds */ - refbuf_queue_t *queue; - mutex_t queue_mutex; - int burst_on_connect; + time_t last_read; + int short_delay; + + refbuf_t *stream_data; + refbuf_t *stream_data_tail; + } source_t; source_t *source_reserve (const char *mount);