summaryrefslogtreecommitdiff
path: root/fftools/ffmpeg_mux.c
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2022-03-31 17:33:48 +0200
committerAnton Khirnov <anton@khirnov.net>2022-07-23 11:53:19 +0200
commit2d924b3a630869c65fe0c76568910500f54ed057 (patch)
tree046e6484309205ca7715ec0a66e5b77732cf536b /fftools/ffmpeg_mux.c
parent37c764df6730e8299c468dd7636c45da6e158ef3 (diff)
fftools/ffmpeg: move each muxer to a separate thread
Diffstat (limited to 'fftools/ffmpeg_mux.c')
-rw-r--r--fftools/ffmpeg_mux.c299
1 files changed, 215 insertions, 84 deletions
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 2abadd3f9b..df9cb73d0e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -16,17 +16,21 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#include "ffmpeg.h"
+#include "objpool.h"
#include "sync_queue.h"
+#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -51,13 +55,18 @@ typedef struct MuxStream {
struct Muxer {
AVFormatContext *fc;
+ pthread_t thread;
+ ThreadQueue *tq;
+
MuxStream *streams;
AVDictionary *opts;
+ int thread_queue_size;
+
/* filesize limit expressed in bytes */
int64_t limit_filesize;
- int64_t final_filesize;
+ atomic_int_least64_t last_filesize;
int header_written;
AVPacket *sq_pkt;
@@ -65,15 +74,6 @@ struct Muxer {
static int want_sdp = 1;
-static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others)
-{
- int i;
- for (i = 0; i < nb_output_streams; i++) {
- OutputStream *ost2 = output_streams[i];
- ost2->finished |= ost == ost2 ? this_stream : others;
- }
-}
-
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
@@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return 0;
}
+static int64_t filesize(AVIOContext *pb)
+{
+ int64_t ret = -1;
+
+ if (pb) {
+ ret = avio_size(pb);
+ if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
+ ret = avio_tell(pb);
+ }
+
+ return ret;
+}
+
static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
AVFormatContext *s = of->mux->fc;
AVStream *st = ost->st;
+ int64_t fs;
int ret;
+ fs = filesize(s->pb);
+ atomic_store(&of->mux->last_filesize, fs);
+ if (fs >= of->mux->limit_filesize)
+ return AVERROR_EOF;
+
if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) ||
(st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
ms->last_mux_dts = pkt->dts;
ost->data_size += pkt->size;
- ost->packets_written++;
+ atomic_fetch_add(&ost->packets_written, 1);
pkt->stream_index = ost->index;
@@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
ret = av_interleaved_write_frame(s, pkt);
if (ret < 0) {
print_error("av_interleaved_write_frame()", ret);
- main_return_code = 1;
- close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED);
return ret;
}
return 0;
}
-static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
if (ost->sq_idx_mux >= 0) {
int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
- if (ret < 0) {
- if (pkt)
- av_packet_unref(pkt);
- if (ret == AVERROR_EOF) {
- ost->finished |= MUXER_FINISHED;
- return 0;
- } else
- return ret;
- }
+ if (ret < 0)
+ return ret;
while (1) {
ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
- if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
- return 0;
- else if (ret < 0)
- return ret;
+ if (ret < 0)
+ return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
ret = write_packet(of, output_streams[of->ost_index + ret],
of->mux->sq_pkt);
if (ret < 0)
return ret;
}
- } else {
- if (pkt)
- return write_packet(of, ost, pkt);
-
- ost->finished |= MUXER_FINISHED;
- }
+ } else if (pkt)
+ return write_packet(of, ost, pkt);
return 0;
}
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+static void *muxer_thread(void *arg)
{
- int ret;
+ OutputFile *of = arg;
+ Muxer *mux = of->mux;
+ AVPacket *pkt = NULL;
+ int ret = 0;
+
+ pkt = av_packet_alloc();
+ if (!pkt) {
+ ret = AVERROR(ENOMEM);
+ goto finish;
+ }
- if (of->mux->header_written) {
- return submit_packet(of, ost, pkt);
- } else {
- /* the muxer is not initialized yet, buffer the packet */
- ret = queue_packet(of, ost, pkt);
- if (ret < 0) {
- av_packet_unref(pkt);
- return ret;
+ while (1) {
+ OutputStream *ost;
+ int stream_idx;
+
+ ret = tq_receive(mux->tq, &stream_idx, pkt);
+ if (stream_idx < 0) {
+ av_log(NULL, AV_LOG_VERBOSE,
+ "All streams finished for output file #%d\n", of->index);
+ ret = 0;
+ break;
+ }
+
+ ost = output_streams[of->ost_index + stream_idx];
+ ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+ av_packet_unref(pkt);
+ if (ret == AVERROR_EOF)
+ tq_receive_finish(mux->tq, stream_idx);
+ else if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Error muxing a packet for output file #%d\n", of->index);
+ break;
}
}
- return 0;
+finish:
+ av_packet_free(&pkt);
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+ tq_receive_finish(mux->tq, i);
+
+ av_log(NULL, AV_LOG_VERBOSE, "Terminating muxer thread %d\n", of->index);
+
+ return (void*)(intptr_t)ret;
}
static int print_sdp(void)
@@ -303,11 +337,125 @@ static int print_sdp(void)
av_freep(&sdp_filename);
}
+ // SDP successfully written, allow muxer threads to start
+ ret = 1;
+
fail:
av_freep(&avc);
return ret;
}
+static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+ Muxer *mux = of->mux;
+ int ret = 0;
+
+ if (!pkt || ost->finished & MUXER_FINISHED)
+ goto finish;
+
+ ret = tq_send(mux->tq, ost->index, pkt);
+ if (ret < 0)
+ goto finish;
+
+ return 0;
+
+finish:
+ if (pkt)
+ av_packet_unref(pkt);
+
+ ost->finished |= MUXER_FINISHED;
+ tq_send_finish(mux->tq, ost->index);
+ return ret == AVERROR_EOF ? 0 : ret;
+}
+
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+{
+ int ret;
+
+ if (of->mux->tq) {
+ return submit_packet(of, ost, pkt);
+ } else {
+ /* the muxer is not initialized yet, buffer the packet */
+ ret = queue_packet(of, ost, pkt);
+ if (ret < 0) {
+ av_packet_unref(pkt);
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+static int thread_stop(OutputFile *of)
+{
+ Muxer *mux = of->mux;
+ void *ret;
+
+ if (!mux || !mux->tq)
+ return 0;
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+ tq_send_finish(mux->tq, i);
+
+ pthread_join(mux->thread, &ret);
+
+ tq_free(&mux->tq);
+
+ return (int)(intptr_t)ret;
+}
+
+static void pkt_move(void *dst, void *src)
+{
+ av_packet_move_ref(dst, src);
+}
+
+static int thread_start(OutputFile *of)
+{
+ Muxer *mux = of->mux;
+ AVFormatContext *fc = mux->fc;
+ ObjPool *op;
+ int ret;
+
+ op = objpool_alloc_packets();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
+ if (!mux->tq) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+ if (ret) {
+ tq_free(&mux->tq);
+ return AVERROR(ret);
+ }
+
+ /* flush the muxing queues */
+ for (int i = 0; i < fc->nb_streams; i++) {
+ MuxStream *ms = &of->mux->streams[i];
+ OutputStream *ost = output_streams[of->ost_index + i];
+ AVPacket *pkt;
+
+ /* try to improve muxing time_base (only possible if nothing has been written yet) */
+ if (!av_fifo_can_read(ms->muxing_queue))
+ ost->mux_timebase = ost->st->time_base;
+
+ while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
+ ret = submit_packet(of, ost, pkt);
+ if (pkt) {
+ ms->muxing_queue_data_size -= pkt->size;
+ av_packet_free(&pkt);
+ }
+ if (ret < 0)
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
/* open the muxer when all the streams are initialized */
int of_check_init(OutputFile *of)
{
@@ -339,28 +487,19 @@ int of_check_init(OutputFile *of)
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
return ret;
- }
- }
-
- /* flush the muxing queues */
- for (i = 0; i < fc->nb_streams; i++) {
- MuxStream *ms = &of->mux->streams[i];
- OutputStream *ost = output_streams[of->ost_index + i];
- AVPacket *pkt;
-
- /* try to improve muxing time_base (only possible if nothing has been written yet) */
- if (!av_fifo_can_read(ms->muxing_queue))
- ost->mux_timebase = ost->st->time_base;
-
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ret = submit_packet(of, ost, pkt);
- if (pkt) {
- ms->muxing_queue_data_size -= pkt->size;
- av_packet_free(&pkt);
+ } else if (ret == 1) {
+ /* SDP is written only after all the muxers are ready, so now we
+ * start ALL the threads */
+ for (i = 0; i < nb_output_files; i++) {
+ ret = thread_start(output_files[i]);
+ if (ret < 0)
+ return ret;
}
- if (ret < 0)
- return ret;
}
+ } else {
+ ret = thread_start(of);
+ if (ret < 0)
+ return ret;
}
return 0;
@@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of)
AVFormatContext *fc = of->mux->fc;
int ret;
- if (!of->mux->header_written) {
+ if (!of->mux->tq) {
av_log(NULL, AV_LOG_ERROR,
"Nothing was written into output file %d (%s), because "
"at least one of its streams received no packets.\n",
@@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of)
return AVERROR(EINVAL);
}
+ ret = thread_stop(of);
+ if (ret < 0)
+ main_return_code = ret;
+
ret = av_write_trailer(fc);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret));
return ret;
}
- of->mux->final_filesize = of_filesize(of);
+ of->mux->last_filesize = filesize(fc->pb);
if (!(of->format->flags & AVFMT_NOFILE)) {
ret = avio_closep(&fc->pb);
@@ -448,6 +591,8 @@ void of_close(OutputFile **pof)
if (!of)
return;
+ thread_stop(of);
+
sq_free(&of->sq_encode);
sq_free(&of->sq_mux);
@@ -457,7 +602,8 @@ void of_close(OutputFile **pof)
}
int of_muxer_init(OutputFile *of, AVFormatContext *fc,
- AVDictionary *opts, int64_t limit_filesize)
+ AVDictionary *opts, int64_t limit_filesize,
+ int thread_queue_size)
{
Muxer *mux = av_mallocz(sizeof(*mux));
int ret = 0;
@@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc,
ms->last_mux_dts = AV_NOPTS_VALUE;
}
+ mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8;
mux->limit_filesize = limit_filesize;
mux->opts = opts;
@@ -515,25 +662,9 @@ fail:
return ret;
}
-int of_finished(OutputFile *of)
-{
- return of_filesize(of) >= of->mux->limit_filesize;
-}
-
int64_t of_filesize(OutputFile *of)
{
- AVIOContext *pb = of->mux->fc->pb;
- int64_t ret = -1;
-
- if (of->mux->final_filesize)
- ret = of->mux->final_filesize;
- else if (pb) {
- ret = avio_size(pb);
- if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
- ret = avio_tell(pb);
- }
-
- return ret;
+ return atomic_load(&of->mux->last_filesize);
}
AVChapter * const *