diff options
Diffstat (limited to 'libavcodec/pthread.c')
-rw-r--r-- | libavcodec/pthread.c | 221 |
1 files changed, 192 insertions, 29 deletions
diff --git a/libavcodec/pthread.c b/libavcodec/pthread.c index ce969af166..c9c275adfa 100644 --- a/libavcodec/pthread.c +++ b/libavcodec/pthread.c @@ -6,20 +6,20 @@ * to Michael Niedermayer <michaelni@gmx.at> for writing initial * implementation. * - * This file is part of Libav. + * This file is part of FFmpeg. * - * Libav is free software; you can redistribute it and/or + * FFmpeg is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * - * Libav is distributed in the hope that it will be useful, + * FFmpeg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public - * License along with Libav; if not, write to the Free Software + * License along with FFmpeg; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ @@ -43,6 +43,8 @@ #include <pthread.h> #elif HAVE_W32THREADS #include "compat/w32pthreads.h" +#elif HAVE_OS2THREADS +#include "compat/os2threads.h" #endif typedef int (action_func)(AVCodecContext *c, void *arg); @@ -64,6 +66,12 @@ typedef struct ThreadContext { unsigned current_execute; int current_job; int done; + + int *entries; + int entries_count; + int thread_count; + pthread_cond_t *progress_cond; + pthread_mutex_t *progress_mutex; } ThreadContext; /** @@ -98,6 +106,10 @@ typedef struct PerThreadContext { * Set when the codec calls get_buffer(). * State is returned to STATE_SETTING_UP afterwards. */ + STATE_GET_FORMAT, /**< + * Set when the codec calls get_format(). + * State is returned to STATE_SETTING_UP afterwards. + */ STATE_SETUP_FINISHED ///< Set after the codec has called ff_thread_finish_setup(). } state; @@ -111,6 +123,9 @@ typedef struct PerThreadContext { AVFrame *requested_frame; ///< AVFrame the codec passed to get_buffer() int requested_flags; ///< flags passed to get_buffer() for requested_frame + + const enum AVPixelFormat *available_formats; ///< Format array for get_format() + enum AVPixelFormat result_format; ///< get_format() result } PerThreadContext; /** @@ -249,7 +264,8 @@ static int thread_init_internal(AVCodecContext *avctx) if (!thread_count) { int nb_cpus = av_cpu_count(); - av_log(avctx, AV_LOG_DEBUG, "detected %d logical cores\n", nb_cpus); + if (avctx->height) + nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16); // use number of cores + 1 as thread count if there is more than one if (nb_cpus > 1) thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); @@ -297,6 +313,9 @@ static int thread_init_internal(AVCodecContext *avctx) return 0; } +#define THREAD_SAFE_CALLBACKS(avctx) \ +((avctx)->thread_safe_callbacks || (!(avctx)->get_buffer && (avctx)->get_buffer2 == avcodec_default_get_buffer2)) + /** * Codec worker thread. * @@ -311,20 +330,16 @@ static attribute_align_arg void *frame_worker_thread(void *arg) AVCodecContext *avctx = p->avctx; const AVCodec *codec = avctx->codec; + pthread_mutex_lock(&p->mutex); while (1) { - if (p->state == STATE_INPUT_READY && !fctx->die) { - pthread_mutex_lock(&p->mutex); while (p->state == STATE_INPUT_READY && !fctx->die) pthread_cond_wait(&p->input_cond, &p->mutex); - pthread_mutex_unlock(&p->mutex); - } if (fctx->die) break; - if (!codec->update_thread_context && avctx->thread_safe_callbacks) + if (!codec->update_thread_context && THREAD_SAFE_CALLBACKS(avctx)) ff_thread_finish_setup(avctx); - pthread_mutex_lock(&p->mutex); avcodec_get_frame_defaults(&p->frame); p->got_frame = 0; p->result = codec->decode(avctx, &p->frame, &p->got_frame, &p->avpkt); @@ -335,14 +350,21 @@ static attribute_align_arg void *frame_worker_thread(void *arg) if (p->state == STATE_SETTING_UP) ff_thread_finish_setup(avctx); + pthread_mutex_lock(&p->progress_mutex); +#if 0 //BUFREF-FIXME + for (i = 0; i < MAX_BUFFERS; i++) + if (p->progress_used[i] && (p->got_frame || p->result<0 || avctx->codec_id != AV_CODEC_ID_H264)) { + p->progress[i][0] = INT_MAX; + p->progress[i][1] = INT_MAX; + } +#endif p->state = STATE_INPUT_READY; - pthread_mutex_lock(&p->progress_mutex); + pthread_cond_broadcast(&p->progress_cond); pthread_cond_signal(&p->output_cond); pthread_mutex_unlock(&p->progress_mutex); - - pthread_mutex_unlock(&p->mutex); } + pthread_mutex_unlock(&p->mutex); return NULL; } @@ -388,9 +410,15 @@ static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, dst->hwaccel = src->hwaccel; dst->hwaccel_context = src->hwaccel_context; + + dst->channels = src->channels; + dst->sample_rate = src->sample_rate; + dst->sample_fmt = src->sample_fmt; + dst->channel_layout = src->channel_layout; } if (for_user) { + dst->delay = src->thread_count - 1; dst->coded_frame = src->coded_frame; } else { if (dst->codec->update_thread_context) @@ -423,6 +451,7 @@ FF_ENABLE_DEPRECATION_WARNINGS dst->opaque = src->opaque; dst->debug = src->debug; + dst->debug_mv = src->debug_mv; dst->slice_flags = src->slice_flags; dst->flags2 = src->flags2; @@ -431,6 +460,7 @@ FF_ENABLE_DEPRECATION_WARNINGS dst->frame_number = src->frame_number; dst->reordered_opaque = src->reordered_opaque; + dst->thread_safe_callbacks = src->thread_safe_callbacks; if (src->slice_count && src->slice_offset) { if (dst->slice_count < src->slice_count) { @@ -461,7 +491,8 @@ static void release_delayed_buffers(PerThreadContext *p) pthread_mutex_lock(&fctx->buffer_mutex); // fix extended data in case the caller screwed it up - av_assert0(p->avctx->codec_type == AVMEDIA_TYPE_VIDEO); + av_assert0(p->avctx->codec_type == AVMEDIA_TYPE_VIDEO || + p->avctx->codec_type == AVMEDIA_TYPE_AUDIO); f = &p->released_buffers[--p->num_released_buffers]; f->extended_data = f->data; av_frame_unref(f); @@ -521,18 +552,30 @@ static int submit_packet(PerThreadContext *p, AVPacket *avpkt) FF_DISABLE_DEPRECATION_WARNINGS if (!p->avctx->thread_safe_callbacks && ( + p->avctx->get_format != avcodec_default_get_format || #if FF_API_GET_BUFFER p->avctx->get_buffer || #endif p->avctx->get_buffer2 != avcodec_default_get_buffer2)) { FF_ENABLE_DEPRECATION_WARNINGS while (p->state != STATE_SETUP_FINISHED && p->state != STATE_INPUT_READY) { + int call_done = 1; pthread_mutex_lock(&p->progress_mutex); while (p->state == STATE_SETTING_UP) pthread_cond_wait(&p->progress_cond, &p->progress_mutex); - if (p->state == STATE_GET_BUFFER) { + switch (p->state) { + case STATE_GET_BUFFER: p->result = ff_get_buffer(p->avctx, p->requested_frame, p->requested_flags); + break; + case STATE_GET_FORMAT: + p->result_format = p->avctx->get_format(p->avctx, p->available_formats); + break; + default: + call_done = 0; + break; + } + if (call_done) { p->state = STATE_SETTING_UP; pthread_cond_signal(&p->progress_cond); } @@ -569,9 +612,10 @@ int ff_thread_decode_frame(AVCodecContext *avctx, * If we're still receiving the initial packets, don't return a frame. */ - if (fctx->delaying) { - if (fctx->next_decoding >= (avctx->thread_count-1)) fctx->delaying = 0; + if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1))) + fctx->delaying = 0; + if (fctx->delaying) { *got_picture_ptr=0; if (avpkt->size) return avpkt->size; @@ -622,7 +666,7 @@ int ff_thread_decode_frame(AVCodecContext *avctx, void ff_thread_report_progress(ThreadFrame *f, int n, int field) { PerThreadContext *p; - int *progress = f->progress ? (int*)f->progress->data : NULL; + volatile int *progress = f->progress ? (int*)f->progress->data : NULL; if (!progress || progress[field] >= n) return; @@ -640,7 +684,7 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field) void ff_thread_await_progress(ThreadFrame *f, int n, int field) { PerThreadContext *p; - int *progress = f->progress ? (int*)f->progress->data : NULL; + volatile int *progress = f->progress ? (int*)f->progress->data : NULL; if (!progress || progress[field] >= n) return; @@ -660,6 +704,10 @@ void ff_thread_finish_setup(AVCodecContext *avctx) { if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return; + if(p->state == STATE_SETUP_FINISHED){ + av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n"); + } + pthread_mutex_lock(&p->progress_mutex); p->state = STATE_SETUP_FINISHED; pthread_cond_broadcast(&p->progress_cond); @@ -680,6 +728,7 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count pthread_cond_wait(&p->output_cond, &p->progress_mutex); pthread_mutex_unlock(&p->progress_mutex); } + p->got_frame = 0; } } @@ -692,7 +741,11 @@ static void frame_thread_free(AVCodecContext *avctx, int thread_count) park_frame_worker_threads(fctx, thread_count); if (fctx->prev_thread && fctx->prev_thread != fctx->threads) - update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0); + if (update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0) < 0) { + av_log(avctx, AV_LOG_ERROR, "Final thread update failed\n"); + fctx->prev_thread->avctx->internal->is_copy = fctx->threads->avctx->internal->is_copy; + fctx->threads->avctx->internal->is_copy = 1; + } fctx->die = 1; @@ -705,6 +758,7 @@ static void frame_thread_free(AVCodecContext *avctx, int thread_count) if (p->thread_init) pthread_join(p->thread, NULL); + p->thread_init=0; if (codec->close) codec->close(p->avctx); @@ -751,7 +805,8 @@ static int frame_thread_init(AVCodecContext *avctx) if (!thread_count) { int nb_cpus = av_cpu_count(); - av_log(avctx, AV_LOG_DEBUG, "detected %d logical cores\n", nb_cpus); + if ((avctx->debug & (FF_DEBUG_VIS_QP | FF_DEBUG_VIS_MB_TYPE)) || avctx->debug_mv) + nb_cpus = 1; // use number of cores + 1 as thread count if there is more than one if (nb_cpus > 1) thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); @@ -820,8 +875,10 @@ static int frame_thread_init(AVCodecContext *avctx) if (err) goto error; - if (!pthread_create(&p->thread, NULL, frame_worker_thread, p)) - p->thread_init = 1; + err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p)); + p->thread_init= !err; + if(!p->thread_init) + goto error; } return 0; @@ -860,18 +917,30 @@ void ff_thread_flush(AVCodecContext *avctx) } } -int ff_thread_get_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags) +int ff_thread_can_start_frame(AVCodecContext *avctx) +{ + PerThreadContext *p = avctx->thread_opaque; + if ((avctx->active_thread_type&FF_THREAD_FRAME) && p->state != STATE_SETTING_UP && + (avctx->codec->update_thread_context || !THREAD_SAFE_CALLBACKS(avctx))) { + return 0; + } + return 1; +} + +static int thread_get_buffer_internal(AVCodecContext *avctx, ThreadFrame *f, int flags) { PerThreadContext *p = avctx->thread_opaque; int err; f->owner = avctx; + ff_init_buffer_info(avctx, f->f); + if (!(avctx->active_thread_type & FF_THREAD_FRAME)) return ff_get_buffer(avctx, f->f, flags); if (p->state != STATE_SETTING_UP && - (avctx->codec->update_thread_context || !avctx->thread_safe_callbacks)) { + (avctx->codec->update_thread_context || !THREAD_SAFE_CALLBACKS(avctx))) { av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); return -1; } @@ -888,6 +957,7 @@ int ff_thread_get_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags) } pthread_mutex_lock(&p->parent->buffer_mutex); + FF_DISABLE_DEPRECATION_WARNINGS if (avctx->thread_safe_callbacks || ( #if FF_API_GET_BUFFER @@ -897,11 +967,11 @@ FF_DISABLE_DEPRECATION_WARNINGS FF_ENABLE_DEPRECATION_WARNINGS err = ff_get_buffer(avctx, f->f, flags); } else { + pthread_mutex_lock(&p->progress_mutex); p->requested_frame = f->f; p->requested_flags = flags; p->state = STATE_GET_BUFFER; - pthread_mutex_lock(&p->progress_mutex); - pthread_cond_signal(&p->progress_cond); + pthread_cond_broadcast(&p->progress_cond); while (p->state != STATE_SETTING_UP) pthread_cond_wait(&p->progress_cond, &p->progress_mutex); @@ -911,7 +981,7 @@ FF_ENABLE_DEPRECATION_WARNINGS pthread_mutex_unlock(&p->progress_mutex); } - if (!avctx->thread_safe_callbacks && !avctx->codec->update_thread_context) + if (!THREAD_SAFE_CALLBACKS(avctx) && !avctx->codec->update_thread_context) ff_thread_finish_setup(avctx); if (err) @@ -922,6 +992,40 @@ FF_ENABLE_DEPRECATION_WARNINGS return err; } +enum AVPixelFormat ff_thread_get_format(AVCodecContext *avctx, const enum AVPixelFormat *fmt) +{ + enum AVPixelFormat res; + PerThreadContext *p = avctx->thread_opaque; + if (!(avctx->active_thread_type & FF_THREAD_FRAME) || avctx->thread_safe_callbacks || + avctx->get_format == avcodec_default_get_format) + return avctx->get_format(avctx, fmt); + if (p->state != STATE_SETTING_UP) { + av_log(avctx, AV_LOG_ERROR, "get_format() cannot be called after ff_thread_finish_setup()\n"); + return -1; + } + pthread_mutex_lock(&p->progress_mutex); + p->available_formats = fmt; + p->state = STATE_GET_FORMAT; + pthread_cond_broadcast(&p->progress_cond); + + while (p->state != STATE_SETTING_UP) + pthread_cond_wait(&p->progress_cond, &p->progress_mutex); + + res = p->result_format; + + pthread_mutex_unlock(&p->progress_mutex); + + return res; +} + +int ff_thread_get_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags) +{ + int ret = thread_get_buffer_internal(avctx, f, flags); + if (ret < 0) + av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n"); + return ret; +} + void ff_thread_release_buffer(AVCodecContext *avctx, ThreadFrame *f) { PerThreadContext *p = avctx->thread_opaque; @@ -1028,3 +1132,62 @@ void ff_thread_free(AVCodecContext *avctx) else thread_free(avctx); } + +void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n) +{ + ThreadContext *p = avctx->thread_opaque; + int *entries = p->entries; + + pthread_mutex_lock(&p->progress_mutex[thread]); + entries[field] +=n; + pthread_cond_signal(&p->progress_cond[thread]); + pthread_mutex_unlock(&p->progress_mutex[thread]); +} + +void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift) +{ + ThreadContext *p = avctx->thread_opaque; + int *entries = p->entries; + + if (!entries || !field) return; + + thread = thread ? thread - 1 : p->thread_count - 1; + + pthread_mutex_lock(&p->progress_mutex[thread]); + while ((entries[field - 1] - entries[field]) < shift){ + pthread_cond_wait(&p->progress_cond[thread], &p->progress_mutex[thread]); + } + pthread_mutex_unlock(&p->progress_mutex[thread]); +} + +int ff_alloc_entries(AVCodecContext *avctx, int count) +{ + int i; + + if (avctx->active_thread_type & FF_THREAD_SLICE) { + ThreadContext *p = avctx->thread_opaque; + p->thread_count = avctx->thread_count; + p->entries = av_mallocz(count * sizeof(int)); + + if (!p->entries) { + return AVERROR(ENOMEM); + } + + p->entries_count = count; + p->progress_mutex = av_malloc(p->thread_count * sizeof(pthread_mutex_t)); + p->progress_cond = av_malloc(p->thread_count * sizeof(pthread_cond_t)); + + for (i = 0; i < p->thread_count; i++) { + pthread_mutex_init(&p->progress_mutex[i], NULL); + pthread_cond_init(&p->progress_cond[i], NULL); + } + } + + return 0; +} + +void ff_reset_entries(AVCodecContext *avctx) +{ + ThreadContext *p = avctx->thread_opaque; + memset(p->entries, 0, p->entries_count * sizeof(int)); +} |