diff options
-rw-r--r-- | libavfilter/dnn/dnn_backend_openvino.c | 157 |
1 files changed, 115 insertions, 42 deletions
diff --git a/libavfilter/dnn/dnn_backend_openvino.c b/libavfilter/dnn/dnn_backend_openvino.c index 267c154c87..a8a02d7589 100644 --- a/libavfilter/dnn/dnn_backend_openvino.c +++ b/libavfilter/dnn/dnn_backend_openvino.c @@ -54,8 +54,10 @@ typedef struct OVModel{ ie_executable_network_t *exe_network; SafeQueue *request_queue; // holds RequestItem Queue *task_queue; // holds TaskItem + Queue *inference_queue; // holds InferenceItem } OVModel; +// one task for one function call from dnn interface typedef struct TaskItem { OVModel *ov_model; const char *input_name; @@ -64,13 +66,20 @@ typedef struct TaskItem { AVFrame *out_frame; int do_ioproc; int async; - int done; + uint32_t inference_todo; + uint32_t inference_done; } TaskItem; +// one task might have multiple inferences +typedef struct InferenceItem { + TaskItem *task; +} InferenceItem; + +// one request for one call to openvino typedef struct RequestItem { ie_infer_request_t *infer_request; - TaskItem **tasks; - int task_count; + InferenceItem **inferences; + uint32_t inference_count; ie_complete_call_back_t callback; } RequestItem; @@ -127,7 +136,12 @@ static DNNReturnType fill_model_input_ov(OVModel *ov_model, RequestItem *request IEStatusCode status; DNNData input; ie_blob_t *input_blob = NULL; - TaskItem *task = request->tasks[0]; + InferenceItem *inference; + TaskItem *task; + + inference = ff_queue_peek_front(ov_model->inference_queue); + av_assert0(inference); + task = inference->task; status = ie_infer_request_get_blob(request->infer_request, task->input_name, &input_blob); if (status != OK) { @@ -159,9 +173,14 @@ static DNNReturnType fill_model_input_ov(OVModel *ov_model, RequestItem *request // change to be an option when necessary. input.order = DCO_BGR; - av_assert0(request->task_count <= dims.dims[0]); - for (int i = 0; i < request->task_count; ++i) { - task = request->tasks[i]; + for (int i = 0; i < ctx->options.batch_size; ++i) { + inference = ff_queue_pop_front(ov_model->inference_queue); + if (!inference) { + break; + } + request->inferences[i] = inference; + request->inference_count = i + 1; + task = inference->task; if (task->do_ioproc) { if (ov_model->model->frame_pre_proc != NULL) { ov_model->model->frame_pre_proc(task->in_frame, &input, ov_model->model->filter_ctx); @@ -183,7 +202,8 @@ static void infer_completion_callback(void *args) precision_e precision; IEStatusCode status; RequestItem *request = args; - TaskItem *task = request->tasks[0]; + InferenceItem *inference = request->inferences[0]; + TaskItem *task = inference->task; SafeQueue *requestq = task->ov_model->request_queue; ie_blob_t *output_blob = NULL; ie_blob_buffer_t blob_buffer; @@ -229,10 +249,11 @@ static void infer_completion_callback(void *args) output.dt = precision_to_datatype(precision); output.data = blob_buffer.buffer; - av_assert0(request->task_count <= dims.dims[0]); - av_assert0(request->task_count >= 1); - for (int i = 0; i < request->task_count; ++i) { - task = request->tasks[i]; + av_assert0(request->inference_count <= dims.dims[0]); + av_assert0(request->inference_count >= 1); + for (int i = 0; i < request->inference_count; ++i) { + task = request->inferences[i]->task; + task->inference_done++; switch (task->ov_model->model->func_type) { case DFT_PROCESS_FRAME: @@ -259,13 +280,13 @@ static void infer_completion_callback(void *args) break; } - task->done = 1; + av_freep(&request->inferences[i]); output.data = (uint8_t *)output.data + output.width * output.height * output.channels * get_datatype_size(output.dt); } ie_blob_free(&output_blob); - request->task_count = 0; + request->inference_count = 0; if (ff_safe_queue_push_back(requestq, request) < 0) { av_log(ctx, AV_LOG_ERROR, "Failed to push back request_queue.\n"); return; @@ -370,11 +391,11 @@ static DNNReturnType init_model_ov(OVModel *ov_model, const char *input_name, co goto err; } - item->tasks = av_malloc_array(ctx->options.batch_size, sizeof(*item->tasks)); - if (!item->tasks) { + item->inferences = av_malloc_array(ctx->options.batch_size, sizeof(*item->inferences)); + if (!item->inferences) { goto err; } - item->task_count = 0; + item->inference_count = 0; } ov_model->task_queue = ff_queue_create(); @@ -382,6 +403,11 @@ static DNNReturnType init_model_ov(OVModel *ov_model, const char *input_name, co goto err; } + ov_model->inference_queue = ff_queue_create(); + if (!ov_model->inference_queue) { + goto err; + } + return DNN_SUCCESS; err: @@ -389,15 +415,24 @@ err: return DNN_ERROR; } -static DNNReturnType execute_model_ov(RequestItem *request) +static DNNReturnType execute_model_ov(RequestItem *request, Queue *inferenceq) { IEStatusCode status; DNNReturnType ret; - TaskItem *task = request->tasks[0]; - OVContext *ctx = &task->ov_model->ctx; + InferenceItem *inference; + TaskItem *task; + OVContext *ctx; + + if (ff_queue_size(inferenceq) == 0) { + return DNN_SUCCESS; + } + + inference = ff_queue_peek_front(inferenceq); + task = inference->task; + ctx = &task->ov_model->ctx; if (task->async) { - if (request->task_count < ctx->options.batch_size) { + if (ff_queue_size(inferenceq) < ctx->options.batch_size) { if (ff_safe_queue_push_front(task->ov_model->request_queue, request) < 0) { av_log(ctx, AV_LOG_ERROR, "Failed to push back request_queue.\n"); return DNN_ERROR; @@ -430,7 +465,7 @@ static DNNReturnType execute_model_ov(RequestItem *request) return DNN_ERROR; } infer_completion_callback(request); - return task->done ? DNN_SUCCESS : DNN_ERROR; + return (task->inference_done == task->inference_todo) ? DNN_SUCCESS : DNN_ERROR; } } @@ -484,6 +519,31 @@ static DNNReturnType get_input_ov(void *model, DNNData *input, const char *input return DNN_ERROR; } +static DNNReturnType extract_inference_from_task(DNNFunctionType func_type, TaskItem *task, Queue *inference_queue) +{ + switch (func_type) { + case DFT_PROCESS_FRAME: + case DFT_ANALYTICS_DETECT: + { + InferenceItem *inference = av_malloc(sizeof(*inference)); + if (!inference) { + return DNN_ERROR; + } + task->inference_todo = 1; + task->inference_done = 0; + inference->task = task; + if (ff_queue_push_back(inference_queue, inference) < 0) { + av_freep(&inference); + return DNN_ERROR; + } + return DNN_SUCCESS; + } + default: + av_assert0(!"should not reach here"); + return DNN_ERROR; + } +} + static DNNReturnType get_output_ov(void *model, const char *input_name, int input_width, int input_height, const char *output_name, int *output_width, int *output_height) { @@ -536,7 +596,6 @@ static DNNReturnType get_output_ov(void *model, const char *input_name, int inpu return DNN_ERROR; } - task.done = 0; task.do_ioproc = 0; task.async = 0; task.input_name = input_name; @@ -545,6 +604,13 @@ static DNNReturnType get_output_ov(void *model, const char *input_name, int inpu task.out_frame = out_frame; task.ov_model = ov_model; + if (extract_inference_from_task(ov_model->model->func_type, &task, ov_model->inference_queue) != DNN_SUCCESS) { + av_frame_free(&out_frame); + av_frame_free(&in_frame); + av_log(ctx, AV_LOG_ERROR, "unable to extract inference from task.\n"); + return DNN_ERROR; + } + request = ff_safe_queue_pop_front(ov_model->request_queue); if (!request) { av_frame_free(&out_frame); @@ -552,9 +618,8 @@ static DNNReturnType get_output_ov(void *model, const char *input_name, int inpu av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n"); return DNN_ERROR; } - request->tasks[request->task_count++] = &task; - ret = execute_model_ov(request); + ret = execute_model_ov(request, ov_model->inference_queue); *output_width = out_frame->width; *output_height = out_frame->height; @@ -657,7 +722,6 @@ DNNReturnType ff_dnn_execute_model_ov(const DNNModel *model, const char *input_n } } - task.done = 0; task.do_ioproc = 1; task.async = 0; task.input_name = input_name; @@ -666,14 +730,18 @@ DNNReturnType ff_dnn_execute_model_ov(const DNNModel *model, const char *input_n task.out_frame = out_frame; task.ov_model = ov_model; + if (extract_inference_from_task(ov_model->model->func_type, &task, ov_model->inference_queue) != DNN_SUCCESS) { + av_log(ctx, AV_LOG_ERROR, "unable to extract inference from task.\n"); + return DNN_ERROR; + } + request = ff_safe_queue_pop_front(ov_model->request_queue); if (!request) { av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n"); return DNN_ERROR; } - request->tasks[request->task_count++] = &task; - return execute_model_ov(request); + return execute_model_ov(request, ov_model->inference_queue); } DNNReturnType ff_dnn_execute_model_async_ov(const DNNModel *model, const char *input_name, AVFrame *in_frame, @@ -707,7 +775,6 @@ DNNReturnType ff_dnn_execute_model_async_ov(const DNNModel *model, const char *i return DNN_ERROR; } - task->done = 0; task->do_ioproc = 1; task->async = 1; task->input_name = input_name; @@ -721,14 +788,18 @@ DNNReturnType ff_dnn_execute_model_async_ov(const DNNModel *model, const char *i return DNN_ERROR; } + if (extract_inference_from_task(ov_model->model->func_type, task, ov_model->inference_queue) != DNN_SUCCESS) { + av_log(ctx, AV_LOG_ERROR, "unable to extract inference from task.\n"); + return DNN_ERROR; + } + request = ff_safe_queue_pop_front(ov_model->request_queue); if (!request) { av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n"); return DNN_ERROR; } - request->tasks[request->task_count++] = task; - return execute_model_ov(request); + return execute_model_ov(request, ov_model->inference_queue); } DNNAsyncStatusType ff_dnn_get_async_result_ov(const DNNModel *model, AVFrame **in, AVFrame **out) @@ -740,7 +811,7 @@ DNNAsyncStatusType ff_dnn_get_async_result_ov(const DNNModel *model, AVFrame **i return DAST_EMPTY_QUEUE; } - if (!task->done) { + if (task->inference_done != task->inference_todo) { return DAST_NOT_READY; } @@ -760,21 +831,17 @@ DNNReturnType ff_dnn_flush_ov(const DNNModel *model) IEStatusCode status; DNNReturnType ret; + if (ff_queue_size(ov_model->inference_queue) == 0) { + // no pending task need to flush + return DNN_SUCCESS; + } + request = ff_safe_queue_pop_front(ov_model->request_queue); if (!request) { av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n"); return DNN_ERROR; } - if (request->task_count == 0) { - // no pending task need to flush - if (ff_safe_queue_push_back(ov_model->request_queue, request) < 0) { - av_log(ctx, AV_LOG_ERROR, "Failed to push back request_queue.\n"); - return DNN_ERROR; - } - return DNN_SUCCESS; - } - ret = fill_model_input_ov(ov_model, request); if (ret != DNN_SUCCESS) { av_log(ctx, AV_LOG_ERROR, "Failed to fill model input.\n"); @@ -803,11 +870,17 @@ void ff_dnn_free_model_ov(DNNModel **model) if (item && item->infer_request) { ie_infer_request_free(&item->infer_request); } - av_freep(&item->tasks); + av_freep(&item->inferences); av_freep(&item); } ff_safe_queue_destroy(ov_model->request_queue); + while (ff_queue_size(ov_model->inference_queue) != 0) { + TaskItem *item = ff_queue_pop_front(ov_model->inference_queue); + av_freep(&item); + } + ff_queue_destroy(ov_model->inference_queue); + while (ff_queue_size(ov_model->task_queue) != 0) { TaskItem *item = ff_queue_pop_front(ov_model->task_queue); av_frame_free(&item->in_frame); |