diff options
Diffstat (limited to 'libavcodec/pthread_frame.c')
-rw-r--r-- | libavcodec/pthread_frame.c | 256 |
1 files changed, 164 insertions, 92 deletions
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c index 8faea75a49..5b09473dc7 100644 --- a/libavcodec/pthread_frame.c +++ b/libavcodec/pthread_frame.c @@ -29,6 +29,7 @@ #include "avcodec.h" #include "codec_internal.h" +#include "decode.h" #include "hwconfig.h" #include "internal.h" #include "pthread_internal.h" @@ -72,6 +73,12 @@ enum { INITIALIZED, ///< Thread has been properly set up }; +typedef struct DecodedFrames { + AVFrame **f; + size_t nb_f; + size_t nb_f_allocated; +} DecodedFrames; + /** * Context used by codec threads and stored in their AVCodecInternal thread_ctx. */ @@ -92,8 +99,10 @@ typedef struct PerThreadContext { AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding). - AVFrame *frame; ///< Output frame (for decoding) or input (for encoding). - int got_frame; ///< The output of got_picture_ptr from the last avcodec_decode_video() call. + /** + * Decoded frames from a single decode iteration. + */ + DecodedFrames df; int result; ///< The result of the last codec decode/encode() call. atomic_int state; @@ -140,6 +149,14 @@ typedef struct FrameThreadContext { pthread_cond_t async_cond; int async_lock; + DecodedFrames df; + int result; + + /** + * Packet to be submitted to the next thread for decoding. + */ + AVPacket *next_pkt; + int next_decoding; ///< The next context to submit a packet to. int next_finished; ///< The next context to return output from. @@ -172,6 +189,51 @@ static void async_unlock(FrameThreadContext *fctx) pthread_mutex_unlock(&fctx->async_mutex); } +// get a free frame to decode into +static AVFrame *decoded_frames_get_free(DecodedFrames *df) +{ + if (df->nb_f == df->nb_f_allocated) { + AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1, + sizeof(*df->f)); + if (!tmp) + return NULL; + df->f = tmp; + + df->f[df->nb_f] = av_frame_alloc(); + if (!df->f[df->nb_f]) + return NULL; + + df->nb_f_allocated++; + } + + av_frame_unref(df->f[df->nb_f]); + return df->f[df->nb_f]; +} + +static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst) +{ + AVFrame *tmp_frame = df->f[0]; + av_frame_move_ref(dst, tmp_frame); + memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f)); + df->f[--df->nb_f] = tmp_frame; +} + +static void decoded_frames_flush(DecodedFrames *df) +{ + for (int i = 0; i < df->nb_f; i++) + av_frame_unref(df->f[i]); + df->nb_f = 0; +} + +static void decoded_frames_free(DecodedFrames *df) +{ + for (int i = 0; i < df->nb_f_allocated; i++) + av_frame_free(&df->f[i]); + av_freep(&df->f); + df->nb_f = 0; + df->nb_f_allocated = 0; +} + /** * Codec worker thread. * @@ -184,6 +246,7 @@ static attribute_align_arg void *frame_worker_thread(void *arg) PerThreadContext *p = arg; AVCodecContext *avctx = p->avctx; const FFCodec *codec = ffcodec(avctx->codec); + int ret; pthread_mutex_lock(&p->mutex); while (1) { @@ -216,16 +279,31 @@ FF_ENABLE_DEPRECATION_WARNINGS p->hwaccel_serializing = 1; } - av_frame_unref(p->frame); - p->got_frame = 0; - p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt); + ret = 0; + while (ret >= 0) { + AVFrame *frame; + + /* get the frame which will store the output */ + frame = decoded_frames_get_free(&p->df); + if (!frame) { + p->result = AVERROR(ENOMEM); + goto alloc_fail; + } + + /* do the actual decoding */ + ret = ff_decode_receive_frame(avctx, frame); + if (ret == 0) + p->df.nb_f++; + else if (ret < 0 && frame->buf[0]) + ff_thread_release_buffer(avctx, frame); - if ((p->result < 0 || !p->got_frame) && p->frame->buf[0]) - ff_thread_release_buffer(avctx, p->frame); + p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret; + } if (atomic_load(&p->state) == STATE_SETTING_UP) ff_thread_finish_setup(avctx); +alloc_fail: if (p->hwaccel_serializing) { p->hwaccel_serializing = 0; pthread_mutex_unlock(&p->parent->hwaccel_mutex); @@ -410,23 +488,25 @@ static void release_delayed_buffers(PerThreadContext *p) #endif static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, - AVPacket *avpkt) + AVPacket *in_pkt) { FrameThreadContext *fctx = p->parent; PerThreadContext *prev_thread = fctx->prev_thread; - const AVCodec *codec = p->avctx->codec; - int ret; - - if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY)) - return 0; + int err; pthread_mutex_lock(&p->mutex); - ret = update_context_from_user(p->avctx, user_avctx); - if (ret) { + av_packet_unref(p->avpkt); + av_packet_move_ref(p->avpkt, in_pkt); + if (!p->avpkt->size) + p->avctx->internal->draining = 1; + + err = update_context_from_user(p->avctx, user_avctx); + if (err < 0) { pthread_mutex_unlock(&p->mutex); - return ret; + return err; } + atomic_store_explicit(&p->debug_threads, (p->avctx->debug & FF_DEBUG_THREADS) != 0, memory_order_relaxed); @@ -436,7 +516,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, #endif if (prev_thread) { - int err; if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) { pthread_mutex_lock(&prev_thread->progress_mutex); while (atomic_load(&prev_thread->state) == STATE_SETTING_UP) @@ -451,14 +530,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, } } - av_packet_unref(p->avpkt); - ret = av_packet_ref(p->avpkt, avpkt); - if (ret < 0) { - pthread_mutex_unlock(&p->mutex); - av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n"); - return ret; - } - atomic_store(&p->state, STATE_SETTING_UP); pthread_cond_signal(&p->input_cond); pthread_mutex_unlock(&p->mutex); @@ -502,57 +573,42 @@ FF_ENABLE_DEPRECATION_WARNINGS #endif fctx->prev_thread = p; - fctx->next_decoding++; + fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count; return 0; } -int ff_thread_decode_frame(AVCodecContext *avctx, - AVFrame *picture, int *got_picture_ptr, - AVPacket *avpkt) +int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame) { FrameThreadContext *fctx = avctx->internal->thread_ctx; - int finished = fctx->next_finished; - PerThreadContext *p; - int err; + int ret = 0; /* release the async lock, permitting blocked hwaccel threads to * go forward while we are in this function */ async_unlock(fctx); - /* - * Submit a packet to the next decoding thread. - */ - - p = &fctx->threads[fctx->next_decoding]; - err = submit_packet(p, avctx, avpkt); - if (err) - goto finish; + /* submit packets to threads while there are no buffered results to return */ + while (!fctx->df.nb_f && !fctx->result) { + PerThreadContext *p; - /* - * If we're still receiving the initial packets, don't return a frame. - */ - - if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1))) - fctx->delaying = 0; + /* get a packet to be submitted to the next thread */ + av_packet_unref(fctx->next_pkt); + ret = ff_decode_get_packet(avctx, fctx->next_pkt); + if (ret < 0 && ret != AVERROR_EOF) + goto finish; - if (fctx->delaying) { - *got_picture_ptr=0; - if (avpkt->size) { - err = avpkt->size; + ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx, + fctx->next_pkt); + if (ret < 0) goto finish; - } - } - /* - * Return the next available frame from the oldest thread. - * If we're at the end of the stream, then we have to skip threads that - * didn't output a frame/error, because we don't want to accidentally signal - * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0). - */ + /* do not return any frames until all threads have something to do */ + if (fctx->next_decoding != fctx->next_finished && + !avctx->internal->draining) + continue; - do { - p = &fctx->threads[finished++]; + p = &fctx->threads[fctx->next_finished]; + fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count; if (atomic_load(&p->state) != STATE_INPUT_READY) { pthread_mutex_lock(&p->progress_mutex); @@ -561,35 +617,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx, pthread_mutex_unlock(&p->progress_mutex); } - av_frame_move_ref(picture, p->frame); - *got_picture_ptr = p->got_frame; - picture->pkt_dts = p->avpkt->dts; - err = p->result; - - /* - * A later call with avkpt->size == 0 may loop over all threads, - * including this one, searching for a frame/error to return before being - * stopped by the "finished != fctx->next_finished" condition. - * Make sure we don't mistakenly return the same frame/error again. - */ - p->got_frame = 0; - p->result = 0; - - if (finished >= avctx->thread_count) finished = 0; - } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished); - - update_context_from_thread(avctx, p->avctx, 1); + fctx->result = p->result; + p->result = 0; - if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0; + if (p->df.nb_f) + FFSWAP(DecodedFrames, fctx->df, p->df); + } - fctx->next_finished = finished; + /* a thread may return multiple frames AND an error + * we first return all the frames, then the error */ + if (fctx->df.nb_f) { + decoded_frames_pop(&fctx->df, frame); + ret = 0; + } else { + ret = fctx->result; + fctx->result = 0; + } - /* return the size of the consumed packet if no error occurred */ - if (err >= 0) - err = avpkt->size; finish: async_lock(fctx); - return err; + return ret; } void ff_thread_report_progress(ThreadFrame *f, int n, int field) @@ -681,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count pthread_cond_wait(&p->output_cond, &p->progress_mutex); pthread_mutex_unlock(&p->progress_mutex); } - p->got_frame = 0; } async_lock(fctx); @@ -742,6 +788,8 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) av_freep(&ctx->priv_data); } + av_packet_free(&ctx->internal->in_pkt); + av_freep(&ctx->slice_offset); av_buffer_unref(&ctx->internal->pool); @@ -749,7 +797,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) av_buffer_unref(&ctx->hw_frames_ctx); } - av_frame_free(&p->frame); + decoded_frames_free(&p->df); ff_pthread_free(p, per_thread_offsets); av_packet_free(&p->avpkt); @@ -757,6 +805,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) av_freep(&p->avctx); } + decoded_frames_free(&fctx->df); + av_packet_free(&fctx->next_pkt); + av_freep(&fctx->threads); ff_pthread_free(fctx, thread_ctx_offsets); @@ -808,14 +859,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, if (err < 0) return err; - if (!(p->frame = av_frame_alloc()) || - !(p->avpkt = av_packet_alloc())) + if (!(p->avpkt = av_packet_alloc())) return AVERROR(ENOMEM); copy->internal->last_pkt_props = p->avpkt; + copy->internal->is_frame_mt = 1; if (!first) copy->internal->is_copy = 1; + copy->internal->in_pkt = av_packet_alloc(); + if (!copy->internal->in_pkt) + return AVERROR(ENOMEM); + if (codec->init) { err = codec->init(copy); if (err < 0) { @@ -871,6 +926,10 @@ int ff_frame_thread_init(AVCodecContext *avctx) return err; } + fctx->next_pkt = av_packet_alloc(); + if (!fctx->next_pkt) + return AVERROR(ENOMEM); + fctx->async_lock = 1; fctx->delaying = 1; @@ -915,12 +974,13 @@ void ff_thread_flush(AVCodecContext *avctx) fctx->next_decoding = fctx->next_finished = 0; fctx->delaying = 1; fctx->prev_thread = NULL; + + decoded_frames_flush(&fctx->df); + for (i = 0; i < avctx->thread_count; i++) { PerThreadContext *p = &fctx->threads[i]; - // Make sure decode flush calls with size=0 won't return old frames - p->got_frame = 0; - av_frame_unref(p->frame); - p->result = 0; + + decoded_frames_flush(&p->df); #if FF_API_THREAD_SAFE_CALLBACKS release_delayed_buffers(p); @@ -1144,3 +1204,15 @@ void ff_thread_release_ext_buffer(AVCodecContext *avctx, ThreadFrame *f) f->owner[0] = f->owner[1] = NULL; ff_thread_release_buffer(avctx, f->f); } + +int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt) +{ + PerThreadContext *p = avctx->internal->thread_ctx; + + if (p->avpkt->buf) { + av_packet_move_ref(pkt, p->avpkt); + return 0; + } + + return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN); +} |