From 6c7f289fab3429706cb47b81ea02963585f0dd05 Mon Sep 17 00:00:00 2001 From: Zhang Rui Date: Tue, 13 Oct 2015 18:30:47 +0800 Subject: avformat/async: cache some data for fast seek backward Signed-off-by: Michael Niedermayer --- libavformat/async.c | 123 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 104 insertions(+), 19 deletions(-) (limited to 'libavformat') diff --git a/libavformat/async.c b/libavformat/async.c index a8c8f542c1..9fac84a01a 100644 --- a/libavformat/async.c +++ b/libavformat/async.c @@ -24,7 +24,6 @@ /** * @TODO * support timeout - * support backward short seek * support work with concatdec, hls */ @@ -43,8 +42,17 @@ #endif #define BUFFER_CAPACITY (4 * 1024 * 1024) +#define READ_BACK_CAPACITY (4 * 1024 * 1024) #define SHORT_SEEK_THRESHOLD (256 * 1024) +typedef struct RingBuffer +{ + AVFifoBuffer *fifo; + int read_back_capacity; + + int read_pos; +} RingBuffer; + typedef struct Context { AVClass *class; URLContext *inner; @@ -61,7 +69,7 @@ typedef struct Context { int64_t logical_pos; int64_t logical_size; - AVFifoBuffer *fifo; + RingBuffer ring; pthread_cond_t cond_wakeup_main; pthread_cond_t cond_wakeup_background; @@ -72,6 +80,73 @@ typedef struct Context { AVIOInterruptCB interrupt_callback; } Context; +static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity) +{ + memset(ring, 0, sizeof(RingBuffer)); + ring->fifo = av_fifo_alloc(capacity + read_back_capacity); + if (!ring->fifo) + return AVERROR(ENOMEM); + + ring->read_back_capacity = read_back_capacity; + return 0; +} + +static void ring_destroy(RingBuffer *ring) +{ + av_fifo_freep(&ring->fifo); +} + +static void ring_reset(RingBuffer *ring) +{ + av_fifo_reset(ring->fifo); + ring->read_pos = 0; +} + +static int ring_size(RingBuffer *ring) +{ + return av_fifo_size(ring->fifo) - ring->read_pos; +} + +static int ring_space(RingBuffer *ring) +{ + return av_fifo_space(ring->fifo); +} + +static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void (*func)(void*, void*, int)) +{ + int ret; + + av_assert2(buf_size <= ring_size(ring)); + ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func); + ring->read_pos += buf_size; + + if (ring->read_pos > ring->read_back_capacity) { + av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity); + ring->read_pos = ring->read_back_capacity; + } + + return ret; +} + +static int ring_generic_write(RingBuffer *ring, void *src, int size, int (*func)(void*, void*, int)) +{ + av_assert2(size <= ring_space(ring)); + return av_fifo_generic_write(ring->fifo, src, size, func); +} + +static int ring_size_of_read_back(RingBuffer *ring) +{ + return ring->read_pos; +} + +static int ring_drain(RingBuffer *ring, int offset) +{ + av_assert2(offset >= -ring_size_of_read_back(ring)); + av_assert2(offset <= -ring_size(ring)); + ring->read_pos += offset; + return 0; +} + static int async_check_interrupt(void *arg) { URLContext *h = arg; @@ -102,7 +177,7 @@ static void *async_buffer_task(void *arg) { URLContext *h = arg; Context *c = h->priv_data; - AVFifoBuffer *fifo = c->fifo; + RingBuffer *ring = &c->ring; int ret = 0; int64_t seek_ret; @@ -132,14 +207,14 @@ static void *async_buffer_task(void *arg) c->seek_ret = seek_ret; c->seek_request = 0; - av_fifo_reset(fifo); + ring_reset(ring); pthread_cond_signal(&c->cond_wakeup_main); pthread_mutex_unlock(&c->mutex); continue; } - fifo_space = av_fifo_space(fifo); + fifo_space = ring_space(ring); if (c->io_eof_reached || fifo_space <= 0) { pthread_cond_signal(&c->cond_wakeup_main); pthread_cond_wait(&c->cond_wakeup_background, &c->mutex); @@ -149,7 +224,7 @@ static void *async_buffer_task(void *arg) pthread_mutex_unlock(&c->mutex); to_copy = FFMIN(4096, fifo_space); - ret = av_fifo_generic_write(fifo, (void *)h, to_copy, (void *)wrapped_url_read); + ret = ring_generic_write(ring, (void *)h, to_copy, (void *)wrapped_url_read); pthread_mutex_lock(&c->mutex); if (ret <= 0) { @@ -173,11 +248,9 @@ static int async_open(URLContext *h, const char *arg, int flags, AVDictionary ** av_strstart(arg, "async:", &arg); - c->fifo = av_fifo_alloc(BUFFER_CAPACITY); - if (!c->fifo) { - ret = AVERROR(ENOMEM); + ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY); + if (ret < 0) goto fifo_fail; - } /* wrap interrupt callback */ c->interrupt_callback = h->interrupt_callback; @@ -225,7 +298,7 @@ cond_wakeup_main_fail: mutex_fail: ffurl_close(c->inner); url_fail: - av_fifo_freep(&c->fifo); + ring_destroy(&c->ring); fifo_fail: return ret; } @@ -248,7 +321,7 @@ static int async_close(URLContext *h) pthread_cond_destroy(&c->cond_wakeup_main); pthread_mutex_destroy(&c->mutex); ffurl_close(c->inner); - av_fifo_freep(&c->fifo); + ring_destroy(&c->ring); return 0; } @@ -257,7 +330,7 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com void (*func)(void*, void*, int)) { Context *c = h->priv_data; - AVFifoBuffer *fifo = c->fifo; + RingBuffer *ring = &c->ring; int to_read = size; int ret = 0; @@ -269,10 +342,10 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com ret = AVERROR_EXIT; break; } - fifo_size = av_fifo_size(fifo); + fifo_size = ring_size(ring); to_copy = FFMIN(to_read, fifo_size); if (to_copy > 0) { - av_fifo_generic_read(fifo, dest, to_copy, func); + ring_generic_read(ring, dest, to_copy, func); if (!func) dest = (uint8_t *)dest + to_copy; c->logical_pos += to_copy; @@ -312,10 +385,11 @@ static void fifo_do_not_copy_func(void* dest, void* src, int size) { static int64_t async_seek(URLContext *h, int64_t pos, int whence) { Context *c = h->priv_data; - AVFifoBuffer *fifo = c->fifo; + RingBuffer *ring = &c->ring; int64_t ret; int64_t new_logical_pos; int fifo_size; + int fifo_size_of_read_back; if (whence == AVSEEK_SIZE) { av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size); @@ -332,17 +406,28 @@ static int64_t async_seek(URLContext *h, int64_t pos, int whence) if (new_logical_pos < 0) return AVERROR(EINVAL); - fifo_size = av_fifo_size(fifo); + fifo_size = ring_size(ring); + fifo_size_of_read_back = ring_size_of_read_back(ring); if (new_logical_pos == c->logical_pos) { /* current position */ return c->logical_pos; - } else if ((new_logical_pos > c->logical_pos) && + } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) && (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) { + int pos_delta = (int)(new_logical_pos - c->logical_pos); /* fast seek */ av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n", new_logical_pos, (int)c->logical_pos, (int)(new_logical_pos - c->logical_pos), fifo_size); - async_read_internal(h, NULL, (int)(new_logical_pos - c->logical_pos), 1, fifo_do_not_copy_func); + + if (pos_delta > 0) { + // fast seek forwards + async_read_internal(h, NULL, pos_delta, 1, fifo_do_not_copy_func); + } else { + // fast seek backwards + ring_drain(ring, pos_delta); + c->logical_pos = new_logical_pos; + } + return c->logical_pos; } else if (c->logical_size <= 0) { /* can not seek */ -- cgit v1.2.3