summaryrefslogtreecommitdiff
path: root/libavcodec/pthread_frame.c
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2017-01-24 15:00:22 +0100
committerAnton Khirnov <anton@khirnov.net>2022-08-01 15:45:25 +0200
commita691878d1d2185856e9a7d83022f880df1473737 (patch)
treebd8c1dda1bf0702a9e0c8db8551f6e97f9837b92 /libavcodec/pthread_frame.c
parent6b12208646fc8dc6d28da4f5c5ae57ea91c29412 (diff)
lavc: convert frame threading to the receive_frame() patternthread_receive
Reorganize the code such that the frame threading code does not call the decoders directly, but instead calls back into the generic decoding code. This avoids duplicating the logic that wraps the decoder invocation and will be useful in the following commits.
Diffstat (limited to 'libavcodec/pthread_frame.c')
-rw-r--r--libavcodec/pthread_frame.c256
1 files changed, 164 insertions, 92 deletions
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 8faea75a49..5b09473dc7 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -29,6 +29,7 @@
#include "avcodec.h"
#include "codec_internal.h"
+#include "decode.h"
#include "hwconfig.h"
#include "internal.h"
#include "pthread_internal.h"
@@ -72,6 +73,12 @@ enum {
INITIALIZED, ///< Thread has been properly set up
};
+typedef struct DecodedFrames {
+ AVFrame **f;
+ size_t nb_f;
+ size_t nb_f_allocated;
+} DecodedFrames;
+
/**
* Context used by codec threads and stored in their AVCodecInternal thread_ctx.
*/
@@ -92,8 +99,10 @@ typedef struct PerThreadContext {
AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding).
- AVFrame *frame; ///< Output frame (for decoding) or input (for encoding).
- int got_frame; ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
+ /**
+ * Decoded frames from a single decode iteration.
+ */
+ DecodedFrames df;
int result; ///< The result of the last codec decode/encode() call.
atomic_int state;
@@ -140,6 +149,14 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond;
int async_lock;
+ DecodedFrames df;
+ int result;
+
+ /**
+ * Packet to be submitted to the next thread for decoding.
+ */
+ AVPacket *next_pkt;
+
int next_decoding; ///< The next context to submit a packet to.
int next_finished; ///< The next context to return output from.
@@ -172,6 +189,51 @@ static void async_unlock(FrameThreadContext *fctx)
pthread_mutex_unlock(&fctx->async_mutex);
}
+// get a free frame to decode into
+static AVFrame *decoded_frames_get_free(DecodedFrames *df)
+{
+ if (df->nb_f == df->nb_f_allocated) {
+ AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1,
+ sizeof(*df->f));
+ if (!tmp)
+ return NULL;
+ df->f = tmp;
+
+ df->f[df->nb_f] = av_frame_alloc();
+ if (!df->f[df->nb_f])
+ return NULL;
+
+ df->nb_f_allocated++;
+ }
+
+ av_frame_unref(df->f[df->nb_f]);
+ return df->f[df->nb_f];
+}
+
+static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
+{
+ AVFrame *tmp_frame = df->f[0];
+ av_frame_move_ref(dst, tmp_frame);
+ memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f));
+ df->f[--df->nb_f] = tmp_frame;
+}
+
+static void decoded_frames_flush(DecodedFrames *df)
+{
+ for (int i = 0; i < df->nb_f; i++)
+ av_frame_unref(df->f[i]);
+ df->nb_f = 0;
+}
+
+static void decoded_frames_free(DecodedFrames *df)
+{
+ for (int i = 0; i < df->nb_f_allocated; i++)
+ av_frame_free(&df->f[i]);
+ av_freep(&df->f);
+ df->nb_f = 0;
+ df->nb_f_allocated = 0;
+}
+
/**
* Codec worker thread.
*
@@ -184,6 +246,7 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
PerThreadContext *p = arg;
AVCodecContext *avctx = p->avctx;
const FFCodec *codec = ffcodec(avctx->codec);
+ int ret;
pthread_mutex_lock(&p->mutex);
while (1) {
@@ -216,16 +279,31 @@ FF_ENABLE_DEPRECATION_WARNINGS
p->hwaccel_serializing = 1;
}
- av_frame_unref(p->frame);
- p->got_frame = 0;
- p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+ ret = 0;
+ while (ret >= 0) {
+ AVFrame *frame;
+
+ /* get the frame which will store the output */
+ frame = decoded_frames_get_free(&p->df);
+ if (!frame) {
+ p->result = AVERROR(ENOMEM);
+ goto alloc_fail;
+ }
+
+ /* do the actual decoding */
+ ret = ff_decode_receive_frame(avctx, frame);
+ if (ret == 0)
+ p->df.nb_f++;
+ else if (ret < 0 && frame->buf[0])
+ ff_thread_release_buffer(avctx, frame);
- if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
- ff_thread_release_buffer(avctx, p->frame);
+ p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
+alloc_fail:
if (p->hwaccel_serializing) {
p->hwaccel_serializing = 0;
pthread_mutex_unlock(&p->parent->hwaccel_mutex);
@@ -410,23 +488,25 @@ static void release_delayed_buffers(PerThreadContext *p)
#endif
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
- AVPacket *avpkt)
+ AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
PerThreadContext *prev_thread = fctx->prev_thread;
- const AVCodec *codec = p->avctx->codec;
- int ret;
-
- if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
- return 0;
+ int err;
pthread_mutex_lock(&p->mutex);
- ret = update_context_from_user(p->avctx, user_avctx);
- if (ret) {
+ av_packet_unref(p->avpkt);
+ av_packet_move_ref(p->avpkt, in_pkt);
+ if (!p->avpkt->size)
+ p->avctx->internal->draining = 1;
+
+ err = update_context_from_user(p->avctx, user_avctx);
+ if (err < 0) {
pthread_mutex_unlock(&p->mutex);
- return ret;
+ return err;
}
+
atomic_store_explicit(&p->debug_threads,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
@@ -436,7 +516,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
#endif
if (prev_thread) {
- int err;
if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex);
while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
@@ -451,14 +530,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
}
}
- av_packet_unref(p->avpkt);
- ret = av_packet_ref(p->avpkt, avpkt);
- if (ret < 0) {
- pthread_mutex_unlock(&p->mutex);
- av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
- return ret;
- }
-
atomic_store(&p->state, STATE_SETTING_UP);
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
@@ -502,57 +573,42 @@ FF_ENABLE_DEPRECATION_WARNINGS
#endif
fctx->prev_thread = p;
- fctx->next_decoding++;
+ fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
return 0;
}
-int ff_thread_decode_frame(AVCodecContext *avctx,
- AVFrame *picture, int *got_picture_ptr,
- AVPacket *avpkt)
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
- int finished = fctx->next_finished;
- PerThreadContext *p;
- int err;
+ int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
- /*
- * Submit a packet to the next decoding thread.
- */
-
- p = &fctx->threads[fctx->next_decoding];
- err = submit_packet(p, avctx, avpkt);
- if (err)
- goto finish;
+ /* submit packets to threads while there are no buffered results to return */
+ while (!fctx->df.nb_f && !fctx->result) {
+ PerThreadContext *p;
- /*
- * If we're still receiving the initial packets, don't return a frame.
- */
-
- if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
- fctx->delaying = 0;
+ /* get a packet to be submitted to the next thread */
+ av_packet_unref(fctx->next_pkt);
+ ret = ff_decode_get_packet(avctx, fctx->next_pkt);
+ if (ret < 0 && ret != AVERROR_EOF)
+ goto finish;
- if (fctx->delaying) {
- *got_picture_ptr=0;
- if (avpkt->size) {
- err = avpkt->size;
+ ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+ fctx->next_pkt);
+ if (ret < 0)
goto finish;
- }
- }
- /*
- * Return the next available frame from the oldest thread.
- * If we're at the end of the stream, then we have to skip threads that
- * didn't output a frame/error, because we don't want to accidentally signal
- * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
- */
+ /* do not return any frames until all threads have something to do */
+ if (fctx->next_decoding != fctx->next_finished &&
+ !avctx->internal->draining)
+ continue;
- do {
- p = &fctx->threads[finished++];
+ p = &fctx->threads[fctx->next_finished];
+ fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
@@ -561,35 +617,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
pthread_mutex_unlock(&p->progress_mutex);
}
- av_frame_move_ref(picture, p->frame);
- *got_picture_ptr = p->got_frame;
- picture->pkt_dts = p->avpkt->dts;
- err = p->result;
-
- /*
- * A later call with avkpt->size == 0 may loop over all threads,
- * including this one, searching for a frame/error to return before being
- * stopped by the "finished != fctx->next_finished" condition.
- * Make sure we don't mistakenly return the same frame/error again.
- */
- p->got_frame = 0;
- p->result = 0;
-
- if (finished >= avctx->thread_count) finished = 0;
- } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished);
-
- update_context_from_thread(avctx, p->avctx, 1);
+ fctx->result = p->result;
+ p->result = 0;
- if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
+ if (p->df.nb_f)
+ FFSWAP(DecodedFrames, fctx->df, p->df);
+ }
- fctx->next_finished = finished;
+ /* a thread may return multiple frames AND an error
+ * we first return all the frames, then the error */
+ if (fctx->df.nb_f) {
+ decoded_frames_pop(&fctx->df, frame);
+ ret = 0;
+ } else {
+ ret = fctx->result;
+ fctx->result = 0;
+ }
- /* return the size of the consumed packet if no error occurred */
- if (err >= 0)
- err = avpkt->size;
finish:
async_lock(fctx);
- return err;
+ return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
@@ -681,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
- p->got_frame = 0;
}
async_lock(fctx);
@@ -742,6 +788,8 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&ctx->priv_data);
}
+ av_packet_free(&ctx->internal->in_pkt);
+
av_freep(&ctx->slice_offset);
av_buffer_unref(&ctx->internal->pool);
@@ -749,7 +797,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_buffer_unref(&ctx->hw_frames_ctx);
}
- av_frame_free(&p->frame);
+ decoded_frames_free(&p->df);
ff_pthread_free(p, per_thread_offsets);
av_packet_free(&p->avpkt);
@@ -757,6 +805,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&p->avctx);
}
+ decoded_frames_free(&fctx->df);
+ av_packet_free(&fctx->next_pkt);
+
av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
@@ -808,14 +859,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
if (err < 0)
return err;
- if (!(p->frame = av_frame_alloc()) ||
- !(p->avpkt = av_packet_alloc()))
+ if (!(p->avpkt = av_packet_alloc()))
return AVERROR(ENOMEM);
copy->internal->last_pkt_props = p->avpkt;
+ copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
+ copy->internal->in_pkt = av_packet_alloc();
+ if (!copy->internal->in_pkt)
+ return AVERROR(ENOMEM);
+
if (codec->init) {
err = codec->init(copy);
if (err < 0) {
@@ -871,6 +926,10 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return err;
}
+ fctx->next_pkt = av_packet_alloc();
+ if (!fctx->next_pkt)
+ return AVERROR(ENOMEM);
+
fctx->async_lock = 1;
fctx->delaying = 1;
@@ -915,12 +974,13 @@ void ff_thread_flush(AVCodecContext *avctx)
fctx->next_decoding = fctx->next_finished = 0;
fctx->delaying = 1;
fctx->prev_thread = NULL;
+
+ decoded_frames_flush(&fctx->df);
+
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
- // Make sure decode flush calls with size=0 won't return old frames
- p->got_frame = 0;
- av_frame_unref(p->frame);
- p->result = 0;
+
+ decoded_frames_flush(&p->df);
#if FF_API_THREAD_SAFE_CALLBACKS
release_delayed_buffers(p);
@@ -1144,3 +1204,15 @@ void ff_thread_release_ext_buffer(AVCodecContext *avctx, ThreadFrame *f)
f->owner[0] = f->owner[1] = NULL;
ff_thread_release_buffer(avctx, f->f);
}
+
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
+{
+ PerThreadContext *p = avctx->internal->thread_ctx;
+
+ if (p->avpkt->buf) {
+ av_packet_move_ref(pkt, p->avpkt);
+ return 0;
+ }
+
+ return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);
+}