From 2d924b3a630869c65fe0c76568910500f54ed057 Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Thu, 31 Mar 2022 17:33:48 +0200 Subject: fftools/ffmpeg: move each muxer to a separate thread --- fftools/ffmpeg_mux.c | 299 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 215 insertions(+), 84 deletions(-) (limited to 'fftools/ffmpeg_mux.c') diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 2abadd3f9b..df9cb73d0e 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -16,17 +16,21 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include #include #include #include "ffmpeg.h" +#include "objpool.h" #include "sync_queue.h" +#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" +#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -51,13 +55,18 @@ typedef struct MuxStream { struct Muxer { AVFormatContext *fc; + pthread_t thread; + ThreadQueue *tq; + MuxStream *streams; AVDictionary *opts; + int thread_queue_size; + /* filesize limit expressed in bytes */ int64_t limit_filesize; - int64_t final_filesize; + atomic_int_least64_t last_filesize; int header_written; AVPacket *sq_pkt; @@ -65,15 +74,6 @@ struct Muxer { static int want_sdp = 1; -static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others) -{ - int i; - for (i = 0; i < nb_output_streams; i++) { - OutputStream *ost2 = output_streams[i]; - ost2->finished |= ost == ost2 ? this_stream : others; - } -} - static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; @@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return 0; } +static int64_t filesize(AVIOContext *pb) +{ + int64_t ret = -1; + + if (pb) { + ret = avio_size(pb); + if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too + ret = avio_tell(pb); + } + + return ret; +} + static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->mux->fc; AVStream *st = ost->st; + int64_t fs; int ret; + fs = filesize(s->pb); + atomic_store(&of->mux->last_filesize, fs); + if (fs >= of->mux->limit_filesize) + return AVERROR_EOF; + if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) || (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) pkt->pts = pkt->dts = AV_NOPTS_VALUE; @@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ms->last_mux_dts = pkt->dts; ost->data_size += pkt->size; - ost->packets_written++; + atomic_fetch_add(&ost->packets_written, 1); pkt->stream_index = ost->index; @@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ret = av_interleaved_write_frame(s, pkt); if (ret < 0) { print_error("av_interleaved_write_frame()", ret); - main_return_code = 1; - close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED); return ret; } return 0; } -static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) { if (ost->sq_idx_mux >= 0) { int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); - if (ret < 0) { - if (pkt) - av_packet_unref(pkt); - if (ret == AVERROR_EOF) { - ost->finished |= MUXER_FINISHED; - return 0; - } else - return ret; - } + if (ret < 0) + return ret; while (1) { ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt)); - if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) - return 0; - else if (ret < 0) - return ret; + if (ret < 0) + return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; ret = write_packet(of, output_streams[of->ost_index + ret], of->mux->sq_pkt); if (ret < 0) return ret; } - } else { - if (pkt) - return write_packet(of, ost, pkt); - - ost->finished |= MUXER_FINISHED; - } + } else if (pkt) + return write_packet(of, ost, pkt); return 0; } -int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +static void *muxer_thread(void *arg) { - int ret; + OutputFile *of = arg; + Muxer *mux = of->mux; + AVPacket *pkt = NULL; + int ret = 0; + + pkt = av_packet_alloc(); + if (!pkt) { + ret = AVERROR(ENOMEM); + goto finish; + } - if (of->mux->header_written) { - return submit_packet(of, ost, pkt); - } else { - /* the muxer is not initialized yet, buffer the packet */ - ret = queue_packet(of, ost, pkt); - if (ret < 0) { - av_packet_unref(pkt); - return ret; + while (1) { + OutputStream *ost; + int stream_idx; + + ret = tq_receive(mux->tq, &stream_idx, pkt); + if (stream_idx < 0) { + av_log(NULL, AV_LOG_VERBOSE, + "All streams finished for output file #%d\n", of->index); + ret = 0; + break; + } + + ost = output_streams[of->ost_index + stream_idx]; + ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt); + av_packet_unref(pkt); + if (ret == AVERROR_EOF) + tq_receive_finish(mux->tq, stream_idx); + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Error muxing a packet for output file #%d\n", of->index); + break; } } - return 0; +finish: + av_packet_free(&pkt); + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_receive_finish(mux->tq, i); + + av_log(NULL, AV_LOG_VERBOSE, "Terminating muxer thread %d\n", of->index); + + return (void*)(intptr_t)ret; } static int print_sdp(void) @@ -303,11 +337,125 @@ static int print_sdp(void) av_freep(&sdp_filename); } + // SDP successfully written, allow muxer threads to start + ret = 1; + fail: av_freep(&avc); return ret; } +static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +{ + Muxer *mux = of->mux; + int ret = 0; + + if (!pkt || ost->finished & MUXER_FINISHED) + goto finish; + + ret = tq_send(mux->tq, ost->index, pkt); + if (ret < 0) + goto finish; + + return 0; + +finish: + if (pkt) + av_packet_unref(pkt); + + ost->finished |= MUXER_FINISHED; + tq_send_finish(mux->tq, ost->index); + return ret == AVERROR_EOF ? 0 : ret; +} + +int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +{ + int ret; + + if (of->mux->tq) { + return submit_packet(of, ost, pkt); + } else { + /* the muxer is not initialized yet, buffer the packet */ + ret = queue_packet(of, ost, pkt); + if (ret < 0) { + av_packet_unref(pkt); + return ret; + } + } + + return 0; +} + +static int thread_stop(OutputFile *of) +{ + Muxer *mux = of->mux; + void *ret; + + if (!mux || !mux->tq) + return 0; + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_send_finish(mux->tq, i); + + pthread_join(mux->thread, &ret); + + tq_free(&mux->tq); + + return (int)(intptr_t)ret; +} + +static void pkt_move(void *dst, void *src) +{ + av_packet_move_ref(dst, src); +} + +static int thread_start(OutputFile *of) +{ + Muxer *mux = of->mux; + AVFormatContext *fc = mux->fc; + ObjPool *op; + int ret; + + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); + if (!mux->tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); + if (ret) { + tq_free(&mux->tq); + return AVERROR(ret); + } + + /* flush the muxing queues */ + for (int i = 0; i < fc->nb_streams; i++) { + MuxStream *ms = &of->mux->streams[i]; + OutputStream *ost = output_streams[of->ost_index + i]; + AVPacket *pkt; + + /* try to improve muxing time_base (only possible if nothing has been written yet) */ + if (!av_fifo_can_read(ms->muxing_queue)) + ost->mux_timebase = ost->st->time_base; + + while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { + ret = submit_packet(of, ost, pkt); + if (pkt) { + ms->muxing_queue_data_size -= pkt->size; + av_packet_free(&pkt); + } + if (ret < 0) + return ret; + } + } + + return 0; +} + /* open the muxer when all the streams are initialized */ int of_check_init(OutputFile *of) { @@ -339,28 +487,19 @@ int of_check_init(OutputFile *of) if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; - } - } - - /* flush the muxing queues */ - for (i = 0; i < fc->nb_streams; i++) { - MuxStream *ms = &of->mux->streams[i]; - OutputStream *ost = output_streams[of->ost_index + i]; - AVPacket *pkt; - - /* try to improve muxing time_base (only possible if nothing has been written yet) */ - if (!av_fifo_can_read(ms->muxing_queue)) - ost->mux_timebase = ost->st->time_base; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = submit_packet(of, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); + } else if (ret == 1) { + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (i = 0; i < nb_output_files; i++) { + ret = thread_start(output_files[i]); + if (ret < 0) + return ret; } - if (ret < 0) - return ret; } + } else { + ret = thread_start(of); + if (ret < 0) + return ret; } return 0; @@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = of->mux->fc; int ret; - if (!of->mux->header_written) { + if (!of->mux->tq) { av_log(NULL, AV_LOG_ERROR, "Nothing was written into output file %d (%s), because " "at least one of its streams received no packets.\n", @@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of) return AVERROR(EINVAL); } + ret = thread_stop(of); + if (ret < 0) + main_return_code = ret; + ret = av_write_trailer(fc); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret)); return ret; } - of->mux->final_filesize = of_filesize(of); + of->mux->last_filesize = filesize(fc->pb); if (!(of->format->flags & AVFMT_NOFILE)) { ret = avio_closep(&fc->pb); @@ -448,6 +591,8 @@ void of_close(OutputFile **pof) if (!of) return; + thread_stop(of); + sq_free(&of->sq_encode); sq_free(&of->sq_mux); @@ -457,7 +602,8 @@ void of_close(OutputFile **pof) } int of_muxer_init(OutputFile *of, AVFormatContext *fc, - AVDictionary *opts, int64_t limit_filesize) + AVDictionary *opts, int64_t limit_filesize, + int thread_queue_size) { Muxer *mux = av_mallocz(sizeof(*mux)); int ret = 0; @@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc, ms->last_mux_dts = AV_NOPTS_VALUE; } + mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8; mux->limit_filesize = limit_filesize; mux->opts = opts; @@ -515,25 +662,9 @@ fail: return ret; } -int of_finished(OutputFile *of) -{ - return of_filesize(of) >= of->mux->limit_filesize; -} - int64_t of_filesize(OutputFile *of) { - AVIOContext *pb = of->mux->fc->pb; - int64_t ret = -1; - - if (of->mux->final_filesize) - ret = of->mux->final_filesize; - else if (pb) { - ret = avio_size(pb); - if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too - ret = avio_tell(pb); - } - - return ret; + return atomic_load(&of->mux->last_filesize); } AVChapter * const * -- cgit v1.2.3