Diffstat (limited to 'threadpool.c')
 -rw-r--r--  threadpool.c  215
 1 file changed, 215 insertions, 0 deletions
diff --git a/threadpool.c b/threadpool.c
new file mode 100644
index 0000000..50d8cb7
--- /dev/null
+++ b/threadpool.c
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2018 Anton Khirnov <anton@khirnov.net>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdatomic.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+
+#include "threadpool.h"
+#include "worker.h"
+
+#define MIN(x, y) ((x) < (y) ? (x) : (y))
+
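+/*
+ * Each worker owns its lock/condvar pair and work batch, so dispatching
+ * does not contend on a single shared queue; load imbalance between the
+ * statically assigned batches is corrected by work stealing.
+ */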
+struct TPContext {
+    struct {
+        Worker *w;
+        WorkBatch wb;
+        int have_work;          /* set under work_lock when wb holds a new batch */
+
+        pthread_mutex_t work_lock;
+        pthread_cond_t  work_cond;
+
+        atomic_int work_completed;
+        atomic_int queue_empty; /* no jobs left to steal from this worker */
+
+        unsigned int rand_seed; /* per-worker seed for rand_r() victim selection */
+    } *workers;
+    unsigned int nb_workers;
+
+    atomic_int finish;          /* tells the workers to exit */
+
+    /* serializes concurrent tp_execute() calls */
+    pthread_mutex_t execute_lock;
+
+    /* the dispatching thread waits here until all workers report completion */
+    pthread_mutex_t progress_lock;
+    pthread_cond_t  progress_cond;
+    atomic_int nb_workers_finished;
+
+    /* number of workers whose own queue has run dry in the current batch */
+    atomic_int nb_workers_queue_empty;
+};
+
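+/*
+ * Shut the pool down and free it: raise the finish flag, wake every worker
+ * so it can observe the flag, destroy the workers (tpi_worker_destroy() is
+ * expected to join the thread) and release all pool resources.
+ * *pctx is set to NULL on return.
+ */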
+void tp_free(TPContext **pctx)
+{
+    TPContext *ctx = *pctx;
+
+    if (!ctx)
+        return;
+
+    atomic_store(&ctx->finish, 1);
+
+    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
+        /* wake workers blocked in tpi_worksource_wait_jobs() */
+        pthread_mutex_lock(&ctx->workers[i].work_lock);
+        pthread_cond_signal(&ctx->workers[i].work_cond);
+        pthread_mutex_unlock(&ctx->workers[i].work_lock);
+
+        tpi_worker_destroy(&ctx->workers[i].w);
+        pthread_mutex_destroy(&ctx->workers[i].work_lock);
+        pthread_cond_destroy(&ctx->workers[i].work_cond);
+    }
+    free(ctx->workers);
+    ctx->nb_workers = 0;
+
+    pthread_mutex_destroy(&ctx->execute_lock);
+    pthread_mutex_destroy(&ctx->progress_lock);
+    pthread_cond_destroy(&ctx->progress_cond);
+
+    free(ctx);
+    *pctx = NULL;
+}
+
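+/*
+ * Create a pool of nb_threads workers. On success, *pctx is set to the new
+ * pool and 0 is returned; on failure, a negative error code is returned and
+ * any partially constructed state is freed.
+ */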
+int tp_init(TPContext **pctx, unsigned int nb_threads)
+{
+    TPContext *ctx = NULL;
+    int ret;
+
+    if (!nb_threads) {
+        ret = -EINVAL;
+        goto fail;
+    }
+
+    ctx = calloc(1, sizeof(*ctx));
+    if (!ctx) {
+        ret = -ENOMEM;
+        goto fail;
+    }
+
+    pthread_mutex_init(&ctx->execute_lock, NULL);
+    pthread_mutex_init(&ctx->progress_lock, NULL);
+    pthread_cond_init(&ctx->progress_cond, NULL);
+    atomic_store(&ctx->finish, 0);
+
+    ctx->workers = calloc(nb_threads, sizeof(*ctx->workers));
+    if (!ctx->workers) {
+        ret = -ENOMEM;
+        goto fail;
+    }
+
+    for (unsigned int i = 0; i < nb_threads; i++) {
+        struct timeval tv;
+
+        pthread_mutex_init(&ctx->workers[i].work_lock, NULL);
+        pthread_cond_init(&ctx->workers[i].work_cond, NULL);
+
+        /* mix the worker index into the seed so that workers spawned
+         * within the same microsecond do not pick identical victims */
+        gettimeofday(&tv, NULL);
+        ctx->workers[i].rand_seed = tv.tv_usec + i;
+
+        ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx);
+        if (ret < 0) {
+            /* tp_free() only cleans up the first nb_workers entries */
+            pthread_mutex_destroy(&ctx->workers[i].work_lock);
+            pthread_cond_destroy(&ctx->workers[i].work_cond);
+            goto fail;
+        }
+        ctx->nb_workers++;
+    }
+
+    *pctx = ctx;
+    return 0;
+
+fail:
+    tp_free(&ctx);
+    *pctx = NULL;
+    return ret;
+}
+
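+/*
+ * Called by a worker that has finished its current batch: report completion
+ * to the dispatching thread, then sleep until a new batch is published for
+ * this worker or the pool is shutting down (signalled through wb->finish).
+ */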
+int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
+                             WorkBatch *wb)
+{
+    /* report completion of the previous batch */
+    pthread_mutex_lock(&ctx->progress_lock);
+    atomic_store(&ctx->workers[worker_idx].work_completed, 1);
+    atomic_fetch_add(&ctx->nb_workers_finished, 1);
+    pthread_cond_signal(&ctx->progress_cond);
+    pthread_mutex_unlock(&ctx->progress_lock);
+
+    /* block until new work arrives or the pool shuts down */
+    pthread_mutex_lock(&ctx->workers[worker_idx].work_lock);
+
+    while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work)
+        pthread_cond_wait(&ctx->workers[worker_idx].work_cond,
+                          &ctx->workers[worker_idx].work_lock);
+
+    memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb));
+    wb->finish = atomic_load(&ctx->finish);
+    ctx->workers[worker_idx].have_work = 0;
+
+    pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock);
+
+    return 0;
+}
+
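+/*
+ * Called by a worker whose own queue is exhausted: mark it empty, then keep
+ * probing randomly chosen victims until a job is stolen or every worker's
+ * queue is empty. Returns the (non-negative) result of a successful steal,
+ * or -1 when there is no work left anywhere.
+ */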
+int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx)
+{
+    int ret = -1;
+
+    atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
+    atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+
+    while (atomic_load(&ctx->nb_workers_queue_empty) < ctx->nb_workers) {
+        unsigned int w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
+
+        /* never steal from ourselves or from a worker already drained */
+        if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty))
+            continue;
+
+        ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w);
+        if (ret >= 0)
+            break;
+    }
+
+    return ret;
+}
+
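+/*
+ * Run func over job indices [0, nb_jobs) on the pool, dividing them into
+ * contiguous per-worker batches, and block until every worker reports
+ * completion. Concurrent calls are serialized by execute_lock.
+ */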
+int tp_execute(TPContext *ctx, unsigned int nb_jobs,
+               TPExecuteCallback func, void *func_arg)
+{
+    /* number of jobs per worker, rounded up */
+    unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers;
+
+    pthread_mutex_lock(&ctx->execute_lock);
+
+    atomic_store(&ctx->nb_workers_finished, 0);
+    atomic_store(&ctx->nb_workers_queue_empty, 0);
+
+    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
+        WorkBatch *wb = &ctx->workers[i].wb;
+
+        pthread_mutex_lock(&ctx->workers[i].work_lock);
+
+        wb->job_func = func;
+        wb->func_arg = func_arg;
+        /* clamp both ends so trailing workers get an empty range when
+         * nb_jobs does not fill all the batches */
+        wb->job_idx_start = MIN(i * max_batch_size, nb_jobs);
+        wb->job_idx_end   = MIN((i + 1) * max_batch_size, nb_jobs);
+        ctx->workers[i].have_work = 1;
+        atomic_store(&ctx->workers[i].work_completed, 0);
+        atomic_store(&ctx->workers[i].queue_empty, 0);
+
+        pthread_cond_signal(&ctx->workers[i].work_cond);
+
+        pthread_mutex_unlock(&ctx->workers[i].work_lock);
+    }
+
+    /* wait until every worker has come back asking for more work */
+    pthread_mutex_lock(&ctx->progress_lock);
+    while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+        pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+    pthread_mutex_unlock(&ctx->progress_lock);
+
+    pthread_mutex_unlock(&ctx->execute_lock);
+
+    return 0;
+}