summaryrefslogtreecommitdiff
path: root/fftools
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2022-03-31 17:33:48 +0200
committerAnton Khirnov <anton@khirnov.net>2022-07-23 11:53:19 +0200
commit2d924b3a630869c65fe0c76568910500f54ed057 (patch)
tree046e6484309205ca7715ec0a66e5b77732cf536b /fftools
parent37c764df6730e8299c468dd7636c45da6e158ef3 (diff)
fftools/ffmpeg: move each muxer to a separate thread
Diffstat (limited to 'fftools')
-rw-r--r--fftools/ffmpeg.c18
-rw-r--r--fftools/ffmpeg.h7
-rw-r--r--fftools/ffmpeg_mux.c299
-rw-r--r--fftools/ffmpeg_opt.c4
4 files changed, 229 insertions, 99 deletions
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 4b651f9224..632ac25cb2 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -746,9 +746,6 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
goto mux_fail;
}
- if (eof)
- ost->finished |= MUXER_FINISHED;
-
return;
mux_fail:
@@ -1532,7 +1529,7 @@ static void print_final_stats(int64_t total_size)
enum AVMediaType type = ost->enc_ctx->codec_type;
total_size += ost->data_size;
- total_packets += ost->packets_written;
+ total_packets += atomic_load(&ost->packets_written);
av_log(NULL, AV_LOG_VERBOSE, " Output stream #%d:%d (%s): ",
i, j, av_get_media_type_string(type));
@@ -1545,7 +1542,7 @@ static void print_final_stats(int64_t total_size)
}
av_log(NULL, AV_LOG_VERBOSE, "%"PRIu64" packets muxed (%"PRIu64" bytes); ",
- ost->packets_written, ost->data_size);
+ atomic_load(&ost->packets_written), ost->data_size);
av_log(NULL, AV_LOG_VERBOSE, "\n");
}
@@ -1613,7 +1610,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
}
if (!vid && enc->codec_type == AVMEDIA_TYPE_VIDEO) {
float fps;
- uint64_t frame_number = ost->packets_written;
+ uint64_t frame_number = atomic_load(&ost->packets_written);
fps = t > 1 ? frame_number / t : 0;
av_bprintf(&buf, "frame=%5"PRId64" fps=%3.*f q=%3.1f ",
@@ -3491,9 +3488,8 @@ static int need_output(void)
for (i = 0; i < nb_output_streams; i++) {
OutputStream *ost = output_streams[i];
- OutputFile *of = output_files[ost->file_index];
- if (ost->finished || of_finished(of))
+ if (ost->finished)
continue;
return 1;
@@ -4412,9 +4408,11 @@ static int transcode(void)
/* close each encoder */
for (i = 0; i < nb_output_streams; i++) {
+ uint64_t packets_written;
ost = output_streams[i];
- total_packets_written += ost->packets_written;
- if (!ost->packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) {
+ packets_written = atomic_load(&ost->packets_written);
+ total_packets_written += packets_written;
+ if (!packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) {
av_log(NULL, AV_LOG_FATAL, "Empty output on stream %d.\n", i);
exit_program(1);
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index d12577e992..0c9498c23e 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -21,6 +21,7 @@
#include "config.h"
+#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <signal.h>
@@ -557,7 +558,7 @@ typedef struct OutputStream {
// combined size of all the packets written
uint64_t data_size;
// number of packets send to the muxer
- uint64_t packets_written;
+ atomic_uint_least64_t packets_written;
// number of frames/samples sent to the encoder
uint64_t frames_encoded;
uint64_t samples_encoded;
@@ -699,14 +700,14 @@ int hw_device_setup_for_filter(FilterGraph *fg);
int hwaccel_decode_init(AVCodecContext *avctx);
int of_muxer_init(OutputFile *of, AVFormatContext *fc,
- AVDictionary *opts, int64_t limit_filesize);
+ AVDictionary *opts, int64_t limit_filesize,
+ int thread_queue_size);
/* open the muxer when all the streams are initialized */
int of_check_init(OutputFile *of);
int of_write_trailer(OutputFile *of);
void of_close(OutputFile **pof);
int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost);
-int of_finished(OutputFile *of);
int64_t of_filesize(OutputFile *of);
AVChapter * const *
of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 2abadd3f9b..df9cb73d0e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -16,17 +16,21 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#include "ffmpeg.h"
+#include "objpool.h"
#include "sync_queue.h"
+#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -51,13 +55,18 @@ typedef struct MuxStream {
struct Muxer {
AVFormatContext *fc;
+ pthread_t thread;
+ ThreadQueue *tq;
+
MuxStream *streams;
AVDictionary *opts;
+ int thread_queue_size;
+
/* filesize limit expressed in bytes */
int64_t limit_filesize;
- int64_t final_filesize;
+ atomic_int_least64_t last_filesize;
int header_written;
AVPacket *sq_pkt;
@@ -65,15 +74,6 @@ struct Muxer {
static int want_sdp = 1;
-static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others)
-{
- int i;
- for (i = 0; i < nb_output_streams; i++) {
- OutputStream *ost2 = output_streams[i];
- ost2->finished |= ost == ost2 ? this_stream : others;
- }
-}
-
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
@@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return 0;
}
+static int64_t filesize(AVIOContext *pb)
+{
+ int64_t ret = -1;
+
+ if (pb) {
+ ret = avio_size(pb);
+ if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
+ ret = avio_tell(pb);
+ }
+
+ return ret;
+}
+
static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
AVFormatContext *s = of->mux->fc;
AVStream *st = ost->st;
+ int64_t fs;
int ret;
+ fs = filesize(s->pb);
+ atomic_store(&of->mux->last_filesize, fs);
+ if (fs >= of->mux->limit_filesize)
+ return AVERROR_EOF;
+
if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) ||
(st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
ms->last_mux_dts = pkt->dts;
ost->data_size += pkt->size;
- ost->packets_written++;
+ atomic_fetch_add(&ost->packets_written, 1);
pkt->stream_index = ost->index;
@@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
ret = av_interleaved_write_frame(s, pkt);
if (ret < 0) {
print_error("av_interleaved_write_frame()", ret);
- main_return_code = 1;
- close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED);
return ret;
}
return 0;
}
-static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
if (ost->sq_idx_mux >= 0) {
int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
- if (ret < 0) {
- if (pkt)
- av_packet_unref(pkt);
- if (ret == AVERROR_EOF) {
- ost->finished |= MUXER_FINISHED;
- return 0;
- } else
- return ret;
- }
+ if (ret < 0)
+ return ret;
while (1) {
ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
- if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
- return 0;
- else if (ret < 0)
- return ret;
+ if (ret < 0)
+ return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
ret = write_packet(of, output_streams[of->ost_index + ret],
of->mux->sq_pkt);
if (ret < 0)
return ret;
}
- } else {
- if (pkt)
- return write_packet(of, ost, pkt);
-
- ost->finished |= MUXER_FINISHED;
- }
+ } else if (pkt)
+ return write_packet(of, ost, pkt);
return 0;
}
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+static void *muxer_thread(void *arg)
{
- int ret;
+ OutputFile *of = arg;
+ Muxer *mux = of->mux;
+ AVPacket *pkt = NULL;
+ int ret = 0;
+
+ pkt = av_packet_alloc();
+ if (!pkt) {
+ ret = AVERROR(ENOMEM);
+ goto finish;
+ }
- if (of->mux->header_written) {
- return submit_packet(of, ost, pkt);
- } else {
- /* the muxer is not initialized yet, buffer the packet */
- ret = queue_packet(of, ost, pkt);
- if (ret < 0) {
- av_packet_unref(pkt);
- return ret;
+ while (1) {
+ OutputStream *ost;
+ int stream_idx;
+
+ ret = tq_receive(mux->tq, &stream_idx, pkt);
+ if (stream_idx < 0) {
+ av_log(NULL, AV_LOG_VERBOSE,
+ "All streams finished for output file #%d\n", of->index);
+ ret = 0;
+ break;
+ }
+
+ ost = output_streams[of->ost_index + stream_idx];
+ ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+ av_packet_unref(pkt);
+ if (ret == AVERROR_EOF)
+ tq_receive_finish(mux->tq, stream_idx);
+ else if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Error muxing a packet for output file #%d\n", of->index);
+ break;
}
}
- return 0;
+finish:
+ av_packet_free(&pkt);
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+ tq_receive_finish(mux->tq, i);
+
+ av_log(NULL, AV_LOG_VERBOSE, "Terminating muxer thread %d\n", of->index);
+
+ return (void*)(intptr_t)ret;
}
static int print_sdp(void)
@@ -303,11 +337,125 @@ static int print_sdp(void)
av_freep(&sdp_filename);
}
+ // SDP successfully written, allow muxer threads to start
+ ret = 1;
+
fail:
av_freep(&avc);
return ret;
}
+static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+ Muxer *mux = of->mux;
+ int ret = 0;
+
+ if (!pkt || ost->finished & MUXER_FINISHED)
+ goto finish;
+
+ ret = tq_send(mux->tq, ost->index, pkt);
+ if (ret < 0)
+ goto finish;
+
+ return 0;
+
+finish:
+ if (pkt)
+ av_packet_unref(pkt);
+
+ ost->finished |= MUXER_FINISHED;
+ tq_send_finish(mux->tq, ost->index);
+ return ret == AVERROR_EOF ? 0 : ret;
+}
+
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+{
+ int ret;
+
+ if (of->mux->tq) {
+ return submit_packet(of, ost, pkt);
+ } else {
+ /* the muxer is not initialized yet, buffer the packet */
+ ret = queue_packet(of, ost, pkt);
+ if (ret < 0) {
+ av_packet_unref(pkt);
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+static int thread_stop(OutputFile *of)
+{
+ Muxer *mux = of->mux;
+ void *ret;
+
+ if (!mux || !mux->tq)
+ return 0;
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+ tq_send_finish(mux->tq, i);
+
+ pthread_join(mux->thread, &ret);
+
+ tq_free(&mux->tq);
+
+ return (int)(intptr_t)ret;
+}
+
+static void pkt_move(void *dst, void *src)
+{
+ av_packet_move_ref(dst, src);
+}
+
+static int thread_start(OutputFile *of)
+{
+ Muxer *mux = of->mux;
+ AVFormatContext *fc = mux->fc;
+ ObjPool *op;
+ int ret;
+
+ op = objpool_alloc_packets();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
+ if (!mux->tq) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+ if (ret) {
+ tq_free(&mux->tq);
+ return AVERROR(ret);
+ }
+
+ /* flush the muxing queues */
+ for (int i = 0; i < fc->nb_streams; i++) {
+ MuxStream *ms = &of->mux->streams[i];
+ OutputStream *ost = output_streams[of->ost_index + i];
+ AVPacket *pkt;
+
+ /* try to improve muxing time_base (only possible if nothing has been written yet) */
+ if (!av_fifo_can_read(ms->muxing_queue))
+ ost->mux_timebase = ost->st->time_base;
+
+ while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
+ ret = submit_packet(of, ost, pkt);
+ if (pkt) {
+ ms->muxing_queue_data_size -= pkt->size;
+ av_packet_free(&pkt);
+ }
+ if (ret < 0)
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
/* open the muxer when all the streams are initialized */
int of_check_init(OutputFile *of)
{
@@ -339,28 +487,19 @@ int of_check_init(OutputFile *of)
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
return ret;
- }
- }
-
- /* flush the muxing queues */
- for (i = 0; i < fc->nb_streams; i++) {
- MuxStream *ms = &of->mux->streams[i];
- OutputStream *ost = output_streams[of->ost_index + i];
- AVPacket *pkt;
-
- /* try to improve muxing time_base (only possible if nothing has been written yet) */
- if (!av_fifo_can_read(ms->muxing_queue))
- ost->mux_timebase = ost->st->time_base;
-
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ret = submit_packet(of, ost, pkt);
- if (pkt) {
- ms->muxing_queue_data_size -= pkt->size;
- av_packet_free(&pkt);
+ } else if (ret == 1) {
+ /* SDP is written only after all the muxers are ready, so now we
+ * start ALL the threads */
+ for (i = 0; i < nb_output_files; i++) {
+ ret = thread_start(output_files[i]);
+ if (ret < 0)
+ return ret;
}
- if (ret < 0)
- return ret;
}
+ } else {
+ ret = thread_start(of);
+ if (ret < 0)
+ return ret;
}
return 0;
@@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of)
AVFormatContext *fc = of->mux->fc;
int ret;
- if (!of->mux->header_written) {
+ if (!of->mux->tq) {
av_log(NULL, AV_LOG_ERROR,
"Nothing was written into output file %d (%s), because "
"at least one of its streams received no packets.\n",
@@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of)
return AVERROR(EINVAL);
}
+ ret = thread_stop(of);
+ if (ret < 0)
+ main_return_code = ret;
+
ret = av_write_trailer(fc);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret));
return ret;
}
- of->mux->final_filesize = of_filesize(of);
+ of->mux->last_filesize = filesize(fc->pb);
if (!(of->format->flags & AVFMT_NOFILE)) {
ret = avio_closep(&fc->pb);
@@ -448,6 +591,8 @@ void of_close(OutputFile **pof)
if (!of)
return;
+ thread_stop(of);
+
sq_free(&of->sq_encode);
sq_free(&of->sq_mux);
@@ -457,7 +602,8 @@ void of_close(OutputFile **pof)
}
int of_muxer_init(OutputFile *of, AVFormatContext *fc,
- AVDictionary *opts, int64_t limit_filesize)
+ AVDictionary *opts, int64_t limit_filesize,
+ int thread_queue_size)
{
Muxer *mux = av_mallocz(sizeof(*mux));
int ret = 0;
@@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc,
ms->last_mux_dts = AV_NOPTS_VALUE;
}
+ mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8;
mux->limit_filesize = limit_filesize;
mux->opts = opts;
@@ -515,25 +662,9 @@ fail:
return ret;
}
-int of_finished(OutputFile *of)
-{
- return of_filesize(of) >= of->mux->limit_filesize;
-}
-
int64_t of_filesize(OutputFile *of)
{
- AVIOContext *pb = of->mux->fc->pb;
- int64_t ret = -1;
-
- if (of->mux->final_filesize)
- ret = of->mux->final_filesize;
- else if (pb) {
- ret = avio_size(pb);
- if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
- ret = avio_tell(pb);
- }
-
- return ret;
+ return atomic_load(&of->mux->last_filesize);
}
AVChapter * const *
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 01eedbeb34..8ac73c0efc 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -3116,7 +3116,7 @@ loop_end:
of->nb_streams = oc->nb_streams;
of->url = filename;
- err = of_muxer_init(of, oc, format_opts, o->limit_filesize);
+ err = of_muxer_init(of, oc, format_opts, o->limit_filesize, o->thread_queue_size);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
exit_program(1);
@@ -3907,7 +3907,7 @@ const OptionDef options[] = {
{ "disposition", OPT_STRING | HAS_ARG | OPT_SPEC |
OPT_OUTPUT, { .off = OFFSET(disposition) },
"disposition", "" },
- { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT,
+ { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT | OPT_OUTPUT,
{ .off = OFFSET(thread_queue_size) },
"set the maximum number of queued packets from the demuxer" },
{ "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info },