From 7fadd89273c2947e4fdf5cfab84395e136673c52 Mon Sep 17 00:00:00 2001 From: Karl Heyes Date: Sun, 8 May 2005 13:51:05 +0000 Subject: [PATCH] use a client function to read an incoming stream, simplifies handling within the format specific files. Also add total read/sent stats per mountpoint. Updates the stats every 5 secs currently svn path=/icecast/trunk/icecast/; revision=9241 --- src/client.c | 19 +++++++++++++++++++ src/client.h | 1 + src/compat.h | 2 ++ src/format.h | 2 ++ src/format_mp3.c | 29 ++++++++--------------------- src/format_ogg.c | 15 +++------------ src/source.c | 16 ++++++++++++++++ src/source.h | 1 + 8 files changed, 52 insertions(+), 33 deletions(-) diff --git a/src/client.c b/src/client.c index 6658c744..543cec98 100644 --- a/src/client.c +++ b/src/client.c @@ -96,6 +96,25 @@ void client_destroy(client_t *client) free(client); } + +/* helper function for reading data from a client */ +int client_read_bytes (client_t *client, void *buf, unsigned len) +{ + int bytes = sock_read_bytes (client->con->sock, buf, len); + if (bytes > 0) + return bytes; + + if (bytes < 0) + { + if (sock_recoverable (sock_error())) + return -1; + WARN0 ("source connection has died"); + } + client->con->error = 1; + return -1; +} + + void client_send_400(client_t *client, char *message) { int bytes; bytes = sock_write(client->con->sock, "HTTP/1.0 400 Bad Request\r\n" diff --git a/src/client.h b/src/client.h index 4d52eee2..d864806e 100644 --- a/src/client.h +++ b/src/client.h @@ -56,6 +56,7 @@ void client_send_401(client_t *client); void client_send_403(client_t *client); void client_send_400(client_t *client, char *message); int client_send_bytes (client_t *client, const void *buf, unsigned len); +int client_read_bytes (client_t *client, void *buf, unsigned len); void client_set_queue (client_t *client, refbuf_t *refbuf); #endif /* __CLIENT_H__ */ diff --git a/src/compat.h b/src/compat.h index 6ca6e107..6cfb6450 100644 --- a/src/compat.h +++ b/src/compat.h @@ -35,8 +35,10 @@ #ifdef _WIN32 #define FORMAT_INT64 "%I64d" +#define FORMAT_UINT64 "%I64u" #else #define FORMAT_INT64 "%lld" +#define FORMAT_UINT64 "%llu" #endif #endif /* __COMPAT_H__ */ diff --git a/src/format.h b/src/format.h index f34b91a4..c5eb70fb 100644 --- a/src/format.h +++ b/src/format.h @@ -40,6 +40,8 @@ typedef struct _format_plugin_tag char *mount; char *contenttype; + uint64_t read_bytes; + uint64_t sent_bytes; refbuf_t *(*get_buffer)(struct source_tag *); int (*write_buf_to_client)(struct _format_plugin_tag *format, client_t *client); diff --git a/src/format_mp3.c b/src/format_mp3.c index 96a5bdf1..2a5f33b7 100644 --- a/src/format_mp3.c +++ b/src/format_mp3.c @@ -428,18 +428,18 @@ static refbuf_t *mp3_get_no_meta (source_t *source) int bytes; refbuf_t *refbuf; mp3_state *source_mp3 = source->format->_state; + format_plugin_t *format = source->format; if ((refbuf = refbuf_new (2048)) == NULL) return NULL; - bytes = sock_read_bytes (source->con->sock, refbuf->data, 2048); - if (bytes == 0) + bytes = client_read_bytes (source->client, refbuf->data, 2048); + if (bytes < 0) { - INFO1 ("End of stream %s", source->mount); - source->running = 0; refbuf_release (refbuf); return NULL; } + format->read_bytes += bytes; if (source_mp3->update_metadata) { mp3_set_title (source); @@ -455,9 +455,6 @@ static refbuf_t *mp3_get_no_meta (source_t *source) } refbuf_release (refbuf); - if (!sock_recoverable (sock_error())) - source->running = 0; - return NULL; } @@ -471,6 +468,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) refbuf_t *refbuf; format_plugin_t *plugin = source->format; mp3_state *source_mp3 = plugin->_state; + format_plugin_t *format = source->format; unsigned char *src; unsigned int bytes, mp3_block; int ret; @@ -478,29 +476,18 @@ static refbuf_t *mp3_get_filter_meta (source_t *source) refbuf = refbuf_new (2048); src = refbuf->data; - ret = sock_read_bytes (source->con->sock, refbuf->data, 2048); - - if (ret == 0) + ret = client_read_bytes (source->client, refbuf->data, 2048); + if (ret < 0) { - INFO1 ("End of stream %s", source->mount); - source->running = 0; refbuf_release (refbuf); return NULL; } + format->read_bytes += ret; if (source_mp3->update_metadata) { mp3_set_title (source); source_mp3->update_metadata = 0; } - if (ret < 0) - { - refbuf_release (refbuf); - if (sock_recoverable (sock_error())) - return NULL; /* go back to waiting */ - INFO0 ("Error on connection from source"); - source->running = 0; - return NULL; - } /* fill the buffer with the read data */ bytes = (unsigned int)ret; refbuf->len = 0; diff --git a/src/format_ogg.c b/src/format_ogg.c index 3d69c93a..2ae5949c 100644 --- a/src/format_ogg.c +++ b/src/format_ogg.c @@ -375,6 +375,7 @@ static refbuf_t *process_ogg_page (ogg_state_t *ogg_info, ogg_page *page) static refbuf_t *ogg_get_buffer (source_t *source) { ogg_state_t *ogg_info = source->format->_state; + format_plugin_t *format = source->format; char *data = NULL; int bytes; @@ -421,23 +422,13 @@ static refbuf_t *ogg_get_buffer (source_t *source) /* we need more data to continue getting pages */ data = ogg_sync_buffer (&ogg_info->oy, 4096); - bytes = sock_read_bytes (source->con->sock, data, 4096); + bytes = client_read_bytes (source->client, data, 4096); if (bytes < 0) { - if (sock_recoverable (sock_error())) - return NULL; - WARN0 ("source connection has died"); ogg_sync_wrote (&ogg_info->oy, 0); - source->running = 0; - return NULL; - } - if (bytes == 0) - { - INFO1 ("End of Stream %s", source->mount); - ogg_sync_wrote (&ogg_info->oy, 0); - source->running = 0; return NULL; } + format->read_bytes += bytes; ogg_sync_wrote (&ogg_info->oy, bytes); } } diff --git a/src/source.c b/src/source.c index 663a24a0..2afd9534 100644 --- a/src/source.c +++ b/src/source.c @@ -243,6 +243,7 @@ void source_clear_source (source_t *source) source->yp_public = 0; source->yp_prevent = 0; source->hidden = 0; + source->client_stats_update = 0; util_dict_free (source->audio_info); source->audio_info = NULL; @@ -411,6 +412,14 @@ static refbuf_t *get_next_buffer (source_t *source) fds = util_timed_wait_for_fd (source->con->sock, delay); + if (current >= source->client_stats_update) + { + stats_event_args (source->mount, "total_bytes_read", + FORMAT_UINT64, source->format->read_bytes); + stats_event_args (source->mount, "total_bytes_sent", + FORMAT_UINT64, source->format->sent_bytes); + source->client_stats_update = current + 5; + } if (fds < 0) { if (! sock_recoverable (sock_error())) @@ -432,6 +441,12 @@ static refbuf_t *get_next_buffer (source_t *source) } source->last_read = current; refbuf = source->format->get_buffer (source); + if (source->client->con->error) + { + INFO1 ("End of Stream %s", source->mount); + source->running = 0; + continue; + } if (refbuf) break; } @@ -481,6 +496,7 @@ static void send_to_listener (source_t *source, client_t *client, int deletion_e total_written += bytes; } + source->format->sent_bytes += total_written; /* the refbuf referenced at head (last in queue) may be marked for deletion * if so, check to see if this client is still referring to it */ diff --git a/src/source.h b/src/source.h index 269cc5c4..2663eb4f 100644 --- a/src/source.h +++ b/src/source.h @@ -27,6 +27,7 @@ typedef struct source_tag client_t *client; connection_t *con; http_parser_t *parser; + time_t client_stats_update; char *mount;