From fc9c857c2d1b61dbbd104750ca1533c4a4658655 Mon Sep 17 00:00:00 2001 From: Nicolas George Date: Thu, 20 Feb 2014 14:14:53 +0100 Subject: ffmpeg: use thread message API. --- ffmpeg.c | 98 ++++++++++++++++++---------------------------------------------- 1 file changed, 27 insertions(+), 71 deletions(-) (limited to 'ffmpeg.c') diff --git a/ffmpeg.c b/ffmpeg.c index 5299f0ef0f..a0e2be294f 100644 --- a/ffmpeg.c +++ b/ffmpeg.c @@ -59,6 +59,7 @@ #include "libavutil/timestamp.h" #include "libavutil/bprint.h" #include "libavutil/time.h" +#include "libavutil/threadmessage.h" #include "libavformat/os_support.h" #include "libavformat/ffm.h" // not public API @@ -132,11 +133,6 @@ AVIOContext *progress_avio = NULL; static uint8_t *subtitle_out; -#if HAVE_PTHREADS -/* signal to input threads that they should exit; set by the main thread */ -static int transcoding_finished; -#endif - #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass" InputStream **input_streams = NULL; @@ -3105,32 +3101,31 @@ static void *input_thread(void *arg) InputFile *f = arg; int ret = 0; - while (!transcoding_finished && ret >= 0) { + while (1) { AVPacket pkt; ret = av_read_frame(f->ctx, &pkt); if (ret == AVERROR(EAGAIN)) { av_usleep(10000); - ret = 0; continue; - } else if (ret < 0) + } + if (ret < 0) { + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); break; - - pthread_mutex_lock(&f->fifo_lock); - while (!av_fifo_space(f->fifo)) - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - + } av_dup_packet(&pkt); - av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - - pthread_mutex_unlock(&f->fifo_lock); + ret = av_thread_message_queue_send(f->in_thread_queue, &pkt, 0); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(f->ctx, AV_LOG_ERROR, + "Unable to send packet to main thread: %s\n", + av_err2str(ret)); + av_free_packet(&pkt); + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); + break; + } } - pthread_mutex_lock(&f->fifo_lock); - f->finished = 1; - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); return NULL; } @@ -3138,34 +3133,19 @@ static void free_input_threads(void) { int i; - if (nb_input_files == 1) - return; - - transcoding_finished = 1; - for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; AVPacket pkt; - if (!f->fifo || f->joined) + if (!f->in_thread_queue) continue; - - pthread_mutex_lock(&f->fifo_lock); - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF); + while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0) av_free_packet(&pkt); - } - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); pthread_join(f->thread, NULL); f->joined = 1; - - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); - av_free_packet(&pkt); - } - av_fifo_freep(&f->fifo); + av_thread_message_queue_free(&f->in_thread_queue); } } @@ -3179,15 +3159,13 @@ static int init_input_threads(void) for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; - if (!(f->fifo = av_fifo_alloc_array(8, sizeof(AVPacket)))) - return AVERROR(ENOMEM); - if (f->ctx->pb ? !f->ctx->pb->seekable : strcmp(f->ctx->iformat->name, "lavfi")) f->non_blocking = 1; - - pthread_mutex_init(&f->fifo_lock, NULL); - pthread_cond_init (&f->fifo_cond, NULL); + ret = av_thread_message_queue_alloc(&f->in_thread_queue, + 8, sizeof(AVPacket)); + if (ret < 0) + return ret; if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) return AVERROR(ret); @@ -3197,31 +3175,9 @@ static int init_input_threads(void) static int get_input_packet_mt(InputFile *f, AVPacket *pkt) { - int ret = 0; - - pthread_mutex_lock(&f->fifo_lock); - - while (1) { - if (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - break; - } else { - if (f->finished) { - ret = AVERROR_EOF; - break; - } - if (f->non_blocking) { - ret = AVERROR(EAGAIN); - break; - } - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - } - } - - pthread_mutex_unlock(&f->fifo_lock); - - return ret; + return av_thread_message_queue_recv(f->in_thread_queue, pkt, + f->non_blocking ? + AV_THREAD_MESSAGE_NONBLOCK : 0); } #endif -- cgit v1.2.3