#include #include #include #include #include #ifndef _WIN32 #include #include #include #else #include #include #endif #include "thread.h" #include "avl.h" #include "httpp.h" #include "sock.h" #include "connection.h" #include "global.h" #include "refbuf.h" #include "client.h" #include "stats.h" #include "log.h" #include "logging.h" #include "config.h" #include "util.h" #include "source.h" #include "format.h" #undef CATMODULE #define CATMODULE "source" /* avl tree helper */ static int _compare_clients(void *compare_arg, void *a, void *b); static int _remove_client(void *key); static int _free_client(void *key); source_t *source_create(client_t *client, connection_t *con, http_parser_t *parser, const char *mount, format_type_t type) { source_t *src; src = (source_t *)malloc(sizeof(source_t)); src->client = client; src->mount = (char *)strdup(mount); src->fallback_mount = NULL; src->format = format_get_plugin(type, src->mount); src->con = con; src->parser = parser; src->client_tree = avl_tree_new(_compare_clients, NULL); src->pending_tree = avl_tree_new(_compare_clients, NULL); return src; } /* you must already have a read lock on the global source tree ** to call this function */ source_t *source_find_mount(const char *mount) { source_t *source; avl_node *node; int cmp; if (!mount) { return NULL; } /* get the root node */ node = global.source_tree->root->right; while (node) { source = (source_t *)node->key; cmp = strcmp(mount, source->mount); if (cmp < 0) node = node->left; else if (cmp > 0) node = node->right; else return source; } /* didn't find it */ return NULL; } int source_compare_sources(void *arg, void *a, void *b) { source_t *srca = (source_t *)a; source_t *srcb = (source_t *)b; return strcmp(srca->mount, srcb->mount); } int source_free_source(void *key) { source_t *source = (source_t *)key; free(source->mount); free(source->fallback_mount); client_destroy(source->client); avl_tree_free(source->pending_tree, _free_client); avl_tree_free(source->client_tree, _free_client); source->format->free_plugin(source->format); free(source); return 1; } void *source_main(void *arg) { source_t *source = (source_t *)arg; source_t *fallback_source; char buffer[4096]; long bytes, sbytes; int ret, timeout; client_t *client; avl_node *client_node; char *s; refbuf_t *refbuf, *abuf; int data_done; int listeners = 0; timeout = config_get_config()->source_timeout; /* grab a read lock, to make sure we get a chance to cleanup */ thread_rwlock_rlock(source->shutdown_rwlock); /* get a write lock on the global source tree */ avl_tree_wlock(global.source_tree); /* insert source onto source tree */ avl_insert(global.source_tree, (void *)source); /* release write lock on global source tree */ avl_tree_unlock(global.source_tree); /* start off the statistics */ stats_event(source->mount, "listeners", "0"); if ((s = httpp_getvar(source->parser, "ice-name"))) stats_event(source->mount, "name", s); if ((s = httpp_getvar(source->parser, "ice-url"))) stats_event(source->mount, "url", s); if ((s = httpp_getvar(source->parser, "ice-genre"))) stats_event(source->mount, "genre", s); if ((s = httpp_getvar(source->parser, "ice-bitrate"))) stats_event(source->mount, "bitrate", s); if ((s = httpp_getvar(source->parser, "ice-description"))) stats_event(source->mount, "description", s); stats_event(source->mount, "type", source->format->format_description); while (global.running == ICE_RUNNING) { ret = source->format->get_buffer(source->format, NULL, 0, &refbuf); if(ret < 0) { WARN0("Bad data from source"); break; } bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */ while (refbuf == NULL) { bytes = 0; while (bytes <= 0) { ret = util_timed_wait_for_fd(source->con->sock, timeout*1000); if (ret <= 0) { /* timeout expired */ WARN1("Disconnecting source: socket timeout (%d s) expired", timeout); bytes = 0; break; } bytes = sock_read_bytes(source->con->sock, buffer, 4096); if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) { DEBUG1("Disconnecting source due to socket read error: %s", strerror(sock_error())); break; } } if (bytes <= 0) break; source->client->con->sent_bytes += bytes; ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf); if(ret < 0) { WARN0("Bad data from source"); goto done; } } if (bytes <= 0) { INFO0("Removing source following disconnection"); break; } /* we have a refbuf buffer, which a data block to be sent to ** all clients. if a client is not able to send the buffer ** immediately, it should store it on its queue for the next ** go around. ** ** instead of sending the current block, a client should send ** all data in the queue, plus the current block, until either ** it runs out of data, or it hits a recoverable error like ** EAGAIN. this will allow a client that got slightly lagged ** to catch back up if it can */ /* acquire read lock on client_tree */ avl_tree_rlock(source->client_tree); client_node = avl_get_first(source->client_tree); while (client_node) { /* acquire read lock on node */ avl_node_wlock(client_node); client = (client_t *)client_node->key; data_done = 0; /* do we have any old buffers? */ abuf = refbuf_queue_remove(&client->queue); while (abuf) { if (client->pos > 0) bytes = abuf->len - client->pos; else bytes = abuf->len; sbytes = source->format->write_buf_to_client(source->format, client, &abuf->data[client->pos], bytes); if (sbytes >= 0) { if(sbytes != bytes) { /* We didn't send the entire buffer. Leave it for * the moment, handle it in the next iteration. */ client->pos += sbytes; refbuf_queue_insert(&client->queue, abuf); data_done = 1; break; } } else { DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected"); client->con->error = 1; data_done = 1; break; } /* we're done with that refbuf, release it and reset the pos */ refbuf_release(abuf); client->pos = 0; abuf = refbuf_queue_remove(&client->queue); } /* now send or queue the new data */ if (data_done) { refbuf_addref(refbuf); refbuf_queue_add(&client->queue, refbuf); } else { sbytes = source->format->write_buf_to_client(source->format, client, refbuf->data, refbuf->len); if (sbytes >= 0) { if(sbytes != refbuf->len) { /* Didn't send the entire buffer, queue it */ client->pos = sbytes; refbuf_addref(refbuf); refbuf_queue_insert(&client->queue, refbuf); } } else { DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection"); client->con->error = 1; } } /* if the client is too slow, its queue will slowly build up. ** we need to make sure the client is keeping up with the ** data, so we'll kick any client who's queue gets to large. ** the queue_limit might need to be tuned, but should work fine. ** TODO: put queue_limit in a config file */ if (refbuf_queue_size(&client->queue) > 25) { DEBUG0("Client has fallen too far behind, removing"); client->con->error = 1; } /* release read lock on node */ avl_node_unlock(client_node); /* get the next node */ client_node = avl_get_next(client_node); } /* release read lock on client_tree */ avl_tree_unlock(source->client_tree); refbuf_release(refbuf); /* acquire write lock on client_tree */ avl_tree_wlock(source->client_tree); /** delete bad clients **/ client_node = avl_get_first(source->client_tree); while (client_node) { client = (client_t *)client_node->key; if (client->con->error) { client_node = avl_get_next(client_node); avl_delete(source->client_tree, (void *)client, _free_client); listeners--; stats_event_args(source->mount, "listeners", "%d", listeners); DEBUG0("Client removed"); continue; } client_node = avl_get_next(client_node); } /* acquire write lock on pending_tree */ avl_tree_wlock(source->pending_tree); /** add pending clients **/ client_node = avl_get_first(source->pending_tree); while (client_node) { avl_insert(source->client_tree, client_node->key); listeners++; DEBUG0("Client added"); stats_event_inc(NULL, "clients"); stats_event_inc(source->mount, "connections"); stats_event_args(source->mount, "listeners", "%d", listeners); /* we have to send cached headers for some data formats ** this is where we queue up the buffers to send */ if (source->format->has_predata) { client = (client_t *)client_node->key; client->queue = source->format->get_predata(source->format); } client_node = avl_get_next(client_node); } /** clear pending tree **/ while (avl_get_first(source->pending_tree)) { avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, _remove_client); } /* release write lock on pending_tree */ avl_tree_unlock(source->pending_tree); /* release write lock on client_tree */ avl_tree_unlock(source->client_tree); } done: DEBUG0("Source exiting"); avl_tree_rlock(global.source_tree); fallback_source = source_find_mount(source->fallback_mount); avl_tree_unlock(global.source_tree); /* we need to empty the client and pending trees */ avl_tree_wlock(source->pending_tree); while (avl_get_first(source->pending_tree)) { client_t *client = (client_t *)avl_get_first( source->pending_tree)->key; if(fallback_source) { avl_delete(source->pending_tree, client, _remove_client); // TODO: reset client local format data? avl_tree_wlock(fallback_source->pending_tree); avl_insert(fallback_source->pending_tree, (void *)client); avl_tree_unlock(fallback_source->pending_tree); } else { avl_delete(source->pending_tree, client, _free_client); } } avl_tree_unlock(source->pending_tree); avl_tree_wlock(source->client_tree); while (avl_get_first(source->client_tree)) { client_t *client = (client_t *)avl_get_first(source->client_tree)->key; if(fallback_source) { avl_delete(source->client_tree, client, _remove_client); // TODO: reset client local format data? avl_tree_wlock(fallback_source->pending_tree); avl_insert(fallback_source->pending_tree, (void *)client); avl_tree_unlock(fallback_source->pending_tree); } else { avl_delete(source->client_tree, client, _free_client); } } avl_tree_unlock(source->client_tree); /* delete this sources stats */ stats_event_dec(NULL, "sources"); stats_event(source->mount, "listeners", NULL); global_lock(); global.sources--; global_unlock(); /* release our hold on the lock so the main thread can continue cleaning up */ thread_rwlock_unlock(source->shutdown_rwlock); avl_tree_wlock(global.source_tree); avl_delete(global.source_tree, source, source_free_source); avl_tree_unlock(global.source_tree); thread_exit(0); return NULL; } static int _compare_clients(void *compare_arg, void *a, void *b) { connection_t *cona = (connection_t *)a; connection_t *conb = (connection_t *)b; if (cona->id < conb->id) return -1; if (cona->id > conb->id) return 1; return 0; } static int _remove_client(void *key) { return 1; } static int _free_client(void *key) { client_t *client = (client_t *)key; global_lock(); global.clients--; global_unlock(); stats_event_dec(NULL, "clients"); client_destroy(client); return 1; }