mirror of
https://gitlab.xiph.org/xiph/icecast-server.git
synced 2024-06-23 06:25:24 +00:00
1272 lines
38 KiB
C
1272 lines
38 KiB
C
/* Icecast
|
|
*
|
|
* This program is distributed under the GNU General Public License, version 2.
|
|
* A copy of this license is included with this source.
|
|
*
|
|
* Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
|
|
* Michael Smith <msmith@xiph.org>,
|
|
* oddsock <oddsock@xiph.org>,
|
|
* Karl Heyes <karl@xiph.org>
|
|
* and others (see AUTHORS for details).
|
|
* Copyright 2011, Dave 'justdave' Miller <justdave@mozilla.com>,
|
|
* Copyright 2011-2022, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
|
|
*/
|
|
|
|
/* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
|
|
#ifdef HAVE_CONFIG_H
|
|
#include <config.h>
|
|
#endif
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <stdbool.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
#ifdef HAVE_POLL
|
|
#include <poll.h>
|
|
#endif
|
|
#include <sys/types.h>
|
|
|
|
#ifndef _WIN32
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#else
|
|
#include <winsock2.h>
|
|
#endif
|
|
|
|
#include "common/thread/thread.h"
|
|
#include "common/avl/avl.h"
|
|
#include "common/net/sock.h"
|
|
#include "common/httpp/httpp.h"
|
|
|
|
#include "compat.h"
|
|
#include "connection.h"
|
|
#include "connection_handle.h"
|
|
#include "cfgfile.h"
|
|
#include "global.h"
|
|
#include "util.h"
|
|
#include "refobject.h"
|
|
#include "refbuf.h"
|
|
#include "client.h"
|
|
#include "errors.h"
|
|
#include "stats.h"
|
|
#include "logging.h"
|
|
#include "fserve.h"
|
|
#include "slave.h"
|
|
|
|
#include "source.h"
|
|
#include "admin.h"
|
|
#include "auth.h"
|
|
#include "matchfile.h"
|
|
#include "tls.h"
|
|
#include "acl.h"
|
|
#include "refobject.h"
|
|
#include "listensocket.h"
|
|
#include "fastevent.h"
|
|
#include "navigation.h"
|
|
|
|
#define CATMODULE "connection"
|
|
|
|
/* Two different major types of source authentication.
|
|
Shoutcast style is used only by the Shoutcast DSP
|
|
and is a crazy version of HTTP. It looks like :
|
|
Source Client -> Connects to port + 1
|
|
Source Client -> sends encoder password (plaintext)\r\n
|
|
Icecast -> reads encoder password, if ok, sends OK2\r\n, else disconnects
|
|
Source Client -> reads OK2\r\n, then sends http-type request headers
|
|
that contain the stream details (icy-name, etc..)
|
|
Icecast -> reads headers, stores them
|
|
Source Client -> starts sending MP3 data
|
|
Source Client -> periodically updates metadata via admin.cgi call
|
|
|
|
Icecast auth style uses HTTP and Basic Authorization.
|
|
*/
|
|
|
|
typedef struct client_queue_tag {
|
|
client_t *client;
|
|
int offset;
|
|
int shoutcast;
|
|
char *shoutcast_mount;
|
|
char *bodybuffer;
|
|
size_t bodybufferlen;
|
|
int tried_body;
|
|
bool ready;
|
|
struct client_queue_tag *next;
|
|
} client_queue_entry_t;
|
|
|
|
typedef struct {
|
|
client_queue_entry_t *head;
|
|
client_queue_entry_t **tail;
|
|
mutex_t mutex;
|
|
cond_t cond;
|
|
thread_type *thread;
|
|
bool running;
|
|
#ifdef HAVE_POLL
|
|
struct pollfd *pollfds;
|
|
size_t pollfds_len;
|
|
#endif
|
|
} client_queue_t;
|
|
|
|
#define QUEUE_READY_TIMEOUT 50
|
|
|
|
static spin_t _connection_lock; // protects _current_id
|
|
static volatile connection_id_t _current_id = 0;
|
|
static int _initialized = 0;
|
|
|
|
static client_queue_t _request_queue;
|
|
static client_queue_t _connection_queue;
|
|
static client_queue_t _body_queue;
|
|
static client_queue_t _handle_queue;
|
|
static bool tls_ok = false;
|
|
static tls_ctx_t *tls_ctx;
|
|
|
|
/* filtering client connection based on IP */
|
|
static matchfile_t *banned_ip, *allowed_ip;
|
|
|
|
rwlock_t _source_shutdown_rwlock;
|
|
|
|
static void get_tls_certificate(ice_config_t *config);
|
|
static void free_client_node(client_queue_entry_t *node);
|
|
static void * _handle_connection(client_queue_t *queue);
|
|
static void * process_request_queue (client_queue_t *queue);
|
|
static void * process_request_body_queue (client_queue_t *queue);
|
|
static void * handle_client_worker(client_queue_t *queue);
|
|
|
|
static void client_queue_init(client_queue_t *queue)
|
|
{
|
|
memset(queue, 0, sizeof(*queue));
|
|
queue->tail = &(queue->head);
|
|
thread_mutex_create(&(queue->mutex));
|
|
thread_cond_create(&(queue->cond));
|
|
}
|
|
|
|
static void client_queue_destroy(client_queue_t *queue)
|
|
{
|
|
if (queue->thread) {
|
|
queue->running = false;
|
|
thread_cond_broadcast(&(queue->cond));
|
|
thread_join(queue->thread);
|
|
}
|
|
thread_cond_destroy(&(queue->cond));
|
|
thread_mutex_destroy(&(queue->mutex));
|
|
#ifdef HAVE_POLL
|
|
free(queue->pollfds);
|
|
#endif
|
|
}
|
|
|
|
static void client_queue_start_thread(client_queue_t *queue, const char *name, void *(*func)(client_queue_t *))
|
|
{
|
|
if (queue->thread)
|
|
return;
|
|
queue->running = true;
|
|
queue->thread = thread_create(name, (void*(*)(void*))func, queue, THREAD_ATTACHED);
|
|
}
|
|
|
|
static inline bool client_queue_running(client_queue_t *queue)
|
|
{
|
|
return queue->running;
|
|
}
|
|
|
|
static void client_queue_add(client_queue_t *queue, client_queue_entry_t *entry)
|
|
{
|
|
thread_mutex_lock(&(queue->mutex));
|
|
*(queue->tail) = entry;
|
|
queue->tail = &(entry->next);
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
thread_cond_broadcast(&(queue->cond));
|
|
}
|
|
|
|
static void client_queue_wait(client_queue_t *queue)
|
|
{
|
|
thread_cond_wait(&(queue->cond));
|
|
}
|
|
|
|
static client_queue_entry_t * client_queue_shift(client_queue_t *queue, client_queue_entry_t *stop)
|
|
{
|
|
client_queue_entry_t *ret;
|
|
|
|
thread_mutex_lock(&(queue->mutex));
|
|
ret = queue->head;
|
|
if (ret) {
|
|
if (ret == stop) {
|
|
ret = NULL;
|
|
} else {
|
|
queue->head = ret->next;
|
|
if (!queue->head) {
|
|
queue->tail = &(queue->head);
|
|
}
|
|
ret->next = NULL;
|
|
}
|
|
}
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
|
|
return ret;
|
|
}
|
|
|
|
static bool client_queue_check_ready(client_queue_t *queue, int timeout, time_t connection_timeout)
|
|
{
|
|
if (!queue->head)
|
|
return false;
|
|
|
|
#ifdef HAVE_POLL
|
|
if (true) {
|
|
bool had_timeout = false;
|
|
size_t count = 0;
|
|
size_t i;
|
|
client_queue_entry_t *cur;
|
|
|
|
thread_mutex_lock(&(queue->mutex));
|
|
for (cur = queue->head; cur; cur = cur->next) {
|
|
count++;
|
|
if (cur->client->con->con_time <= connection_timeout) {
|
|
cur->ready = true;
|
|
had_timeout = true;
|
|
} else {
|
|
cur->ready = false;
|
|
}
|
|
}
|
|
|
|
if (queue->pollfds_len < count) {
|
|
free(queue->pollfds);
|
|
queue->pollfds = calloc(count, sizeof(*queue->pollfds));
|
|
if (queue->pollfds) {
|
|
queue->pollfds_len = count;
|
|
} else {
|
|
ICECAST_LOG_ERROR("Allocation of queue->pollfds failed. BAD.");
|
|
queue->pollfds_len = 0;
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
return false;
|
|
}
|
|
} else {
|
|
memset(queue->pollfds, 0, sizeof(*queue->pollfds)*count);
|
|
}
|
|
|
|
for (cur = queue->head, i = 0; cur && i < count; cur = cur->next, i++) {
|
|
queue->pollfds[i].fd = cur->client->con->sock;
|
|
queue->pollfds[i].events = POLLIN;
|
|
}
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
|
|
if (had_timeout)
|
|
return true;
|
|
|
|
if (poll(queue->pollfds, count, timeout) < 1)
|
|
return false;
|
|
|
|
thread_mutex_lock(&(queue->mutex));
|
|
for (cur = queue->head; cur; cur = cur->next) {
|
|
for (i = 0; i < count; i++) {
|
|
if (queue->pollfds[i].fd == cur->client->con->sock) {
|
|
if (queue->pollfds[i].revents) {
|
|
cur->ready = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
}
|
|
#endif
|
|
|
|
return true;
|
|
}
|
|
|
|
static bool client_queue_check_ready_wait(client_queue_t *queue, int timeout, time_t connection_timeout)
|
|
{
|
|
while (queue->running) {
|
|
if (client_queue_check_ready(queue, timeout, connection_timeout))
|
|
return true;
|
|
|
|
if (!queue->head)
|
|
thread_cond_wait(&(queue->cond));
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static client_queue_entry_t * client_queue_shift_ready(client_queue_t *queue, client_queue_entry_t *stop)
|
|
{
|
|
#ifdef HAVE_POLL
|
|
client_queue_entry_t *cur;
|
|
client_queue_entry_t *last = NULL;
|
|
|
|
if (!queue->head)
|
|
return NULL;
|
|
|
|
thread_mutex_lock(&(queue->mutex));
|
|
for (cur = queue->head; cur && cur != stop; cur = cur->next) {
|
|
if (cur->ready) {
|
|
// use this one.
|
|
if (last == NULL) {
|
|
/* we are the head */
|
|
queue->head = cur->next;
|
|
if (!queue->head) {
|
|
queue->tail = &(queue->head);
|
|
}
|
|
} else {
|
|
last->next = cur->next;
|
|
if (queue->tail == &(cur->next)) {
|
|
queue->tail = &(last->next);
|
|
}
|
|
}
|
|
|
|
cur->next = NULL;
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
return cur;
|
|
}
|
|
last = cur;
|
|
}
|
|
thread_mutex_unlock(&(queue->mutex));
|
|
return NULL;
|
|
#else
|
|
/* just return any */
|
|
return client_queue_shift(queue, stop);
|
|
#endif
|
|
}
|
|
|
|
void connection_initialize(void)
|
|
{
|
|
if (_initialized)
|
|
return;
|
|
|
|
thread_spin_create (&_connection_lock);
|
|
thread_mutex_create(&move_clients_mutex);
|
|
thread_rwlock_create(&_source_shutdown_rwlock);
|
|
client_queue_init(&_request_queue);
|
|
client_queue_init(&_connection_queue);
|
|
client_queue_init(&_body_queue);
|
|
client_queue_init(&_handle_queue);
|
|
|
|
client_queue_start_thread(&_request_queue, "Request Queue", process_request_queue);
|
|
client_queue_start_thread(&_connection_queue, "Con Queue", _handle_connection);
|
|
client_queue_start_thread(&_body_queue, "Body Queue", process_request_body_queue);
|
|
client_queue_start_thread(&_handle_queue, "Client Handler", handle_client_worker);
|
|
|
|
_initialized = 1;
|
|
}
|
|
|
|
void connection_shutdown(void)
|
|
{
|
|
if (!_initialized)
|
|
return;
|
|
|
|
tls_ctx_unref(tls_ctx);
|
|
matchfile_release(banned_ip);
|
|
matchfile_release(allowed_ip);
|
|
|
|
thread_rwlock_destroy(&_source_shutdown_rwlock);
|
|
thread_spin_destroy (&_connection_lock);
|
|
thread_mutex_destroy(&move_clients_mutex);
|
|
client_queue_destroy(&_request_queue);
|
|
client_queue_destroy(&_connection_queue);
|
|
client_queue_destroy(&_body_queue);
|
|
client_queue_destroy(&_handle_queue);
|
|
|
|
_initialized = 0;
|
|
}
|
|
|
|
void connection_reread_config(ice_config_t *config)
|
|
{
|
|
get_tls_certificate(config);
|
|
listensocket_container_configure_and_setup(global.listensockets, config);
|
|
}
|
|
|
|
static connection_id_t _next_connection_id(void)
|
|
{
|
|
connection_id_t id;
|
|
|
|
thread_spin_lock(&_connection_lock);
|
|
id = _current_id++;
|
|
thread_spin_unlock(&_connection_lock);
|
|
|
|
return id;
|
|
}
|
|
|
|
|
|
#ifdef ICECAST_CAP_TLS
|
|
static void get_tls_certificate(ice_config_t *config)
|
|
{
|
|
const char *keyfile;
|
|
|
|
tls_ok = false;
|
|
|
|
keyfile = config->tls_context.key_file;
|
|
if (!keyfile)
|
|
keyfile = config->tls_context.cert_file;
|
|
|
|
tls_ctx_unref(tls_ctx);
|
|
tls_ctx = tls_ctx_new(config->tls_context.cert_file, keyfile, config->tls_context.cipher_list);
|
|
if (!tls_ctx) {
|
|
ICECAST_LOG_INFO("No TLS capability on any configured ports");
|
|
return;
|
|
}
|
|
|
|
tls_ok = true;
|
|
}
|
|
|
|
|
|
/* handlers for reading and writing a connection_t when there is TLS
|
|
* configured on the listening port
|
|
*/
|
|
static int connection_read_tls(connection_t *con, void *buf, size_t len)
|
|
{
|
|
ssize_t bytes = tls_read(con->tls, buf, len);
|
|
|
|
if (bytes <= 0) {
|
|
if (tls_want_io(con->tls) > 0)
|
|
return -1;
|
|
con->error = 1;
|
|
}
|
|
return bytes;
|
|
}
|
|
|
|
static int connection_send_tls(connection_t *con, const void *buf, size_t len)
|
|
{
|
|
ssize_t bytes = tls_write(con->tls, buf, len);
|
|
|
|
if (bytes < 0) {
|
|
con->error = 1;
|
|
} else {
|
|
con->sent_bytes += bytes;
|
|
}
|
|
|
|
return bytes;
|
|
}
|
|
#else
|
|
|
|
/* TLS not compiled in, so at least log it */
|
|
static void get_tls_certificate(ice_config_t *config)
|
|
{
|
|
tls_ok = false;
|
|
ICECAST_LOG_INFO("No TLS capability. "
|
|
"Rebuild Icecast with OpenSSL support to enable this.");
|
|
}
|
|
#endif /* ICECAST_CAP_TLS */
|
|
|
|
|
|
/* handlers (default) for reading and writing a connection_t, no encrpytion
|
|
* used just straight access to the socket
|
|
*/
|
|
static int connection_read(connection_t *con, void *buf, size_t len)
|
|
{
|
|
int bytes = sock_read_bytes(con->sock, buf, len);
|
|
if (bytes == 0)
|
|
con->error = 1;
|
|
if (bytes == -1 && !sock_recoverable(sock_error()))
|
|
con->error = 1;
|
|
return bytes;
|
|
}
|
|
|
|
static int connection_send(connection_t *con, const void *buf, size_t len)
|
|
{
|
|
int bytes = sock_write_bytes(con->sock, buf, len);
|
|
if (bytes < 0) {
|
|
if (!sock_recoverable(sock_error()))
|
|
con->error = 1;
|
|
} else {
|
|
con->sent_bytes += bytes;
|
|
}
|
|
|
|
return bytes;
|
|
}
|
|
|
|
connection_t *connection_create(sock_t sock, listensocket_t *listensocket_real, listensocket_t* listensocket_effective, char *ip)
|
|
{
|
|
connection_t *con;
|
|
|
|
if (!matchfile_match_allow_deny(allowed_ip, banned_ip, ip))
|
|
return NULL;
|
|
|
|
con = (connection_t *)calloc(1, sizeof(connection_t));
|
|
if (con) {
|
|
refobject_ref(listensocket_real);
|
|
refobject_ref(listensocket_effective);
|
|
|
|
con->sock = sock;
|
|
con->listensocket_real = listensocket_real;
|
|
con->listensocket_effective = listensocket_effective;
|
|
con->con_time = time(NULL);
|
|
con->id = _next_connection_id();
|
|
con->ip = ip;
|
|
con->tlsmode = ICECAST_TLSMODE_AUTO;
|
|
con->read = connection_read;
|
|
con->send = connection_send;
|
|
}
|
|
|
|
fastevent_emit(FASTEVENT_TYPE_CONNECTION_CREATE, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CONNECTION, con);
|
|
|
|
return con;
|
|
}
|
|
|
|
/* prepare connection for interacting over a TLS connection
|
|
*/
|
|
void connection_uses_tls(connection_t *con)
|
|
{
|
|
#ifdef ICECAST_CAP_TLS
|
|
if (con->tls)
|
|
return;
|
|
|
|
if (con->readbufferlen) {
|
|
ICECAST_LOG_ERROR("Connection is now using TLS but has data put back. BAD. Discarding putback data.");
|
|
free(con->readbuffer);
|
|
con->readbufferlen = 0;
|
|
}
|
|
|
|
con->tlsmode = ICECAST_TLSMODE_RFC2818;
|
|
con->read = connection_read_tls;
|
|
con->send = connection_send_tls;
|
|
con->tls = tls_new(tls_ctx);
|
|
tls_set_incoming(con->tls);
|
|
tls_set_socket(con->tls, con->sock);
|
|
#endif
|
|
}
|
|
|
|
ssize_t connection_send_bytes(connection_t *con, const void *buf, size_t len)
|
|
{
|
|
ssize_t ret = con->send(con, buf, len);
|
|
|
|
fastevent_emit(FASTEVENT_TYPE_CONNECTION_WRITE, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBRD, con, buf, len, ret);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static inline ssize_t connection_read_bytes_real(connection_t *con, void *buf, size_t len)
|
|
{
|
|
ssize_t done = 0;
|
|
ssize_t ret;
|
|
|
|
if (con->readbufferlen) {
|
|
ICECAST_LOG_DEBUG("On connection %p we read from putback buffer, filled with %zu bytes, requested are %zu bytes", con, con->readbufferlen, len);
|
|
if (len >= con->readbufferlen) {
|
|
memcpy(buf, con->readbuffer, con->readbufferlen);
|
|
free(con->readbuffer);
|
|
con->readbuffer = NULL;
|
|
ICECAST_LOG_DEBUG("New fill in buffer=<empty>");
|
|
if (len == con->readbufferlen) {
|
|
con->readbufferlen = 0;
|
|
return len;
|
|
} else {
|
|
len -= con->readbufferlen;
|
|
buf += con->readbufferlen;
|
|
done = con->readbufferlen;
|
|
con->readbufferlen = 0;
|
|
}
|
|
} else {
|
|
memcpy(buf, con->readbuffer, len);
|
|
memmove(con->readbuffer, con->readbuffer+len, con->readbufferlen-len);
|
|
con->readbufferlen -= len;
|
|
return len;
|
|
}
|
|
}
|
|
|
|
ret = con->read(con, buf, len);
|
|
|
|
if (ret < 0) {
|
|
if (done == 0) {
|
|
return ret;
|
|
} else {
|
|
return done;
|
|
}
|
|
}
|
|
|
|
return done + ret;
|
|
}
|
|
|
|
ssize_t connection_read_bytes(connection_t *con, void *buf, size_t len)
|
|
{
|
|
ssize_t ret = connection_read_bytes_real(con, buf, len);
|
|
|
|
fastevent_emit(FASTEVENT_TYPE_CONNECTION_READ, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBRD, con, buf, len, ret);
|
|
|
|
return ret;
|
|
}
|
|
|
|
int connection_read_put_back(connection_t *con, const void *buf, size_t len)
|
|
{
|
|
void *n;
|
|
|
|
fastevent_emit(FASTEVENT_TYPE_CONNECTION_PUTBACK, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_OBR, con, buf, len);
|
|
|
|
if (con->readbufferlen) {
|
|
n = realloc(con->readbuffer, con->readbufferlen + len);
|
|
if (!n)
|
|
return -1;
|
|
|
|
memcpy(n + con->readbufferlen, buf, len);
|
|
con->readbuffer = n;
|
|
con->readbufferlen += len;
|
|
|
|
ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len);
|
|
return 0;
|
|
} else {
|
|
n = malloc(len);
|
|
if (!n)
|
|
return -1;
|
|
|
|
memcpy(n, buf, len);
|
|
con->readbuffer = n;
|
|
con->readbufferlen = len;
|
|
ICECAST_LOG_DEBUG("On connection %p %zu bytes have been put back.", con, len);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* run along queue checking for any data that has come in or a timeout */
|
|
static bool process_request_queue_one (client_queue_entry_t *node, time_t timeout)
|
|
{
|
|
client_t *client = node->client;
|
|
int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
|
|
char *buf = client->refbuf->data + node->offset;
|
|
|
|
ICECAST_LOG_DDEBUG("Checking on client %p", client);
|
|
|
|
if (client->con->tlsmode == ICECAST_TLSMODE_AUTO || client->con->tlsmode == ICECAST_TLSMODE_AUTO_NO_PLAIN) {
|
|
char peak;
|
|
if (recv(client->con->sock, &peak, 1, MSG_PEEK) == 1) {
|
|
if (peak == 0x16) { /* TLS Record Protocol Content type 0x16 == Handshake */
|
|
connection_uses_tls(client->con);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (len > 0) {
|
|
if (client->con->con_time <= timeout) {
|
|
len = 0;
|
|
} else {
|
|
len = client_read_bytes(client, buf, len);
|
|
}
|
|
}
|
|
|
|
if (len > 0 || node->shoutcast > 1) {
|
|
ssize_t stream_offset = -1;
|
|
int pass_it = 1;
|
|
char *ptr;
|
|
|
|
if (len < 0 && node->shoutcast > 1)
|
|
len = 0;
|
|
|
|
/* handle \n, \r\n and nsvcap which for some strange reason has
|
|
* EOL as \r\r\n */
|
|
node->offset += len;
|
|
client->refbuf->data[node->offset] = '\000';
|
|
do {
|
|
if (node->shoutcast == 1) {
|
|
/* password line */
|
|
if (strstr (client->refbuf->data, "\r\r\n") != NULL)
|
|
break;
|
|
if (strstr (client->refbuf->data, "\r\n") != NULL)
|
|
break;
|
|
if (strstr (client->refbuf->data, "\n") != NULL)
|
|
break;
|
|
}
|
|
/* stream_offset refers to the start of any data sent after the
|
|
* http style headers, we don't want to lose those */
|
|
ptr = strstr(client->refbuf->data, "\r\r\n\r\r\n");
|
|
if (ptr) {
|
|
stream_offset = (ptr+6) - client->refbuf->data;
|
|
break;
|
|
}
|
|
ptr = strstr(client->refbuf->data, "\r\n\r\n");
|
|
if (ptr) {
|
|
stream_offset = (ptr+4) - client->refbuf->data;
|
|
break;
|
|
}
|
|
ptr = strstr(client->refbuf->data, "\n\n");
|
|
if (ptr) {
|
|
stream_offset = (ptr+2) - client->refbuf->data;
|
|
break;
|
|
}
|
|
pass_it = 0;
|
|
} while (0);
|
|
|
|
ICECAST_LOG_DDEBUG("pass_it=%i, len=%i", pass_it, (int)len);
|
|
ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data);
|
|
|
|
if (pass_it) {
|
|
if (stream_offset != -1) {
|
|
connection_read_put_back(client->con, client->refbuf->data + stream_offset, node->offset - stream_offset);
|
|
node->offset = stream_offset;
|
|
}
|
|
client_queue_add(&_connection_queue, node);
|
|
return true;
|
|
}
|
|
} else {
|
|
if (len == 0 || client->con->error) {
|
|
client_destroy(client);
|
|
free_client_node(node);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static void * process_request_queue (client_queue_t *queue)
|
|
{
|
|
while (client_queue_running(queue)) {
|
|
client_queue_entry_t *stop = NULL;
|
|
client_queue_entry_t *node;
|
|
ice_config_t *config;
|
|
time_t timeout;
|
|
|
|
config = config_get_config();
|
|
timeout = time(NULL) - config->header_timeout;
|
|
config_release_config();
|
|
|
|
client_queue_check_ready_wait(queue, QUEUE_READY_TIMEOUT, timeout);
|
|
|
|
while ((node = client_queue_shift_ready(queue, stop))) {
|
|
if (!process_request_queue_one(node, timeout)) {
|
|
client_queue_add(queue, node);
|
|
if (!stop)
|
|
stop = node;
|
|
}
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static client_slurp_result_t process_request_body_queue_one(client_queue_entry_t *node, time_t timeout, size_t body_size_limit)
|
|
{
|
|
client_t *client = node->client;
|
|
client_slurp_result_t res;
|
|
|
|
if (client->parser->req_type == httpp_req_post) {
|
|
if (node->bodybuffer == NULL && client->request_body_read == 0) {
|
|
if (client->request_body_length < 0) {
|
|
node->bodybufferlen = body_size_limit;
|
|
node->bodybuffer = malloc(node->bodybufferlen);
|
|
} else if (client->request_body_length <= (ssize_t)body_size_limit) {
|
|
node->bodybufferlen = client->request_body_length;
|
|
node->bodybuffer = malloc(node->bodybufferlen);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (node->bodybuffer) {
|
|
res = client_body_slurp(client, node->bodybuffer, &(node->bodybufferlen));
|
|
if (res == CLIENT_SLURP_SUCCESS) {
|
|
httpp_parse_postdata(client->parser, node->bodybuffer, node->bodybufferlen);
|
|
free(node->bodybuffer);
|
|
node->bodybuffer = NULL;
|
|
}
|
|
} else {
|
|
res = client_body_skip(client);
|
|
}
|
|
|
|
if (res != CLIENT_SLURP_SUCCESS) {
|
|
if (client->con->con_time <= timeout || client->request_body_read >= body_size_limit) {
|
|
return CLIENT_SLURP_ERROR;
|
|
}
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
/* This queue reads data from the body of clients. */
|
|
static void * process_request_body_queue (client_queue_t *queue)
|
|
{
|
|
while (client_queue_running(queue)) {
|
|
client_queue_entry_t *stop = NULL;
|
|
client_queue_entry_t *node;
|
|
ice_config_t *config;
|
|
time_t timeout;
|
|
size_t body_size_limit;
|
|
|
|
ICECAST_LOG_DDEBUG("Processing body queue.");
|
|
|
|
config = config_get_config();
|
|
timeout = time(NULL) - config->body_timeout;
|
|
body_size_limit = config->body_size_limit;
|
|
config_release_config();
|
|
|
|
client_queue_check_ready_wait(queue, QUEUE_READY_TIMEOUT, timeout);
|
|
|
|
while ((node = client_queue_shift(queue, stop))) {
|
|
client_t *client = node->client;
|
|
client_slurp_result_t res;
|
|
|
|
node->tried_body = 1;
|
|
|
|
ICECAST_LOG_DEBUG("Got client %p in body queue.", client);
|
|
|
|
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
|
|
|
if (res == CLIENT_SLURP_NEEDS_MORE_DATA) {
|
|
client_queue_add(queue, node);
|
|
if (!stop)
|
|
stop = node;
|
|
} else {
|
|
ICECAST_LOG_DEBUG("Putting client %p back in connection queue.", client);
|
|
|
|
client_queue_add(&_connection_queue, node);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static client_queue_entry_t *create_client_node(client_t *client)
|
|
{
|
|
client_queue_entry_t *node = calloc (1, sizeof (client_queue_entry_t));
|
|
const listener_t *listener;
|
|
|
|
if (!node)
|
|
return NULL;
|
|
|
|
node->client = client;
|
|
|
|
listener = listensocket_get_listener(client->con->listensocket_effective);
|
|
|
|
if (listener) {
|
|
if (listener->shoutcast_compat)
|
|
node->shoutcast = 1;
|
|
client->con->tlsmode = listener->tls;
|
|
if (listener->tls == ICECAST_TLSMODE_RFC2818 && tls_ok)
|
|
connection_uses_tls(client->con);
|
|
if (listener->shoutcast_mount)
|
|
node->shoutcast_mount = strdup(listener->shoutcast_mount);
|
|
}
|
|
|
|
listensocket_release_listener(client->con->listensocket_effective);
|
|
|
|
return node;
|
|
}
|
|
|
|
static void free_client_node(client_queue_entry_t *node)
|
|
{
|
|
free(node->shoutcast_mount);
|
|
free(node->bodybuffer);
|
|
free(node);
|
|
}
|
|
|
|
void connection_queue(connection_t *con)
|
|
{
|
|
client_queue_entry_t *node;
|
|
client_t *client = NULL;
|
|
|
|
global_lock();
|
|
if (client_create(&client, con, NULL) < 0) {
|
|
global_unlock();
|
|
client_send_error_by_id(client, ICECAST_ERROR_GEN_CLIENT_LIMIT);
|
|
/* don't be too eager as this is an imposed hard limit */
|
|
thread_sleep(400000);
|
|
return;
|
|
}
|
|
|
|
/* setup client for reading incoming http */
|
|
client->refbuf->data[PER_CLIENT_REFBUF_SIZE-1] = '\000';
|
|
|
|
if (sock_set_blocking(client->con->sock, 0) || sock_set_nodelay(client->con->sock)) {
|
|
global_unlock();
|
|
ICECAST_LOG_WARN("Failed to set tcp options on client connection, dropping");
|
|
client_destroy(client);
|
|
return;
|
|
}
|
|
node = create_client_node(client);
|
|
global_unlock();
|
|
|
|
if (node == NULL) {
|
|
client_destroy(client);
|
|
return;
|
|
}
|
|
|
|
client_queue_add(&_request_queue, node);
|
|
stats_event_inc(NULL, "connections");
|
|
}
|
|
|
|
void connection_accept_loop(void)
|
|
{
|
|
ice_config_t *config;
|
|
|
|
config = config_get_config();
|
|
get_tls_certificate(config);
|
|
config_release_config();
|
|
|
|
while (global.running == ICECAST_RUNNING) {
|
|
connection_t *con = listensocket_container_accept(global.listensockets, 800);
|
|
|
|
if (con) {
|
|
connection_queue(con);
|
|
}
|
|
}
|
|
|
|
/* Give all the other threads notification to shut down */
|
|
|
|
/* wait for all the sources to shutdown */
|
|
thread_rwlock_wlock(&_source_shutdown_rwlock);
|
|
thread_rwlock_unlock(&_source_shutdown_rwlock);
|
|
}
|
|
|
|
|
|
/* Called when activating a source. Verifies that the source count is not
|
|
* exceeded and applies any initial parameters.
|
|
*/
|
|
int connection_complete_source(source_t *source, int response)
|
|
{
|
|
ice_config_t *config;
|
|
|
|
global_lock();
|
|
ICECAST_LOG_DEBUG("sources count is %d", global.sources);
|
|
|
|
config = config_get_config();
|
|
if (global.sources < config->source_limit) {
|
|
const char *contenttype;
|
|
mount_proxy *mountinfo;
|
|
format_type_t format_type;
|
|
|
|
/* setup format handler */
|
|
contenttype = httpp_getvar (source->parser, "content-type");
|
|
if (contenttype != NULL) {
|
|
format_type = format_get_type(contenttype);
|
|
|
|
if (format_type == FORMAT_ERROR) {
|
|
config_release_config();
|
|
global_unlock();
|
|
if (response) {
|
|
client_send_error_by_id(source->client, ICECAST_ERROR_CON_CONTENT_TYPE_NOSYS);
|
|
source->client = NULL;
|
|
}
|
|
ICECAST_LOG_WARN("Content-type \"%s\" not supported, dropping source", contenttype);
|
|
return -1;
|
|
}
|
|
} else if (source->parser->req_type == httpp_req_put) {
|
|
config_release_config();
|
|
global_unlock();
|
|
if (response) {
|
|
client_send_error_by_id(source->client, ICECAST_ERROR_CON_NO_CONTENT_TYPE_GIVEN);
|
|
source->client = NULL;
|
|
}
|
|
ICECAST_LOG_ERROR("Content-type not given in PUT request, dropping source");
|
|
return -1;
|
|
} else {
|
|
ICECAST_LOG_ERROR("No content-type header, falling back to backwards compatibility mode "
|
|
"for icecast 1.x relays. Assuming content is mp3. This behaviour is deprecated "
|
|
"and the source client will NOT work with future Icecast versions!");
|
|
format_type = FORMAT_TYPE_GENERIC;
|
|
}
|
|
|
|
if (format_get_plugin (format_type, source) < 0) {
|
|
global_unlock();
|
|
config_release_config();
|
|
if (response) {
|
|
client_send_error_by_id(source->client, ICECAST_ERROR_CON_INTERNAL_FORMAT_ALLOC_ERROR);
|
|
source->client = NULL;
|
|
}
|
|
ICECAST_LOG_WARN("plugin format failed for \"%s\"", source->mount);
|
|
return -1;
|
|
}
|
|
|
|
global.sources++;
|
|
stats_event_args(NULL, "sources", "%d", global.sources);
|
|
global_unlock();
|
|
|
|
source->running = 1;
|
|
mountinfo = config_find_mount(config, source->mount, MOUNT_TYPE_NORMAL);
|
|
source_update_settings(config, source, mountinfo);
|
|
config_release_config();
|
|
slave_rebuild_mounts();
|
|
|
|
source->shutdown_rwlock = &_source_shutdown_rwlock;
|
|
ICECAST_LOG_DEBUG("source is ready to start");
|
|
|
|
return 0;
|
|
}
|
|
ICECAST_LOG_WARN("Request to add source when maximum source limit "
|
|
"reached %d", global.sources);
|
|
|
|
global_unlock();
|
|
config_release_config();
|
|
|
|
if (response) {
|
|
client_send_error_by_id(source->client, ICECAST_ERROR_CON_SOURCE_CLIENT_LIMIT);
|
|
source->client = NULL;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
static void _handle_shoutcast_compatible(client_queue_entry_t *node)
|
|
{
|
|
char *http_compliant;
|
|
int http_compliant_len = 0;
|
|
http_parser_t *parser;
|
|
const char *shoutcast_mount;
|
|
client_t *client = node->client;
|
|
ice_config_t *config;
|
|
|
|
ICECAST_LOG_DDEBUG("Client %p is a shoutcast client of stage %i", client, (int)node->shoutcast);
|
|
|
|
if (node->shoutcast == 1)
|
|
{
|
|
char *ptr, *headers;
|
|
|
|
ICECAST_LOG_DDEBUG("Client %p has buffer: %H", client, client->refbuf->data);
|
|
|
|
/* Get rid of trailing \r\n or \n after password */
|
|
ptr = strstr(client->refbuf->data, "\r\r\n");
|
|
if (ptr) {
|
|
headers = ptr+3;
|
|
} else {
|
|
ptr = strstr(client->refbuf->data, "\r\n");
|
|
if (ptr) {
|
|
headers = ptr+2;
|
|
} else {
|
|
ptr = strstr(client->refbuf->data, "\n");
|
|
if (ptr)
|
|
headers = ptr+1;
|
|
}
|
|
}
|
|
|
|
if (ptr == NULL){
|
|
client_destroy(client);
|
|
free_client_node(node);
|
|
return;
|
|
}
|
|
*ptr = '\0';
|
|
|
|
client->password = strdup(client->refbuf->data);
|
|
config = config_get_config();
|
|
client->username = strdup(config->shoutcast_user);
|
|
config_release_config();
|
|
node->offset -= (headers - client->refbuf->data);
|
|
memmove(client->refbuf->data, headers, node->offset+1);
|
|
node->shoutcast = 2;
|
|
/* we've checked the password, now send it back for reading headers */
|
|
client_queue_add(&_request_queue, node);
|
|
ICECAST_LOG_DDEBUG("Client %p re-added to request queue", client);
|
|
return;
|
|
}
|
|
/* actually make a copy as we are dropping the config lock */
|
|
/* Here we create a valid HTTP request based of the information
|
|
that was passed in via the non-HTTP style protocol above. This
|
|
means we can use some of our existing code to handle this case */
|
|
config = config_get_config();
|
|
if (node->shoutcast_mount) {
|
|
shoutcast_mount = node->shoutcast_mount;
|
|
} else {
|
|
shoutcast_mount = config->shoutcast_mount;
|
|
}
|
|
http_compliant_len = 20 + strlen(shoutcast_mount) + node->offset;
|
|
http_compliant = (char *)calloc(1, http_compliant_len);
|
|
snprintf(http_compliant, http_compliant_len,
|
|
"SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, client->refbuf->data);
|
|
config_release_config();
|
|
|
|
parser = httpp_create_parser();
|
|
httpp_initialize(parser, NULL);
|
|
if (httpp_parse(parser, http_compliant, strlen(http_compliant))) {
|
|
client->refbuf->len = 0;
|
|
client->parser = parser;
|
|
client->protocol = ICECAST_PROTOCOL_SHOUTCAST;
|
|
node->shoutcast = 0;
|
|
return;
|
|
} else {
|
|
httpp_destroy(parser);
|
|
client_destroy(client);
|
|
}
|
|
free(http_compliant);
|
|
free_client_node(node);
|
|
return;
|
|
}
|
|
|
|
/* Check if we need body of client */
|
|
static int _need_body(client_queue_entry_t *node)
|
|
{
|
|
client_t *client = node->client;
|
|
|
|
if (node->tried_body)
|
|
return 0;
|
|
|
|
if (client->parser->req_type == httpp_req_source) {
|
|
/* SOURCE connection. */
|
|
return 0;
|
|
} else if (client->parser->req_type == httpp_req_put) {
|
|
/* PUT connection.
|
|
* TODO: We may need body for /admin/ but we do not know if it's an admin request yet.
|
|
*/
|
|
return 0;
|
|
} else if (client->request_body_length != -1 && (size_t)client->request_body_length != client->request_body_read) {
|
|
return 1;
|
|
} else if (client->request_body_length == -1 && client_body_eof(client) == 0) {
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Connection thread. Here we take clients off the connection queue and check
|
|
* the contents provided. We set up the parser then hand off to the specific
|
|
* request handler.
|
|
*/
|
|
static void * _handle_connection(client_queue_t *queue)
|
|
{
|
|
while (client_queue_running(queue)) {
|
|
client_queue_entry_t *node;
|
|
|
|
node = client_queue_shift(&_connection_queue, NULL);
|
|
if (node) {
|
|
client_t *client = node->client;
|
|
http_parser_t *parser;
|
|
const char *rawuri;
|
|
int already_parsed = 0;
|
|
|
|
/* Check for special shoutcast compatability processing */
|
|
if (node->shoutcast) {
|
|
_handle_shoutcast_compatible (node);
|
|
if (node->shoutcast)
|
|
continue;
|
|
}
|
|
|
|
/* process normal HTTP headers */
|
|
if (client->parser) {
|
|
already_parsed = 1;
|
|
parser = client->parser;
|
|
} else {
|
|
parser = httpp_create_parser();
|
|
httpp_initialize(parser, NULL);
|
|
client->parser = parser;
|
|
}
|
|
if (already_parsed || httpp_parse (parser, client->refbuf->data, node->offset)) {
|
|
client->refbuf->len = 0;
|
|
|
|
/* early check if we need more data */
|
|
client_complete(client);
|
|
if (_need_body(node)) {
|
|
/* Just calling client_queue_add(&_body_queue, node) would do the job.
|
|
* However, if the client only has a small body this might work without moving it between queues.
|
|
* -> much faster.
|
|
*/
|
|
client_slurp_result_t res;
|
|
ice_config_t *config;
|
|
time_t timeout;
|
|
size_t body_size_limit;
|
|
|
|
config = config_get_config();
|
|
timeout = time(NULL) - config->body_timeout;
|
|
body_size_limit = config->body_size_limit;
|
|
config_release_config();
|
|
|
|
res = process_request_body_queue_one(node, timeout, body_size_limit);
|
|
if (res != CLIENT_SLURP_SUCCESS) {
|
|
ICECAST_LOG_DEBUG("Putting client %p in body queue.", client);
|
|
client_queue_add(&_body_queue, node);
|
|
continue;
|
|
} else {
|
|
ICECAST_LOG_DEBUG("Success on fast lane");
|
|
}
|
|
}
|
|
|
|
rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
|
|
|
|
/* assign a port-based shoutcast mountpoint if required */
|
|
if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
|
|
httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);
|
|
|
|
|
|
client_queue_add(&_handle_queue, node);
|
|
} else {
|
|
free_client_node(node);
|
|
ICECAST_LOG_ERROR("HTTP request parsing failed");
|
|
client_destroy (client);
|
|
}
|
|
} else {
|
|
client_queue_wait(queue);
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void * handle_client_worker(client_queue_t *queue)
|
|
{
|
|
while (client_queue_running(queue)) {
|
|
client_queue_entry_t *node = client_queue_shift(queue, NULL);
|
|
if (node) {
|
|
client_t *client = node->client;
|
|
free_client_node(node);
|
|
|
|
connection_handle_client(client);
|
|
} else {
|
|
client_queue_wait(queue);
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void __on_sock_count(size_t count, void *userdata)
|
|
{
|
|
(void)userdata;
|
|
|
|
ICECAST_LOG_DEBUG("Listen socket count is now %zu.", count);
|
|
|
|
if (count == 0 && global.running == ICECAST_RUNNING) {
|
|
ICECAST_LOG_INFO("No more listen sockets. Exiting.");
|
|
global.running = ICECAST_HALTING;
|
|
}
|
|
}
|
|
|
|
/* called when listening thread is not checking for incoming connections */
|
|
void connection_setup_sockets (ice_config_t *config)
|
|
{
|
|
global_lock();
|
|
refobject_unref(global.listensockets);
|
|
|
|
if (config == NULL) {
|
|
global_unlock();
|
|
return;
|
|
}
|
|
|
|
/* setup the banned/allowed IP filenames from the xml */
|
|
if (config->banfile) {
|
|
matchfile_release(banned_ip);
|
|
banned_ip = matchfile_new(config->banfile);
|
|
if (!banned_ip)
|
|
ICECAST_LOG_ERROR("Can not create ban object, bad!");
|
|
}
|
|
|
|
if (config->allowfile) {
|
|
matchfile_release(allowed_ip);
|
|
allowed_ip = matchfile_new(config->allowfile);
|
|
}
|
|
|
|
global.listensockets = refobject_new(listensocket_container_t);
|
|
listensocket_container_configure(global.listensockets, config);
|
|
|
|
global_unlock();
|
|
|
|
listensocket_container_set_sockcount_cb(global.listensockets, __on_sock_count, NULL);
|
|
listensocket_container_setup(global.listensockets);;
|
|
}
|
|
|
|
|
|
void connection_close(connection_t *con)
|
|
{
|
|
if (!con)
|
|
return;
|
|
|
|
ICECAST_LOG_DEBUG("Closing connection %p (connection ID: %llu, sock=%R)", con, (long long unsigned int)con->id, con->sock);
|
|
|
|
fastevent_emit(FASTEVENT_TYPE_CONNECTION_DESTROY, FASTEVENT_FLAG_MODIFICATION_ALLOWED, FASTEVENT_DATATYPE_CONNECTION, con);
|
|
|
|
tls_unref(con->tls);
|
|
if (con->sock != SOCK_ERROR)
|
|
sock_close(con->sock);
|
|
if (con->ip)
|
|
free(con->ip);
|
|
if (con->readbuffer)
|
|
free(con->readbuffer);
|
|
refobject_unref(con->listensocket_real);
|
|
refobject_unref(con->listensocket_effective);
|
|
free(con);
|
|
}
|
|
|
|
void connection_queue_client(client_t *client)
|
|
{
|
|
client_queue_entry_t *node = create_client_node(client);
|
|
client_queue_add(&_connection_queue, node);
|
|
}
|