summaryrefslogtreecommitdiff
path: root/fftools
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2022-06-10 14:21:42 +0200
committerAnton Khirnov <anton@khirnov.net>2022-07-23 11:53:19 +0200
commit4740fea7ddf5f81577c9f5a0c096a8a16a54716e (patch)
treea1a5d74dc843e94bf0193116a4db8f24431b5a2c /fftools
parent9ac78fb347e26270ebaf5ed41e55d34065853583 (diff)
fftools/ffmpeg: rework -shortest implementation
The -shortest option (which finishes the output file at the time the shortest stream ends) is currently implemented by faking the -t option when an output stream ends. This approach is fragile, since it depends on the frames/packets being processed in a specific order. E.g. there are currently some situations in which the output file length will depend unpredictably on unrelated factors like encoder delay. More importantly, the present work aiming at splitting various ffmpeg components into different threads will make this approach completely unworkable, since the frames/packets will arrive in effectively random order. This commit introduces a "sync queue", which is essentially a collection of FIFOs, one per stream. Frames/packets are submitted to these FIFOs and are then released for further processing (encoding or muxing) when it is ensured that the frame in question will not cause its stream to get ahead of the other streams (the logic is similar to libavformat's interleaving queue). These sync queues are then used for encoding and/or muxing when the -shortest option is specified. A new option – -shortest_buf_duration – controls the maximum number of queued packets, to avoid runaway memory usage. This commit changes the results of the following tests: - copy-shortest[12]: the last audio frame is now gone. This is correct, since it actually outlasts the last video frame. - shortest-sub: the video packets following the last subtitle packet are now gone. This is also correct.
Diffstat (limited to 'fftools')
-rw-r--r--fftools/Makefile2
-rw-r--r--fftools/ffmpeg.c109
-rw-r--r--fftools/ffmpeg.h9
-rw-r--r--fftools/ffmpeg_mux.c67
-rw-r--r--fftools/ffmpeg_opt.c82
-rw-r--r--fftools/sync_queue.c425
-rw-r--r--fftools/sync_queue.h100
7 files changed, 758 insertions, 36 deletions
diff --git a/fftools/Makefile b/fftools/Makefile
index 81ad6c4f4f..bc57ebe748 100644
--- a/fftools/Makefile
+++ b/fftools/Makefile
@@ -14,6 +14,8 @@ OBJS-ffmpeg += \
fftools/ffmpeg_hw.o \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_opt.o \
+ fftools/objpool.o \
+ fftools/sync_queue.o \
define DOFFTOOL
OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 50e17b1890..9b6bb3d759 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -104,6 +104,7 @@
#include "ffmpeg.h"
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavutil/avassert.h"
@@ -569,6 +570,7 @@ static void ffmpeg_cleanup(int ret)
av_bsf_free(&ost->bsf_ctx);
av_frame_free(&ost->filtered_frame);
+ av_frame_free(&ost->sq_frame);
av_frame_free(&ost->last_frame);
av_packet_free(&ost->pkt);
av_dict_free(&ost->encoder_opts);
@@ -691,13 +693,10 @@ static void update_benchmark(const char *fmt, ...)
static void close_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
ost->finished |= ENCODER_FINISHED;
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
+
+ if (ost->sq_idx_encode >= 0)
+ sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
}
/*
@@ -726,10 +725,15 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
goto finish;
while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0)
of_submit_packet(of, pkt, ost);
+ if (ret == AVERROR_EOF)
+ of_submit_packet(of, NULL, ost);
if (ret == AVERROR(EAGAIN))
ret = 0;
- } else if (!eof)
- of_submit_packet(of, pkt, ost);
+ } else
+ of_submit_packet(of, eof ? NULL : pkt, ost);
+
+ if (eof)
+ ost->finished |= MUXER_FINISHED;
finish:
if (ret < 0 && ret != AVERROR_EOF) {
@@ -899,6 +903,7 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
if (frame) {
ost->frames_encoded++;
+ ost->samples_encoded += frame->nb_samples;
if (debug_ts) {
av_log(NULL, AV_LOG_INFO, "encoder <- type:%s "
@@ -971,6 +976,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
av_assert0(0);
}
+static int submit_encode_frame(OutputFile *of, OutputStream *ost,
+ AVFrame *frame)
+{
+ int ret;
+
+ if (ost->sq_idx_encode < 0)
+ return encode_frame(of, ost, frame);
+
+ if (frame) {
+ ret = av_frame_ref(ost->sq_frame, frame);
+ if (ret < 0)
+ return ret;
+ frame = ost->sq_frame;
+ }
+
+ ret = sq_send(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(frame));
+ if (ret < 0) {
+ if (frame)
+ av_frame_unref(frame);
+ if (ret != AVERROR_EOF)
+ return ret;
+ }
+
+ while (1) {
+ AVFrame *enc_frame = ost->sq_frame;
+
+ ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(enc_frame));
+ if (ret == AVERROR_EOF) {
+ enc_frame = NULL;
+ } else if (ret < 0) {
+ return (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
+
+ ret = encode_frame(of, ost, enc_frame);
+ if (enc_frame)
+ av_frame_unref(enc_frame);
+ if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ close_output_stream(ost);
+ return ret;
+ }
+ }
+}
+
static void do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame)
{
@@ -984,10 +1035,9 @@ static void do_audio_out(OutputFile *of, OutputStream *ost,
if (frame->pts == AV_NOPTS_VALUE || audio_sync_method < 0)
frame->pts = ost->sync_opts;
ost->sync_opts = frame->pts + frame->nb_samples;
- ost->samples_encoded += frame->nb_samples;
- ret = encode_frame(of, ost, frame);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, frame);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
}
@@ -1151,15 +1201,18 @@ static void do_video_out(OutputFile *of,
if (delta0 > 1.1)
nb0_frames = llrintf(delta0 - 0.6);
}
+ next_picture->pkt_duration = 1;
break;
case VSYNC_VFR:
if (delta <= -0.6)
nb_frames = 0;
else if (delta > 0.6)
ost->sync_opts = llrint(sync_ipts);
+ next_picture->pkt_duration = duration;
break;
case VSYNC_DROP:
case VSYNC_PASSTHROUGH:
+ next_picture->pkt_duration = duration;
ost->sync_opts = llrint(sync_ipts);
break;
default:
@@ -1273,8 +1326,8 @@ static void do_video_out(OutputFile *of,
av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time);
}
- ret = encode_frame(of, ost, in_picture);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, in_picture);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
ost->sync_opts++;
@@ -1286,19 +1339,6 @@ static void do_video_out(OutputFile *of,
av_frame_move_ref(ost->last_frame, next_picture);
}
-static void finish_output_stream(OutputStream *ost)
-{
- OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
- ost->finished = ENCODER_FINISHED | MUXER_FINISHED;
-
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
-}
-
/**
* Get and encode new output from any of the filtergraphs, without causing
* activity.
@@ -1766,7 +1806,7 @@ static void flush_encoders(void)
exit_program(1);
}
- finish_output_stream(ost);
+ output_packet(of, ost->pkt, ost, 1);
}
init_output_stream_wrapper(ost, NULL, 1);
@@ -1775,7 +1815,7 @@ static void flush_encoders(void)
if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)
continue;
- ret = encode_frame(of, ost, NULL);
+ ret = submit_encode_frame(of, ost, NULL);
if (ret != AVERROR_EOF)
exit_program(1);
}
@@ -3086,6 +3126,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
break;
}
+ if (ost->sq_idx_encode >= 0)
+ sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base);
+
ost->mux_timebase = enc_ctx->time_base;
return 0;
@@ -3094,6 +3137,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
static int init_output_stream(OutputStream *ost, AVFrame *frame,
char *error, int error_len)
{
+ OutputFile *of = output_files[ost->file_index];
int ret = 0;
if (ost->encoding_needed) {
@@ -3226,6 +3270,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame,
if (ret < 0)
return ret;
+ if (ost->sq_idx_mux >= 0)
+ sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase);
+
ost->initialized = 1;
ret = of_check_init(output_files[ost->file_index]);
@@ -3930,8 +3977,10 @@ static int process_input(int file_index)
OutputStream *ost = output_streams[j];
if (ost->source_index == ifile->ist_index + i &&
- (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE))
- finish_output_stream(ost);
+ (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) {
+ OutputFile *of = output_files[ost->file_index];
+ output_packet(of, ost->pkt, ost, 1);
+ }
}
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 090bf67d2d..58e093b2cb 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -26,6 +26,7 @@
#include <signal.h>
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
#include "libavformat/avio.h"
@@ -151,6 +152,7 @@ typedef struct OptionsContext {
int64_t limit_filesize;
float mux_preload;
float mux_max_delay;
+ float shortest_buf_duration;
int shortest;
int bitexact;
@@ -484,6 +486,7 @@ typedef struct OutputStream {
int64_t max_frames;
AVFrame *filtered_frame;
AVFrame *last_frame;
+ AVFrame *sq_frame;
AVPacket *pkt;
int64_t last_dropped;
int64_t last_nb0_frames[3];
@@ -575,6 +578,9 @@ typedef struct OutputStream {
/* frame encode sum of squared error values */
int64_t error[4];
+
+ int sq_idx_encode;
+ int sq_idx_mux;
} OutputStream;
typedef struct Muxer Muxer;
@@ -585,6 +591,9 @@ typedef struct OutputFile {
Muxer *mux;
const AVOutputFormat *format;
+ SyncQueue *sq_encode;
+ SyncQueue *sq_mux;
+
AVFormatContext *ctx;
int ost_index; /* index of the first stream in output_streams */
int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index a3350a73e9..453ccac912 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -20,6 +20,7 @@
#include <string.h>
#include "ffmpeg.h"
+#include "sync_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
@@ -56,6 +57,8 @@ struct Muxer {
int64_t limit_filesize;
int64_t final_filesize;
int header_written;
+
+ AVPacket *sq_pkt;
};
static int want_sdp = 1;
@@ -72,13 +75,14 @@ static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream,
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
- AVPacket *tmp_pkt;
+ AVPacket *tmp_pkt = NULL;
int ret;
if (!av_fifo_can_write(ms->muxing_queue)) {
size_t cur_size = av_fifo_can_read(ms->muxing_queue);
+ size_t pkt_size = pkt ? pkt->size : 0;
unsigned int are_we_over_size =
- (ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
+ (ms->muxing_queue_data_size + pkt_size) > ost->muxing_queue_data_threshold;
size_t limit = are_we_over_size ? ost->max_muxing_queue_size : SIZE_MAX;
size_t new_size = FFMIN(2 * cur_size, limit);
@@ -93,6 +97,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return ret;
}
+ if (pkt) {
ret = av_packet_make_refcounted(pkt);
if (ret < 0)
return ret;
@@ -103,6 +108,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
av_packet_move_ref(tmp_pkt, pkt);
ms->muxing_queue_data_size += tmp_pkt->size;
+ }
av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
return 0;
@@ -192,11 +198,44 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
}
}
+static void submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+ if (ost->sq_idx_mux >= 0) {
+ int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
+ if (ret < 0) {
+ if (pkt)
+ av_packet_unref(pkt);
+ if (ret == AVERROR_EOF) {
+ ost->finished |= MUXER_FINISHED;
+ return;
+ } else
+ exit_program(1);
+ }
+
+ while (1) {
+ ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
+ return;
+ else if (ret < 0)
+ exit_program(1);
+
+ write_packet(of, output_streams[of->ost_index + ret],
+ of->mux->sq_pkt);
+ }
+ } else {
+ if (pkt)
+ write_packet(of, ost, pkt);
+ else
+ ost->finished |= MUXER_FINISHED;
+ }
+}
+
void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
{
AVStream *st = ost->st;
int ret;
+ if (pkt) {
/*
* Audio encoders may split the packets -- #frames in != #packets out.
* But there is no reordering, so we can limit the number of output packets
@@ -211,9 +250,10 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
}
ost->frame_number++;
}
+ }
if (of->mux->header_written) {
- write_packet(of, ost, pkt);
+ submit_packet(of, ost, pkt);
} else {
/* the muxer is not initialized yet, buffer the packet */
ret = queue_packet(of, ost, pkt);
@@ -321,9 +361,11 @@ int of_check_init(OutputFile *of)
ost->mux_timebase = ost->st->time_base;
while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ms->muxing_queue_data_size -= pkt->size;
- write_packet(of, ost, pkt);
- av_packet_free(&pkt);
+ submit_packet(of, ost, pkt);
+ if (pkt) {
+ ms->muxing_queue_data_size -= pkt->size;
+ av_packet_free(&pkt);
+ }
}
}
@@ -383,6 +425,8 @@ static void mux_free(Muxer **pmux, int nb_streams)
av_freep(&mux->streams);
av_dict_free(&mux->opts);
+ av_packet_free(&mux->sq_pkt);
+
av_freep(pmux);
}
@@ -394,6 +438,9 @@ void of_close(OutputFile **pof)
if (!of)
return;
+ sq_free(&of->sq_encode);
+ sq_free(&of->sq_mux);
+
s = of->ctx;
mux_free(&of->mux, s ? s->nb_streams : 0);
@@ -437,6 +484,14 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize)
if (strcmp(of->format->name, "rtp"))
want_sdp = 0;
+ if (of->sq_mux) {
+ mux->sq_pkt = av_packet_alloc();
+ if (!mux->sq_pkt) {
+ ret = AVERROR(ENOMEM);
+ goto fail;
+ }
+ }
+
/* write the header for files with no streams */
if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) {
ret = of_check_init(of);
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index db8ec33cde..4281644cfc 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -31,6 +31,7 @@
#include "fopen_utf8.h"
#include "cmdutils.h"
#include "opt_common.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
@@ -236,6 +237,7 @@ static void init_options(OptionsContext *o)
o->accurate_seek = 1;
o->thread_queue_size = -1;
o->input_sync_ref = -1;
+ o->shortest_buf_duration = 10.f;
}
static int show_hwaccels(void *optctx, const char *opt, const char *arg)
@@ -2385,6 +2387,78 @@ static int init_complex_filters(void)
return 0;
}
+static int setup_sync_queues(OutputFile *of, AVFormatContext *oc, int64_t buf_size_us)
+{
+ int nb_av_enc = 0, nb_interleaved = 0;
+
+#define IS_AV_ENC(ost, type) \
+ (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO))
+#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT)
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ ost->sq_idx_encode = -1;
+ ost->sq_idx_mux = -1;
+
+ nb_interleaved += IS_INTERLEAVED(type);
+ nb_av_enc += IS_AV_ENC(ost, type);
+ }
+
+ if (!(nb_interleaved > 1 && of->shortest))
+ return 0;
+
+ /* if we have more than one encoded audio/video streams, then we
+ * synchronize them before encoding */
+ if (nb_av_enc > 1) {
+ of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us);
+ if (!of->sq_encode)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_AV_ENC(ost, type))
+ continue;
+
+ ost->sq_idx_encode = sq_add_stream(of->sq_encode);
+ if (ost->sq_idx_encode < 0)
+ return ost->sq_idx_encode;
+
+ ost->sq_frame = av_frame_alloc();
+ if (!ost->sq_frame)
+ return AVERROR(ENOMEM);
+ }
+ }
+
+ /* if there are any additional interleaved streams, then ALL the streams
+ * are also synchronized before sending them to the muxer */
+ if (nb_interleaved > nb_av_enc) {
+ of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS, buf_size_us);
+ if (!of->sq_mux)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_INTERLEAVED(type))
+ continue;
+
+ ost->sq_idx_mux = sq_add_stream(of->sq_mux);
+ if (ost->sq_idx_mux < 0)
+ return ost->sq_idx_mux;
+ }
+ }
+
+#undef IS_AV_ENC
+#undef IS_INTERLEAVED
+
+ return 0;
+}
+
static int open_output_file(OptionsContext *o, const char *filename)
{
AVFormatContext *oc;
@@ -3022,6 +3096,12 @@ loop_end:
exit_program(1);
}
+ err = setup_sync_queues(of, oc, o->shortest_buf_duration * AV_TIME_BASE);
+ if (err < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n");
+ exit_program(1);
+ }
+
err = of_muxer_init(of, format_opts, o->limit_filesize);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
@@ -3739,6 +3819,8 @@ const OptionDef options[] = {
{ "shortest", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT, { .off = OFFSET(shortest) },
"finish encoding within shortest input" },
+ { "shortest_buf_duration", HAS_ARG | OPT_FLOAT | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest_buf_duration) },
+ "maximum buffering duration (in seconds) for the -shortest option" },
{ "bitexact", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT | OPT_INPUT, { .off = OFFSET(bitexact) },
"bitexact mode" },
diff --git a/fftools/sync_queue.c b/fftools/sync_queue.c
new file mode 100644
index 0000000000..ab654ca790
--- /dev/null
+++ b/fftools/sync_queue.c
@@ -0,0 +1,425 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdint.h>
+#include <string.h>
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/fifo.h"
+#include "libavutil/mathematics.h"
+#include "libavutil/mem.h"
+
+#include "objpool.h"
+#include "sync_queue.h"
+
+typedef struct SyncQueueStream {
+ AVFifo *fifo;
+ AVRational tb;
+
+ /* stream head: largest timestamp seen */
+ int64_t head_ts;
+ /* no more frames will be sent for this stream */
+ int finished;
+} SyncQueueStream;
+
+struct SyncQueue {
+ enum SyncQueueType type;
+
+ /* no more frames will be sent for any stream */
+ int finished;
+ /* sync head: the stream with the _smallest_ head timestamp
+ * this stream determines which frames can be output */
+ int head_stream;
+ /* the finished stream with the smallest finish timestamp or -1 */
+ int head_finished_stream;
+
+ // maximum buffering duration in microseconds
+ int64_t buf_size_us;
+
+ SyncQueueStream *streams;
+ unsigned int nb_streams;
+
+ // pool of preallocated frames to avoid constant allocations
+ ObjPool *pool;
+};
+
+static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
+ SyncQueueFrame src)
+{
+ if (sq->type == SYNC_QUEUE_PACKETS)
+ av_packet_move_ref(dst.p, src.p);
+ else
+ av_frame_move_ref(dst.f, src.f);
+}
+
+static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ?
+ frame.p->pts + frame.p->duration :
+ frame.f->pts + frame.f->pkt_duration;
+}
+
+static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
+}
+
+static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
+{
+ SyncQueueStream *st = &sq->streams[stream_idx];
+
+ st->finished = 1;
+
+ if (st->head_ts != AV_NOPTS_VALUE) {
+ /* check if this stream is the new finished head */
+ if (sq->head_finished_stream < 0 ||
+ av_compare_ts(st->head_ts, st->tb,
+ sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb) < 0) {
+ sq->head_finished_stream = stream_idx;
+ }
+
+ /* mark as finished all streams that should no longer receive new frames,
+ * due to them being ahead of some finished stream */
+ st = &sq->streams[sq->head_finished_stream];
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st1 = &sq->streams[i];
+ if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
+ st1->finished = 1;
+ }
+ }
+
+ /* mark the whole queue as finished if all streams are finished */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ if (!sq->streams[i].finished)
+ return;
+ }
+ sq->finished = 1;
+}
+
+static void queue_head_update(SyncQueue *sq)
+{
+ if (sq->head_stream < 0) {
+ /* wait for one timestamp in each stream before determining
+ * the queue head */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st = &sq->streams[i];
+ if (st->head_ts == AV_NOPTS_VALUE)
+ return;
+ }
+
+ // placeholder value, correct one will be found below
+ sq->head_stream = 0;
+ }
+
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st_head = &sq->streams[sq->head_stream];
+ SyncQueueStream *st_other = &sq->streams[i];
+ if (st_other->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st_other->head_ts, st_other->tb,
+ st_head->head_ts, st_head->tb) < 0)
+ sq->head_stream = i;
+ }
+}
+
+/* update this stream's head timestamp */
+static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
+{
+ SyncQueueStream *st = &sq->streams[stream_idx];
+
+ if (ts == AV_NOPTS_VALUE ||
+ (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
+ return;
+
+ st->head_ts = ts;
+
+ /* if this stream is now ahead of some finished stream, then
+ * this stream is also finished */
+ if (sq->head_finished_stream >= 0 &&
+ av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb,
+ ts, st->tb) <= 0)
+ finish_stream(sq, stream_idx);
+
+ /* update the overall head timestamp if it could have changed */
+ if (sq->head_stream < 0 || sq->head_stream == stream_idx)
+ queue_head_update(sq);
+}
+
+/* If the queue for the given stream (or all streams when stream_idx=-1)
+ * is overflowing, trigger a fake heartbeat on lagging streams.
+ *
+ * @return 1 if heartbeat triggered, 0 otherwise
+ */
+static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
+{
+ SyncQueueStream *st;
+ SyncQueueFrame frame;
+ int64_t tail_ts = AV_NOPTS_VALUE;
+
+ /* if no stream specified, pick the one that is most ahead */
+ if (stream_idx < 0) {
+ int64_t ts = AV_NOPTS_VALUE;
+
+ for (int i = 0; i < sq->nb_streams; i++) {
+ st = &sq->streams[i];
+ if (st->head_ts != AV_NOPTS_VALUE &&
+ (ts == AV_NOPTS_VALUE ||
+ av_compare_ts(ts, sq->streams[stream_idx].tb,
+ st->head_ts, st->tb) < 0)) {
+ ts = st->head_ts;
+ stream_idx = i;
+ }
+ }
+ /* no stream has a timestamp yet -> nothing to do */
+ if (stream_idx < 0)
+ return 0;
+ }
+
+ st = &sq->streams[stream_idx];
+
+ /* get the chosen stream's tail timestamp */
+ for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
+ av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
+ tail_ts = frame_ts(sq, frame);
+
+ /* overflow triggers when the tail is over specified duration behind the head */
+ if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
+ av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
+ return 0;
+
+ /* signal a fake timestamp for all streams that prevent tail_ts from being output */
+ tail_ts++;
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st1 = &sq->streams[i];
+ int64_t ts;
+
+ if (st == st1 || st1->finished ||
+ (st1->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
+ continue;
+
+ ts = av_rescale_q(tail_ts, st->tb, st1->tb);
+ if (st1->head_ts != AV_NOPTS_VALUE)
+ ts = FFMAX(st1->head_ts + 1, ts);
+
+ stream_update_ts(sq, i, ts);
+ }
+
+ return 1;
+}
+
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
+{
+ SyncQueueStream *st;
+ SyncQueueFrame dst;
+ int64_t ts;
+ int ret;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ av_assert0(st->tb.num > 0 && st->tb.den > 0);
+
+ if (frame_null(sq, frame)) {
+ finish_stream(sq, stream_idx);
+ return 0;
+ }
+ if (st->finished)
+ return AVERROR_EOF;
+
+ ret = objpool_get(sq->pool, (void**)&dst);
+ if (ret < 0)
+ return ret;
+
+ frame_move(sq, dst, frame);
+
+ ts = frame_ts(sq, dst);
+
+ ret = av_fifo_write(st->fifo, &dst, 1);
+ if (ret < 0) {
+ frame_move(sq, frame, dst);
+ objpool_release(sq->pool, (void**)&dst);
+ return ret;
+ }
+
+ stream_update_ts(sq, stream_idx, ts);
+
+ return 0;
+}
+
+static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
+ SyncQueueFrame frame)
+{
+ SyncQueueStream *st_head = sq->head_stream >= 0 ?
+ &sq->streams[sq->head_stream] : NULL;
+ SyncQueueStream *st;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ if (av_fifo_can_read(st->fifo)) {
+ SyncQueueFrame peek;
+ int64_t ts;
+ int cmp = 1;
+
+ av_fifo_peek(st->fifo, &peek, 1, 0);
+ ts = frame_ts(sq, peek);
+
+ /* check if this stream's tail timestamp does not overtake
+ * the overall queue head */
+ if (ts != AV_NOPTS_VALUE && st_head)
+ cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
+
+ /* We can release frames that do not end after the queue head.
+ * Frames with no timestamps are just passed through with no conditions.
+ */
+ if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
+ frame_move(sq, frame, peek);
+ objpool_release(sq->pool, (void**)&peek);
+ av_fifo_drain2(st->fifo, 1);
+ return 0;
+ }
+ }
+
+ return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
+ AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+ int nb_eof = 0;
+ int ret;
+
+ /* read a frame for a specific stream */
+ if (stream_idx >= 0) {
+ ret = receive_for_stream(sq, stream_idx, frame);
+ return (ret < 0) ? ret : stream_idx;
+ }
+
+ /* read a frame for any stream with available output */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ ret = receive_for_stream(sq, i, frame);
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
+ nb_eof += (ret == AVERROR_EOF);
+ continue;
+ }
+ return (ret < 0) ? ret : i;
+ }
+
+ return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+ int ret = receive_internal(sq, stream_idx, frame);
+
+ /* try again if the queue overflowed and triggered a fake heartbeat
+ * for lagging streams */
+ if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
+ ret = receive_internal(sq, stream_idx, frame);
+
+ return ret;
+}
+
+int sq_add_stream(SyncQueue *sq)
+{
+ SyncQueueStream *tmp, *st;
+
+ tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
+ if (!tmp)
+ return AVERROR(ENOMEM);
+ sq->streams = tmp;
+
+ st = &sq->streams[sq->nb_streams];
+ memset(st, 0, sizeof(*st));
+
+ st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
+ if (!st->fifo)
+ return AVERROR(ENOMEM);
+
+ /* we set a valid default, so that a pathological stream that never
+ * receives even a real timebase (and no frames) won't stall all other
+ * streams forever; cf. overflow_heartbeat() */
+ st->tb = (AVRational){ 1, 1 };
+ st->head_ts = AV_NOPTS_VALUE;
+
+ return sq->nb_streams++;
+}
+
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
+{
+ SyncQueueStream *st;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ av_assert0(!av_fifo_can_read(st->fifo));
+
+ if (st->head_ts != AV_NOPTS_VALUE)
+ st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
+
+ st->tb = tb;
+}
+
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
+{
+ SyncQueue *sq = av_mallocz(sizeof(*sq));
+
+ if (!sq)
+ return NULL;
+
+ sq->type = type;
+ sq->buf_size_us = buf_size_us;
+
+ sq->head_stream = -1;
+ sq->head_finished_stream = -1;
+
+ sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
+ objpool_alloc_frames();
+ if (!sq->pool) {
+ av_freep(&sq);
+ return NULL;
+ }
+
+ return sq;
+}
+
+void sq_free(SyncQueue **psq)
+{
+ SyncQueue *sq = *psq;
+
+ if (!sq)
+ return;
+
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueFrame frame;
+ while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
+ objpool_release(sq->pool, (void**)&frame);
+
+ av_fifo_freep2(&sq->streams[i].fifo);
+ }
+
+ av_freep(&sq->streams);
+
+ objpool_free(&sq->pool);
+
+ av_freep(psq);
+}
diff --git a/fftools/sync_queue.h b/fftools/sync_queue.h
new file mode 100644
index 0000000000..e08780b7bf
--- /dev/null
+++ b/fftools/sync_queue.h
@@ -0,0 +1,100 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_SYNC_QUEUE_H
+#define FFTOOLS_SYNC_QUEUE_H
+
+#include <stdint.h>
+
+#include "libavcodec/packet.h"
+
+#include "libavutil/frame.h"
+
+enum SyncQueueType {
+ SYNC_QUEUE_PACKETS,
+ SYNC_QUEUE_FRAMES,
+};
+
+typedef union SyncQueueFrame {
+ AVFrame *f;
+ AVPacket *p;
+} SyncQueueFrame;
+
+#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) })
+#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) })
+
+typedef struct SyncQueue SyncQueue;
+
+/**
+ * Allocate a sync queue of the given type.
+ *
+ * @param buf_size_us maximum duration that will be buffered in microseconds
+ */
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us);
+void sq_free(SyncQueue **sq);
+
+/**
+ * Add a new stream to the sync queue.
+ *
+ * @return
+ * - a non-negative stream index on success
+ * - a negative error code on error
+ */
+int sq_add_stream(SyncQueue *sq);
+
+/**
+ * Set the timebase for the stream with index stream_idx. Should be called
+ * before sending any frames for this stream.
+ */
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb);
+
+/**
+ * Submit a frame for the stream with index stream_idx.
+ *
+ * On success, the sync queue takes ownership of the frame and will reset the
+ * contents of the supplied frame. On failure, the frame remains owned by the
+ * caller.
+ *
+ * Sending a frame with NULL contents marks the stream as finished.
+ *
+ * @return
+ * - 0 on success
+ * - AVERROR_EOF when no more frames should be submitted for this stream
+ * - another a negative error code on failure
+ */
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame);
+
+/**
+ * Read a frame from the queue.
+ *
+ * @param stream_idx index of the stream to read a frame for. May be -1, then
+ * try to read a frame from any stream that is ready for
+ * output.
+ * @param frame output frame will be written here on success. The frame is owned
+ * by the caller.
+ *
+ * @return
+ * - a non-negative index of the stream to which the returned frame belongs
+ * - AVERROR(EAGAIN) when more frames need to be submitted to the queue
+ * - AVERROR_EOF when no more frames will be available for this stream (for any
+ * stream if stream_idx is -1)
+ * - another negative error code on failure
+ */
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame);
+
+#endif // FFTOOLS_SYNC_QUEUE_H