diff --git a/src/format_ebml.c b/src/format_ebml.c index 67cf8854..68e95047 100644 --- a/src/format_ebml.c +++ b/src/format_ebml.c @@ -47,6 +47,14 @@ typedef enum ebml_read_mode { EBML_STATE_READING_CLUSTERS } ebml_read_mode; +typedef enum ebml_parsing_state { + EBML_STATE_PARSING_HEADER = 0, + EBML_STATE_COPYING_TO_HEADER, + EBML_STATE_START_CLUSTER, + EBML_STATE_PARSING_CLUSTERS, + EBML_STATE_COPYING_TO_DATA +} ebml_parsing_state; + typedef enum ebml_chunk_type { EBML_CHUNK_HEADER = 0, EBML_CHUNK_CLUSTER_START, @@ -65,8 +73,9 @@ struct ebml_client_data_st { struct ebml_st { ebml_read_mode output_state; + ebml_parsing_state parse_state; + unsigned long long copy_len; - char *cluster_id; int cluster_start; int position; @@ -333,11 +342,9 @@ static ebml_t *ebml_create() ebml->output_state = EBML_STATE_READING_HEADER; ebml->header = calloc(1, EBML_HEADER_MAX_SIZE); - ebml->buffer = calloc(1, EBML_SLICE_SIZE * 4); + ebml->buffer = calloc(1, EBML_SLICE_SIZE); ebml->input_buffer = calloc(1, EBML_SLICE_SIZE); - ebml->cluster_id = "\x1F\x43\xB6\x75"; - ebml->cluster_start = -1; return ebml; @@ -487,65 +494,154 @@ static unsigned char *ebml_get_write_buffer(ebml_t *ebml, int *bytes) */ static int ebml_wrote(ebml_t *ebml, int len) { + int processing = 1; + int cursor = 0; + int to_copy; + unsigned char *end_of_buffer; - int b; + int tag_length; + unsigned long long payload_length; + int copy_state; - if (ebml->header_size == 0) { - /* Still reading header */ - if ((ebml->header_position + len) > EBML_HEADER_MAX_SIZE) { - ICECAST_LOG_ERROR("EBML Header too large, failing"); - return -1; - } - -/* - ICECAST_LOG_DEBUG("EBML: Adding to header, ofset is %d size is %d adding %d", - ebml->header_size, ebml->header_position, len); -*/ - - memcpy(ebml->header + ebml->header_position, ebml->input_buffer, len); - ebml->header_position += len; - } else { - /* Header's already been determined, read into data buffer */ - memcpy(ebml->buffer + ebml->position, ebml->input_buffer, len); - } - - for (b = 0; b <= len - 4; b++) - { - /* Scan for cluster start marker. - * False positives are possible, but unlikely, and only - * permanently corrupt a stream if they occur while scanning - * the initial header. Else, a client can reconnect in a few - * seconds and get a real sync point. - */ - if (!memcmp(ebml->input_buffer + b, ebml->cluster_id, 4)) - { -/* - ICECAST_LOG_DEBUG("EBML: found cluster"); -*/ - - if (ebml->header_size == 0) - { - /* We were looking for the header end; now we've found it */ - ebml->header_size = ebml->header_position - len + b; + char *segment_id = "\x18\x53\x80\x67"; + char *cluster_id = "\x1F\x43\xB6\x75"; + + ebml->input_position += len; + end_of_buffer = ebml->input_buffer + ebml->input_position; + + while (processing) { + + /*ICECAST_LOG_DEBUG("Parse State: %i", ebml->parse_state);*/ + + switch (ebml->parse_state) { + + case EBML_STATE_PARSING_HEADER: + case EBML_STATE_PARSING_CLUSTERS: - /* Shift data after the header into the data buffer */ - memcpy(ebml->buffer, ebml->input_buffer + b, len - b); - ebml->position = len - b; + if (ebml->parse_state == EBML_STATE_PARSING_HEADER) { + copy_state = EBML_STATE_COPYING_TO_HEADER; + } else { + copy_state = EBML_STATE_COPYING_TO_DATA; + } - /* Mark the start of the data as the first sync point */ - ebml->cluster_start = 0; - return len; - } else { - /* We've located a sync point in the data stream */ - ebml->cluster_start = ebml->position + b; - } + tag_length = ebml_parse_tag(ebml->input_buffer + cursor, + end_of_buffer, &payload_length); + + if (tag_length > 0) { + + if (payload_length == EBML_UNKNOWN) { + /* Parse all children for tags we can't skip */ + payload_length = 0; + } + + if (!memcmp(ebml->input_buffer + cursor, cluster_id, 4)) { + /* Found a cluster */ + ebml->parse_state = EBML_STATE_START_CLUSTER; + + } else if (!memcmp(ebml->input_buffer + cursor, segment_id, 4)) { + /* Segment tag, copy tag to buffer but parse children */ + ebml->copy_len = tag_length; + ebml->parse_state = copy_state; + + } else { + /* Non-cluster tag, copy it & children into buffer */ + ebml->copy_len = tag_length + payload_length; + ebml->parse_state = copy_state; + + } + + } else if (tag_length == 0) { + /* Wait for more data */ + /* ICECAST_LOG_DEBUG("Wait"); */ + processing = 0; + } else if (tag_length < 0) { + /* Parse error */ + /* ICECAST_LOG_DEBUG("Stop"); */ + return -1; + } + break; + + case EBML_STATE_START_CLUSTER: + /* found a cluster; wait to process it until + * any previous cluster tag has been flushed + * from the read buffer, so as to not lose the + * sync point. + */ + if (ebml->cluster_start >= 0) { + processing = 0; + } else { + + tag_length = ebml_parse_tag(ebml->input_buffer + cursor, + end_of_buffer, &payload_length); + + /* The header has been fully read by now, publish its size. */ + ebml->header_size = ebml->header_position; + + /* Mark this sync point */ + ebml->cluster_start = ebml->position; + + /* Copy cluster tag to read buffer */ + ebml->copy_len = tag_length; + ebml->parse_state = EBML_STATE_COPYING_TO_DATA; + } + break; + + case EBML_STATE_COPYING_TO_HEADER: + case EBML_STATE_COPYING_TO_DATA: + to_copy = ebml->input_position - cursor; + if (to_copy > ebml->copy_len) { + to_copy = ebml->copy_len; + } + + if (ebml->parse_state == EBML_STATE_COPYING_TO_HEADER) { + if ((ebml->header_position + to_copy) > EBML_HEADER_MAX_SIZE) { + ICECAST_LOG_ERROR("EBML Header too large, failing"); + return -1; + } + + memcpy(ebml->header + ebml->header_position, ebml->input_buffer + cursor, to_copy); + ebml->header_position += to_copy; + + } else if (ebml->parse_state == EBML_STATE_COPYING_TO_DATA) { + if ((ebml->position + to_copy) > EBML_SLICE_SIZE) { + to_copy = EBML_SLICE_SIZE - ebml->position; + } + + memcpy(ebml->buffer + ebml->position, ebml->input_buffer + cursor, to_copy); + ebml->position += to_copy; + } + /* ICECAST_LOG_DEBUG("Copied %i of %hhu", to_copy, ebml->copy_len); */ + + cursor += to_copy; + ebml->copy_len -= to_copy; + + if (ebml->copy_len == 0) { + /* resume parsing */ + if (ebml->parse_state == EBML_STATE_COPYING_TO_HEADER) { + ebml->parse_state = EBML_STATE_PARSING_HEADER; + } else { + ebml->parse_state = EBML_STATE_PARSING_CLUSTERS; + } + } else { + /* wait for more data */ + processing = 0; + } + + break; + + default: + processing = 0; + } + } - - ebml->position += len; - + + /* Shift unprocessed data down to the start of the buffer */ + memmove(ebml->input_buffer, ebml->input_buffer + cursor, ebml->input_position - cursor); + ebml->input_position -= cursor; + return len; - + } /* Try to parse an EBML tag at the given location, returning the