/* * Copyright 2016 Anton Khirnov * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include "cpu.h" #include "threadpool.h" typedef struct WorkerContext { ThreadPoolContext *parent; pthread_t thread; unsigned int idx; } WorkerContext; struct ThreadPoolContext { WorkerContext *workers; unsigned int nb_workers; pthread_mutex_t mutex; pthread_cond_t cond; void (*func)(void *arg, unsigned int job_idx, unsigned int nb_jobs, unsigned int thread_idx, unsigned int nb_threads); void *func_arg; int next_job; int nb_jobs; int nb_jobs_finished; int finish; }; void *worker_thread(void *arg) { WorkerContext *w = arg; ThreadPoolContext *ctx = w->parent; int nb_jobs, job_idx; while (1) { pthread_mutex_lock(&ctx->mutex); while (!ctx->finish && ctx->next_job >= ctx->nb_jobs) pthread_cond_wait(&ctx->cond, &ctx->mutex); if (ctx->finish) { pthread_mutex_unlock(&ctx->mutex); break; } nb_jobs = ctx->nb_jobs; job_idx = ctx->next_job++; pthread_mutex_unlock(&ctx->mutex); ctx->func(ctx->func_arg, job_idx, nb_jobs, w->idx, ctx->nb_workers); pthread_mutex_lock(&ctx->mutex); ctx->nb_jobs_finished++; pthread_cond_broadcast(&ctx->cond); pthread_mutex_unlock(&ctx->mutex); } return NULL; } int bdi_threadpool_init(ThreadPoolContext **pctx, unsigned int nb_threads) { ThreadPoolContext *ctx; int ret = 0; if (!nb_threads) { nb_threads = bdi_cpu_count(); if (!nb_threads) return -ENOSYS; } ctx = calloc(1, sizeof(*ctx)); if (!ctx) return -ENOMEM; pthread_mutex_init(&ctx->mutex, NULL); pthread_cond_init(&ctx->cond, NULL); ctx->workers = calloc(nb_threads, sizeof(*ctx->workers)); if (!ctx->workers) { ret = -ENOMEM; goto fail; } for (int i = 0; i < nb_threads; i++) { WorkerContext *w = &ctx->workers[i]; w->idx = i; w->parent = ctx; ret = pthread_create(&w->thread, NULL, worker_thread, w); if (ret) { ret = -ret; goto fail; } ctx->nb_workers++; } *pctx = ctx; return 0; fail: bdi_threadpool_free(&ctx); return ret; } void bdi_threadpool_free(ThreadPoolContext **pctx) { ThreadPoolContext *ctx = *pctx; if (!ctx) return; pthread_mutex_lock(&ctx->mutex); ctx->finish = 1; pthread_cond_broadcast(&ctx->cond); pthread_mutex_unlock(&ctx->mutex); for (int i = 0; i < ctx->nb_workers; i++) { WorkerContext *w = &ctx->workers[i]; pthread_join(w->thread, NULL); } pthread_mutex_destroy(&ctx->mutex); pthread_cond_destroy(&ctx->cond); free(ctx->workers); free(ctx); *pctx = NULL; } void bdi_threadpool_execute(ThreadPoolContext *ctx, unsigned int nb_jobs, void (*func)(void *arg, unsigned int job_idx, unsigned int nb_jobs, unsigned int thread_idx, unsigned int nb_threads), void *arg) { pthread_mutex_lock(&ctx->mutex); ctx->func = func; ctx->func_arg = arg; ctx->nb_jobs = nb_jobs; ctx->nb_jobs_finished = 0; ctx->next_job = 0; pthread_cond_broadcast(&ctx->cond); while (ctx->nb_jobs_finished < ctx->nb_jobs) pthread_cond_wait(&ctx->cond, &ctx->mutex); ctx->func = NULL; ctx->func_arg = NULL; pthread_mutex_unlock(&ctx->mutex); }