From d8e21cfe33c9fd98109374510311479955684412 Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Fri, 30 Jun 2017 09:35:21 +0200 Subject: Add a threadpool implementation. --- Makefile | 1 + threadpool.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ threadpool.h | 32 +++++++++++ 3 files changed, 211 insertions(+) create mode 100644 threadpool.c create mode 100644 threadpool.h diff --git a/Makefile b/Makefile index c868322..2239dc6 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ OBJS = basis.o \ log.o \ qfunc.o \ solve.o \ + threadpool.o \ all: $(TARGET) diff --git a/threadpool.c b/threadpool.c new file mode 100644 index 0000000..85ac296 --- /dev/null +++ b/threadpool.c @@ -0,0 +1,178 @@ +/* + * Copyright 2016 Anton Khirnov + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#include "cpu.h" +#include "threadpool.h" + +typedef struct WorkerContext { + ThreadPoolContext *parent; + pthread_t thread; + unsigned int idx; +} WorkerContext; + +struct ThreadPoolContext { + WorkerContext *workers; + unsigned int nb_workers; + + pthread_mutex_t mutex; + pthread_cond_t cond; + void (*func)(void *arg, + unsigned int job_idx, unsigned int nb_jobs, + unsigned int thread_idx, unsigned int nb_threads); + void *func_arg; + int next_job; + int nb_jobs; + int nb_jobs_finished; + + int finish; +}; + +void *worker_thread(void *arg) +{ + WorkerContext *w = arg; + ThreadPoolContext *ctx = w->parent; + int nb_jobs, job_idx; + + while (1) { + pthread_mutex_lock(&ctx->mutex); + while (!ctx->finish && ctx->next_job >= ctx->nb_jobs) + pthread_cond_wait(&ctx->cond, &ctx->mutex); + + if (ctx->finish) { + pthread_mutex_unlock(&ctx->mutex); + break; + } + + nb_jobs = ctx->nb_jobs; + job_idx = ctx->next_job++; + + pthread_mutex_unlock(&ctx->mutex); + + ctx->func(ctx->func_arg, job_idx, nb_jobs, w->idx, ctx->nb_workers); + + pthread_mutex_lock(&ctx->mutex); + + ctx->nb_jobs_finished++; + + pthread_cond_broadcast(&ctx->cond); + pthread_mutex_unlock(&ctx->mutex); + } + return NULL; +} + +int bdi_threadpool_init(ThreadPoolContext **pctx, unsigned int nb_threads) +{ + ThreadPoolContext *ctx; + int ret = 0; + + if (!nb_threads) { + nb_threads = bdi_cpu_count(); + if (!nb_threads) + return -ENOSYS; + } + + ctx = calloc(1, sizeof(*ctx)); + if (!ctx) + return -ENOMEM; + + pthread_mutex_init(&ctx->mutex, NULL); + pthread_cond_init(&ctx->cond, NULL); + + ctx->workers = calloc(nb_threads, sizeof(*ctx->workers)); + if (!ctx->workers) { + ret = -ENOMEM; + goto fail; + } + + for (int i = 0; i < nb_threads; i++) { + WorkerContext *w = &ctx->workers[i]; + + w->idx = i; + w->parent = ctx; + + ret = pthread_create(&w->thread, NULL, worker_thread, w); + if (ret) { + ret = -ret; + goto fail; + } + + ctx->nb_workers++; + } + + + *pctx = ctx; + return 0; +fail: + bdi_threadpool_free(&ctx); + return ret; +} + +void bdi_threadpool_free(ThreadPoolContext **pctx) +{ + ThreadPoolContext *ctx = *pctx; + + if (!ctx) + return; + + pthread_mutex_lock(&ctx->mutex); + ctx->finish = 1; + pthread_cond_broadcast(&ctx->cond); + pthread_mutex_unlock(&ctx->mutex); + + + for (int i = 0; i < ctx->nb_workers; i++) { + WorkerContext *w = &ctx->workers[i]; + pthread_join(w->thread, NULL); + } + + pthread_mutex_destroy(&ctx->mutex); + pthread_cond_destroy(&ctx->cond); + + free(ctx->workers); + + free(ctx); + *pctx = NULL; +} + +void bdi_threadpool_execute(ThreadPoolContext *ctx, unsigned int nb_jobs, + void (*func)(void *arg, + unsigned int job_idx, unsigned int nb_jobs, + unsigned int thread_idx, unsigned int nb_threads), + void *arg) +{ + pthread_mutex_lock(&ctx->mutex); + + ctx->func = func; + ctx->func_arg = arg; + + ctx->nb_jobs = nb_jobs; + ctx->nb_jobs_finished = 0; + ctx->next_job = 0; + + pthread_cond_broadcast(&ctx->cond); + while (ctx->nb_jobs_finished < ctx->nb_jobs) + pthread_cond_wait(&ctx->cond, &ctx->mutex); + + ctx->func = NULL; + ctx->func_arg = NULL; + + pthread_mutex_unlock(&ctx->mutex); +} diff --git a/threadpool.h b/threadpool.h new file mode 100644 index 0000000..47d1bd4 --- /dev/null +++ b/threadpool.h @@ -0,0 +1,32 @@ +/* + * Copyright 2016 Anton Khirnov + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef BRILL_DATA_THREADPOOL_H +#define BRILL_DATA_THREADPOOL_H + +typedef struct ThreadPoolContext ThreadPoolContext; + +int bdi_threadpool_init(ThreadPoolContext **ctx, unsigned int nb_threads); +void bdi_threadpool_free(ThreadPoolContext **ctx); + +void bdi_threadpool_execute(ThreadPoolContext *ctx, unsigned int nb_jobs, + void (*func)(void *arg, + unsigned int job_idx, unsigned int nb_jobs, + unsigned int thread_idx, unsigned int nb_threads), + void *arg); + +#endif /* BRILL_DATA_THREADPOOL_H */ -- cgit v1.2.3