summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libavcodec/decode.c29
-rw-r--r--libavcodec/decode.h7
-rw-r--r--libavcodec/internal.h7
-rw-r--r--libavcodec/pthread_frame.c256
-rw-r--r--libavcodec/thread.h22
5 files changed, 209 insertions, 112 deletions
diff --git a/libavcodec/decode.c b/libavcodec/decode.c
index 0613681f89..034bed03ea 100644
--- a/libavcodec/decode.c
+++ b/libavcodec/decode.c
@@ -202,6 +202,10 @@ fail:
return ret;
}
+#if !HAVE_THREADS
+#define ff_thread_get_packet(avctx, pkt) (AVERROR_BUG)
+#endif
+
int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
{
AVCodecInternal *avci = avctx->internal;
@@ -210,7 +214,14 @@ int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
if (avci->draining)
return AVERROR_EOF;
- ret = av_bsf_receive_packet(avci->bsf, pkt);
+ /* If we are a worker thread, get the next packet from the threading
+ * context. Otherwise we are the main (user-facing) context, so we get the
+ * next packet from the input filterchain.
+ */
+ if (avctx->internal->is_frame_mt)
+ ret = ff_thread_get_packet(avctx, pkt);
+ else
+ ret = av_bsf_receive_packet(avci->bsf, pkt);
if (ret == AVERROR_EOF)
avci->draining = 1;
if (ret < 0)
@@ -295,15 +306,11 @@ static inline int decode_simple_internal(AVCodecContext *avctx, AVFrame *frame,
return AVERROR_EOF;
if (!pkt->data &&
- !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY ||
- avctx->active_thread_type & FF_THREAD_FRAME))
+ !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY))
return AVERROR_EOF;
got_frame = 0;
- if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME) {
- ret = ff_thread_decode_frame(avctx, frame, &got_frame, pkt);
- } else {
ret = codec->cb.decode(avctx, frame, &got_frame, pkt);
if (!(codec->caps_internal & FF_CODEC_CAP_SETS_PKT_DTS))
@@ -320,7 +327,6 @@ static inline int decode_simple_internal(AVCodecContext *avctx, AVFrame *frame,
if (frame->format == AV_PIX_FMT_NONE) frame->format = avctx->pix_fmt;
}
}
- }
emms_c();
actual_got_frame = got_frame;
@@ -520,7 +526,7 @@ static int decode_simple_receive_frame(AVCodecContext *avctx, AVFrame *frame)
return 0;
}
-static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+int ff_decode_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
AVCodecInternal *avci = avctx->internal;
const FFCodec *const codec = ffcodec(avctx->codec);
@@ -579,6 +585,13 @@ FF_ENABLE_DEPRECATION_WARNINGS
return ret;
}
+static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+{
+ if (avctx->active_thread_type & FF_THREAD_FRAME)
+ return ff_thread_receive_frame(avctx, frame);
+ return ff_decode_receive_frame(avctx, frame);
+}
+
int attribute_align_arg avcodec_send_packet(AVCodecContext *avctx, const AVPacket *avpkt)
{
AVCodecInternal *avci = avctx->internal;
diff --git a/libavcodec/decode.h b/libavcodec/decode.h
index 1b40f714e1..3ee272ddec 100644
--- a/libavcodec/decode.h
+++ b/libavcodec/decode.h
@@ -94,4 +94,11 @@ int ff_copy_palette(void *dst, const AVPacket *src, void *logctx);
*/
int ff_decode_preinit(AVCodecContext *avctx);
+/**
+ * Do the actual decoding and obtain a decoded frame from the decoder, if
+ * available. When frame threading is used, this is invoked by the worker
+ * threads, otherwise by the top layer directly.
+ */
+int ff_decode_receive_frame(AVCodecContext *avctx, AVFrame *frame);
+
#endif /* AVCODEC_DECODE_H */
diff --git a/libavcodec/internal.h b/libavcodec/internal.h
index 6fb4e1b9af..f61685e4e1 100644
--- a/libavcodec/internal.h
+++ b/libavcodec/internal.h
@@ -53,6 +53,13 @@ typedef struct AVCodecInternal {
int is_copy;
/**
+ * This field is set to 1 when frame threading is being used and the parent
+ * AVCodecContext of this AVCodecInternal is a worker-thread context (i.e.
+ * one of those actually doing the decoding), 0 otherwise.
+ */
+ int is_frame_mt;
+
+ /**
* An audio frame with less than required samples has been submitted and
* padded with silence. Reject all subsequent frames.
*/
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 8faea75a49..5b09473dc7 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -29,6 +29,7 @@
#include "avcodec.h"
#include "codec_internal.h"
+#include "decode.h"
#include "hwconfig.h"
#include "internal.h"
#include "pthread_internal.h"
@@ -72,6 +73,12 @@ enum {
INITIALIZED, ///< Thread has been properly set up
};
+typedef struct DecodedFrames {
+ AVFrame **f;
+ size_t nb_f;
+ size_t nb_f_allocated;
+} DecodedFrames;
+
/**
* Context used by codec threads and stored in their AVCodecInternal thread_ctx.
*/
@@ -92,8 +99,10 @@ typedef struct PerThreadContext {
AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding).
- AVFrame *frame; ///< Output frame (for decoding) or input (for encoding).
- int got_frame; ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
+ /**
+ * Decoded frames from a single decode iteration.
+ */
+ DecodedFrames df;
int result; ///< The result of the last codec decode/encode() call.
atomic_int state;
@@ -140,6 +149,14 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond;
int async_lock;
+ DecodedFrames df;
+ int result;
+
+ /**
+ * Packet to be submitted to the next thread for decoding.
+ */
+ AVPacket *next_pkt;
+
int next_decoding; ///< The next context to submit a packet to.
int next_finished; ///< The next context to return output from.
@@ -172,6 +189,51 @@ static void async_unlock(FrameThreadContext *fctx)
pthread_mutex_unlock(&fctx->async_mutex);
}
+// get a free frame to decode into
+static AVFrame *decoded_frames_get_free(DecodedFrames *df)
+{
+ if (df->nb_f == df->nb_f_allocated) {
+ AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1,
+ sizeof(*df->f));
+ if (!tmp)
+ return NULL;
+ df->f = tmp;
+
+ df->f[df->nb_f] = av_frame_alloc();
+ if (!df->f[df->nb_f])
+ return NULL;
+
+ df->nb_f_allocated++;
+ }
+
+ av_frame_unref(df->f[df->nb_f]);
+ return df->f[df->nb_f];
+}
+
+static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
+{
+ AVFrame *tmp_frame = df->f[0];
+ av_frame_move_ref(dst, tmp_frame);
+ memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f));
+ df->f[--df->nb_f] = tmp_frame;
+}
+
+static void decoded_frames_flush(DecodedFrames *df)
+{
+ for (int i = 0; i < df->nb_f; i++)
+ av_frame_unref(df->f[i]);
+ df->nb_f = 0;
+}
+
+static void decoded_frames_free(DecodedFrames *df)
+{
+ for (int i = 0; i < df->nb_f_allocated; i++)
+ av_frame_free(&df->f[i]);
+ av_freep(&df->f);
+ df->nb_f = 0;
+ df->nb_f_allocated = 0;
+}
+
/**
* Codec worker thread.
*
@@ -184,6 +246,7 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
PerThreadContext *p = arg;
AVCodecContext *avctx = p->avctx;
const FFCodec *codec = ffcodec(avctx->codec);
+ int ret;
pthread_mutex_lock(&p->mutex);
while (1) {
@@ -216,16 +279,31 @@ FF_ENABLE_DEPRECATION_WARNINGS
p->hwaccel_serializing = 1;
}
- av_frame_unref(p->frame);
- p->got_frame = 0;
- p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+ ret = 0;
+ while (ret >= 0) {
+ AVFrame *frame;
+
+ /* get the frame which will store the output */
+ frame = decoded_frames_get_free(&p->df);
+ if (!frame) {
+ p->result = AVERROR(ENOMEM);
+ goto alloc_fail;
+ }
+
+ /* do the actual decoding */
+ ret = ff_decode_receive_frame(avctx, frame);
+ if (ret == 0)
+ p->df.nb_f++;
+ else if (ret < 0 && frame->buf[0])
+ ff_thread_release_buffer(avctx, frame);
- if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
- ff_thread_release_buffer(avctx, p->frame);
+ p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
+alloc_fail:
if (p->hwaccel_serializing) {
p->hwaccel_serializing = 0;
pthread_mutex_unlock(&p->parent->hwaccel_mutex);
@@ -410,23 +488,25 @@ static void release_delayed_buffers(PerThreadContext *p)
#endif
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
- AVPacket *avpkt)
+ AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
PerThreadContext *prev_thread = fctx->prev_thread;
- const AVCodec *codec = p->avctx->codec;
- int ret;
-
- if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
- return 0;
+ int err;
pthread_mutex_lock(&p->mutex);
- ret = update_context_from_user(p->avctx, user_avctx);
- if (ret) {
+ av_packet_unref(p->avpkt);
+ av_packet_move_ref(p->avpkt, in_pkt);
+ if (!p->avpkt->size)
+ p->avctx->internal->draining = 1;
+
+ err = update_context_from_user(p->avctx, user_avctx);
+ if (err < 0) {
pthread_mutex_unlock(&p->mutex);
- return ret;
+ return err;
}
+
atomic_store_explicit(&p->debug_threads,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
@@ -436,7 +516,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
#endif
if (prev_thread) {
- int err;
if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex);
while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
@@ -451,14 +530,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
}
}
- av_packet_unref(p->avpkt);
- ret = av_packet_ref(p->avpkt, avpkt);
- if (ret < 0) {
- pthread_mutex_unlock(&p->mutex);
- av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
- return ret;
- }
-
atomic_store(&p->state, STATE_SETTING_UP);
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
@@ -502,57 +573,42 @@ FF_ENABLE_DEPRECATION_WARNINGS
#endif
fctx->prev_thread = p;
- fctx->next_decoding++;
+ fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
return 0;
}
-int ff_thread_decode_frame(AVCodecContext *avctx,
- AVFrame *picture, int *got_picture_ptr,
- AVPacket *avpkt)
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
- int finished = fctx->next_finished;
- PerThreadContext *p;
- int err;
+ int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
- /*
- * Submit a packet to the next decoding thread.
- */
-
- p = &fctx->threads[fctx->next_decoding];
- err = submit_packet(p, avctx, avpkt);
- if (err)
- goto finish;
+ /* submit packets to threads while there are no buffered results to return */
+ while (!fctx->df.nb_f && !fctx->result) {
+ PerThreadContext *p;
- /*
- * If we're still receiving the initial packets, don't return a frame.
- */
-
- if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
- fctx->delaying = 0;
+ /* get a packet to be submitted to the next thread */
+ av_packet_unref(fctx->next_pkt);
+ ret = ff_decode_get_packet(avctx, fctx->next_pkt);
+ if (ret < 0 && ret != AVERROR_EOF)
+ goto finish;
- if (fctx->delaying) {
- *got_picture_ptr=0;
- if (avpkt->size) {
- err = avpkt->size;
+ ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+ fctx->next_pkt);
+ if (ret < 0)
goto finish;
- }
- }
- /*
- * Return the next available frame from the oldest thread.
- * If we're at the end of the stream, then we have to skip threads that
- * didn't output a frame/error, because we don't want to accidentally signal
- * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
- */
+ /* do not return any frames until all threads have something to do */
+ if (fctx->next_decoding != fctx->next_finished &&
+ !avctx->internal->draining)
+ continue;
- do {
- p = &fctx->threads[finished++];
+ p = &fctx->threads[fctx->next_finished];
+ fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
@@ -561,35 +617,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
pthread_mutex_unlock(&p->progress_mutex);
}
- av_frame_move_ref(picture, p->frame);
- *got_picture_ptr = p->got_frame;
- picture->pkt_dts = p->avpkt->dts;
- err = p->result;
-
- /*
- * A later call with avkpt->size == 0 may loop over all threads,
- * including this one, searching for a frame/error to return before being
- * stopped by the "finished != fctx->next_finished" condition.
- * Make sure we don't mistakenly return the same frame/error again.
- */
- p->got_frame = 0;
- p->result = 0;
-
- if (finished >= avctx->thread_count) finished = 0;
- } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished);
-
- update_context_from_thread(avctx, p->avctx, 1);
+ fctx->result = p->result;
+ p->result = 0;
- if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
+ if (p->df.nb_f)
+ FFSWAP(DecodedFrames, fctx->df, p->df);
+ }
- fctx->next_finished = finished;
+ /* a thread may return multiple frames AND an error
+ * we first return all the frames, then the error */
+ if (fctx->df.nb_f) {
+ decoded_frames_pop(&fctx->df, frame);
+ ret = 0;
+ } else {
+ ret = fctx->result;
+ fctx->result = 0;
+ }
- /* return the size of the consumed packet if no error occurred */
- if (err >= 0)
- err = avpkt->size;
finish:
async_lock(fctx);
- return err;
+ return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
@@ -681,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
- p->got_frame = 0;
}
async_lock(fctx);
@@ -742,6 +788,8 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&ctx->priv_data);
}
+ av_packet_free(&ctx->internal->in_pkt);
+
av_freep(&ctx->slice_offset);
av_buffer_unref(&ctx->internal->pool);
@@ -749,7 +797,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_buffer_unref(&ctx->hw_frames_ctx);
}
- av_frame_free(&p->frame);
+ decoded_frames_free(&p->df);
ff_pthread_free(p, per_thread_offsets);
av_packet_free(&p->avpkt);
@@ -757,6 +805,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&p->avctx);
}
+ decoded_frames_free(&fctx->df);
+ av_packet_free(&fctx->next_pkt);
+
av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
@@ -808,14 +859,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
if (err < 0)
return err;
- if (!(p->frame = av_frame_alloc()) ||
- !(p->avpkt = av_packet_alloc()))
+ if (!(p->avpkt = av_packet_alloc()))
return AVERROR(ENOMEM);
copy->internal->last_pkt_props = p->avpkt;
+ copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
+ copy->internal->in_pkt = av_packet_alloc();
+ if (!copy->internal->in_pkt)
+ return AVERROR(ENOMEM);
+
if (codec->init) {
err = codec->init(copy);
if (err < 0) {
@@ -871,6 +926,10 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return err;
}
+ fctx->next_pkt = av_packet_alloc();
+ if (!fctx->next_pkt)
+ return AVERROR(ENOMEM);
+
fctx->async_lock = 1;
fctx->delaying = 1;
@@ -915,12 +974,13 @@ void ff_thread_flush(AVCodecContext *avctx)
fctx->next_decoding = fctx->next_finished = 0;
fctx->delaying = 1;
fctx->prev_thread = NULL;
+
+ decoded_frames_flush(&fctx->df);
+
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
- // Make sure decode flush calls with size=0 won't return old frames
- p->got_frame = 0;
- av_frame_unref(p->frame);
- p->result = 0;
+
+ decoded_frames_flush(&p->df);
#if FF_API_THREAD_SAFE_CALLBACKS
release_delayed_buffers(p);
@@ -1144,3 +1204,15 @@ void ff_thread_release_ext_buffer(AVCodecContext *avctx, ThreadFrame *f)
f->owner[0] = f->owner[1] = NULL;
ff_thread_release_buffer(avctx, f->f);
}
+
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
+{
+ PerThreadContext *p = avctx->internal->thread_ctx;
+
+ if (p->avpkt->buf) {
+ av_packet_move_ref(pkt, p->avpkt);
+ return 0;
+ }
+
+ return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);
+}
diff --git a/libavcodec/thread.h b/libavcodec/thread.h
index d5673f25ea..3a1288835b 100644
--- a/libavcodec/thread.h
+++ b/libavcodec/thread.h
@@ -39,18 +39,11 @@
*/
void ff_thread_flush(AVCodecContext *avctx);
-/**
- * Submit a new frame to a decoding thread.
- * Returns the next available frame in picture. *got_picture_ptr
- * will be 0 if none is available.
- * The return value on success is the size of the consumed packet for
- * compatibility with FFCodec.decode. This means the decoder
- * has to consume the full packet.
- *
- * Parameters are the same as FFCodec.decode.
- */
-int ff_thread_decode_frame(AVCodecContext *avctx, AVFrame *picture,
- int *got_picture_ptr, AVPacket *avpkt);
+/*
+ * The receive_frame implementation for frame threading. Submit available
+ * packets for decoding to worker threads, return a decoded frame if available.
+*/
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame);
/**
* If the codec defines update_thread_context(), call this
@@ -99,6 +92,11 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
*/
void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f);
+/**
+ * Get a packet for decoding. This gets invoked by the worker threads.
+ */
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt);
+
int ff_thread_init(AVCodecContext *s);
int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr),