From 00f4fd0308e9200293e0f67b8d20f1e4b8a857a0 Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Thu, 29 Feb 2024 18:50:02 +0100 Subject: fftools/ffmpeg_sched: allow connecting encoder output to decoders --- fftools/ffmpeg_sched.c | 212 +++++++++++++++++++++++++++++++++++++++++-------- fftools/ffmpeg_sched.h | 8 +- 2 files changed, 181 insertions(+), 39 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 5f8ef04680..d1fb942c34 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1051,24 +1051,43 @@ int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst) } case SCH_NODE_TYPE_ENC: { SchEnc *enc; - SchMuxStream *ms; av_assert0(src.idx < sch->nb_enc); - // encoding packets go to muxing - av_assert0(dst.type == SCH_NODE_TYPE_MUX && - dst.idx < sch->nb_mux && - dst.idx_stream < sch->mux[dst.idx].nb_streams); enc = &sch->enc[src.idx]; - ms = &sch->mux[dst.idx].streams[dst.idx_stream]; - - av_assert0(!ms->src.type); ret = GROW_ARRAY(enc->dst, enc->nb_dst); if (ret < 0) return ret; enc->dst[enc->nb_dst - 1] = dst; - ms->src = src; + + // encoding packets go to muxing or decoding + switch (dst.type) { + case SCH_NODE_TYPE_MUX: { + SchMuxStream *ms; + + av_assert0(dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!ms->src.type); + ms->src = src; + + break; + } + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(dst.idx < sch->nb_dec); + dec = &sch->dec[dst.idx]; + + av_assert0(!dec->src.type); + dec->src = src; + + break; + } + default: av_assert0(0); + } break; } @@ -1217,6 +1236,31 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) +{ + while (1) { + SchFilterGraph *fg; + + // fed directly by a demuxer (i.e. not through a filtergraph) + if (src.type == SCH_NODE_TYPE_DEMUX) { + sch->demux[src.idx].waiter.choked_next = 0; + return; + } + + av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT); + fg = &sch->filters[src.idx]; + + // the filtergraph contains internal sources and + // requested to be scheduled directly + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + return; + } + + src = fg->inputs[fg->best_input].src_sched; + } +} + static void schedule_update_locked(Scheduler *sch) { int64_t dts; @@ -1245,7 +1289,6 @@ static void schedule_update_locked(Scheduler *sch) for (unsigned j = 0; j < mux->nb_streams; j++) { SchMuxStream *ms = &mux->streams[j]; - SchDemux *d; // unblock sources for output streams that are not finished // and not too far ahead of the trailing stream @@ -1256,28 +1299,9 @@ static void schedule_update_locked(Scheduler *sch) if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) continue; - // for outputs fed from filtergraphs, consider that filtergraph's - // best_input information, in other cases there is a well-defined - // source demuxer - if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) { - SchFilterGraph *fg = &sch->filters[ms->src_sched.idx]; - SchFilterIn *fi; - - // the filtergraph contains internal sources and - // requested to be scheduled directly - if (fg->best_input == fg->nb_inputs) { - fg->waiter.choked_next = 0; - have_unchoked = 1; - continue; - } - - fi = &fg->inputs[fg->best_input]; - d = &sch->demux[fi->src_sched.idx]; - } else - d = &sch->demux[ms->src_sched.idx]; - - d->waiter.choked_next = 0; - have_unchoked = 1; + // resolve the source to unchoke + unchoke_for_stream(sch, ms->src_sched); + have_unchoked = 1; } } @@ -1303,6 +1327,105 @@ static void schedule_update_locked(Scheduler *sch) } +enum { + CYCLE_NODE_NEW = 0, + CYCLE_NODE_STARTED, + CYCLE_NODE_DONE, +}; + +static int +check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, + uint8_t *filters_visited, SchedulerNode *filters_stack) +{ + unsigned nb_filters_stack = 0; + + memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited)); + + while (1) { + const SchFilterGraph *fg = &sch->filters[src.idx]; + + filters_visited[src.idx] = CYCLE_NODE_STARTED; + + // descend into every input, depth first + if (src.idx_stream < fg->nb_inputs) { + const SchFilterIn *fi = &fg->inputs[src.idx_stream++]; + + // connected to demuxer, no cycles possible + if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX) + continue; + + // otherwise connected to another filtergraph + av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + + // found a cycle + if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED) + return AVERROR(EINVAL); + + // place current position on stack and descend + av_assert0(nb_filters_stack < sch->nb_filters); + filters_stack[nb_filters_stack++] = src; + src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 }; + continue; + } + + filters_visited[src.idx] = CYCLE_NODE_DONE; + + // previous search finished, + if (nb_filters_stack) { + src = filters_stack[--nb_filters_stack]; + continue; + } + return 0; + } +} + +static int check_acyclic(Scheduler *sch) +{ + uint8_t *filters_visited = NULL; + SchedulerNode *filters_stack = NULL; + + int ret = 0; + + if (!sch->nb_filters) + return 0; + + filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited)); + if (!filters_visited) + return AVERROR(ENOMEM); + + filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack)); + if (!filters_stack) { + ret = AVERROR(ENOMEM); + goto fail; + } + + // trace the transcoding graph upstream from every output stream + // fed by a filtergraph + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + SchedulerNode src = ms->src_sched; + + if (src.type != SCH_NODE_TYPE_FILTER_OUT) + continue; + src.idx_stream = 0; + + ret = check_acyclic_for_output(sch, src, filters_visited, filters_stack); + if (ret < 0) { + av_log(mux, AV_LOG_ERROR, "Transcoding graph has a cycle\n"); + goto fail; + } + } + } + +fail: + av_freep(&filters_visited); + av_freep(&filters_stack); + return ret; +} + static int start_prepare(Scheduler *sch) { int ret; @@ -1402,14 +1525,21 @@ static int start_prepare(Scheduler *sch) for (unsigned j = 0; j < fg->nb_inputs; j++) { SchFilterIn *fi = &fg->inputs[j]; + SchDec *dec; if (!fi->src.type) { av_log(fg, AV_LOG_ERROR, "Filtergraph input %u not connected to a source\n", j); return AVERROR(EINVAL); } + av_assert0(fi->src.type == SCH_NODE_TYPE_DEC); + dec = &sch->dec[fi->src.idx]; - fi->src_sched = sch->dec[fi->src.idx].src; + switch (dec->src.type) { + case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break; + case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break; + default: av_assert0(0); + } } for (unsigned j = 0; j < fg->nb_outputs; j++) { @@ -1423,6 +1553,11 @@ static int start_prepare(Scheduler *sch) } } + // Check that the transcoding graph has no cycles. + ret = check_acyclic(sch); + if (ret < 0) + return ret; + return 0; } @@ -1575,6 +1710,8 @@ static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame) SchMux *mux; SchMuxStream *ms; + if (enc->dst[i].type != SCH_NODE_TYPE_MUX) + continue; mux = &sch->mux[enc->dst[i].idx]; ms = &mux->streams[enc->dst[i].idx_stream]; @@ -2150,14 +2287,19 @@ static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, if (!pkt) goto finish; - ret = send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt); + ret = (dst.type == SCH_NODE_TYPE_MUX) ? + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : + tq_send(sch->dec[dst.idx].queue, 0, pkt); if (ret == AVERROR_EOF) goto finish; return ret; finish: - send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + if (dst.type == SCH_NODE_TYPE_MUX) + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + else + tq_send_finish(sch->dec[dst.idx].queue, 0); *dst_finished = 1; diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h index fc6711f9c3..a9190bd3d1 100644 --- a/fftools/ffmpeg_sched.h +++ b/fftools/ffmpeg_sched.h @@ -35,9 +35,9 @@ * - demuxers, each containing any number of demuxed streams; demuxed packets * belonging to some stream are sent to any number of decoders (transcoding) * and/or muxers (streamcopy); - * - decoders, which receive encoded packets from some demuxed stream, decode - * them, and send decoded frames to any number of filtergraph inputs - * (audio/video) or encoders (subtitles); + * - decoders, which receive encoded packets from some demuxed stream or + * encoder, decode them, and send decoded frames to any number of filtergraph + * inputs (audio/video) or encoders (subtitles); * - filtergraphs, each containing zero or more inputs (0 in case the * filtergraph contains a lavfi source filter), and one or more outputs; the * inputs and outputs need not have matching media types; @@ -45,7 +45,7 @@ * filtered frames from each output are sent to some encoder; * - encoders, which receive decoded frames from some decoder (subtitles) or * some filtergraph output (audio/video), encode them, and send encoded - * packets to some muxed stream; + * packets to any number of muxed streams or decoders; * - muxers, each containing any number of muxed streams; each muxed stream * receives encoded packets from some demuxed stream (streamcopy) or some * encoder (transcoding); those packets are interleaved and written out by the -- cgit v1.2.3