/*
 * Copyright 2018 Anton Khirnov
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

#include "threadpool.h"
#include "worker.h"

#define MIN(x, y) ((x) < (y) ? (x) : (y))

struct TPContext {
    struct {
        Worker *w;

        /* the batch of jobs currently assigned to this worker */
        WorkBatch wb;
        int       have_work;

        pthread_mutex_t work_lock;
        pthread_cond_t  work_cond;

        atomic_int work_completed;
        atomic_int queue_empty;

        /* per-worker seed for rand_r(), used to pick steal victims */
        unsigned int rand_seed;
    } *workers;
    unsigned int nb_workers;

    /* set to 1 when the pool is being destroyed */
    atomic_int finish;

    /* serializes concurrent tp_execute() calls */
    pthread_mutex_t execute_lock;

    pthread_mutex_t progress_lock;
    pthread_cond_t  progress_cond;
    atomic_int      nb_workers_finished;
    atomic_int      nb_workers_queue_empty;
};

void tp_free(TPContext **pctx)
{
    TPContext *ctx = *pctx;

    if (!ctx)
        return;

    /* tell the workers to terminate and wake them up */
    atomic_store(&ctx->finish, 1);

    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
        pthread_mutex_lock(&ctx->workers[i].work_lock);
        pthread_cond_signal(&ctx->workers[i].work_cond);
        pthread_mutex_unlock(&ctx->workers[i].work_lock);

        tpi_worker_destroy(&ctx->workers[i].w);

        pthread_mutex_destroy(&ctx->workers[i].work_lock);
        pthread_cond_destroy(&ctx->workers[i].work_cond);
    }
    free(ctx->workers);
    ctx->nb_workers = 0;

    pthread_mutex_destroy(&ctx->execute_lock);
    pthread_mutex_destroy(&ctx->progress_lock);
    pthread_cond_destroy(&ctx->progress_cond);

    free(ctx);
    *pctx = NULL;
}

int tp_init(TPContext **pctx, unsigned int nb_threads)
{
    TPContext *ctx = NULL;
    int ret;

    if (!nb_threads) {
        ret = -EINVAL;
        goto fail;
    }

    ctx = calloc(1, sizeof(*ctx));
    if (!ctx) {
        ret = -ENOMEM;
        goto fail;
    }

    pthread_mutex_init(&ctx->execute_lock,  NULL);
    pthread_mutex_init(&ctx->progress_lock, NULL);
    pthread_cond_init(&ctx->progress_cond,  NULL);

    atomic_store(&ctx->finish, 0);

    ctx->workers = calloc(nb_threads, sizeof(*ctx->workers));
    if (!ctx->workers) {
        ret = -ENOMEM;
        goto fail;
    }

    for (unsigned int i = 0; i < nb_threads; i++) {
        struct timeval tv;

        pthread_mutex_init(&ctx->workers[i].work_lock, NULL);
        pthread_cond_init(&ctx->workers[i].work_cond, NULL);

        /* seed each worker's RNG from the current time */
        gettimeofday(&tv, NULL);
        ctx->workers[i].rand_seed = tv.tv_usec;

        ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx);
        if (ret < 0)
            goto fail;

        ctx->nb_workers++;
    }

    *pctx = ctx;
    return 0;
fail:
    tp_free(&ctx);
    *pctx = NULL;
    return ret;
}

int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
                             WorkBatch *wb)
{
    /* report completion of the current batch to tp_execute() */
    pthread_mutex_lock(&ctx->progress_lock);
    atomic_store(&ctx->workers[worker_idx].work_completed, 1);
    atomic_fetch_add(&ctx->nb_workers_finished, 1);
    pthread_cond_signal(&ctx->progress_cond);
    pthread_mutex_unlock(&ctx->progress_lock);

    /* sleep until new work arrives or the pool is being destroyed */
    pthread_mutex_lock(&ctx->workers[worker_idx].work_lock);
    while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work)
        pthread_cond_wait(&ctx->workers[worker_idx].work_cond,
                          &ctx->workers[worker_idx].work_lock);

    memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb));
    wb->finish = atomic_load(&ctx->finish);

    ctx->workers[worker_idx].have_work = 0;
    pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock);

    return 0;
}

int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx)
{
    int ret = -1;

    atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
    atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);

    /* try random victims until a job is stolen or all queues are empty */
    while (atomic_load(&ctx->nb_workers_queue_empty)
           < ctx->nb_workers) {
        unsigned int w_victim_idx =
            rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;

        if (w_victim_idx == worker_idx ||
            atomic_load(&ctx->workers[w_victim_idx].queue_empty))
            continue;

        ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w);
        if (ret >= 0)
            break;
    }

    return ret;
}

int tp_execute(TPContext *ctx, unsigned int nb_jobs,
               TPExecuteCallback func, void *func_arg)
{
    /* split the jobs into contiguous batches of at most this size */
    unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers;

    pthread_mutex_lock(&ctx->execute_lock);

    atomic_store(&ctx->nb_workers_finished,    0);
    atomic_store(&ctx->nb_workers_queue_empty, 0);

    /* hand each worker its batch and wake it up; batches may be empty
     * when nb_jobs < nb_workers */
    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
        WorkBatch *wb = &ctx->workers[i].wb;

        pthread_mutex_lock(&ctx->workers[i].work_lock);

        wb->job_func      = func;
        wb->func_arg      = func_arg;
        wb->job_idx_start = i * max_batch_size;
        wb->job_idx_end   = MIN((i + 1) * max_batch_size, nb_jobs);

        ctx->workers[i].have_work = 1;
        atomic_store(&ctx->workers[i].work_completed, 0);
        atomic_store(&ctx->workers[i].queue_empty,    0);

        pthread_cond_signal(&ctx->workers[i].work_cond);
        pthread_mutex_unlock(&ctx->workers[i].work_lock);
    }

    /* wait until every worker reports its batch as completed */
    pthread_mutex_lock(&ctx->progress_lock);
    while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
        pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
    pthread_mutex_unlock(&ctx->progress_lock);

    pthread_mutex_unlock(&ctx->execute_lock);

    return 0;
}
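
/*
 * A minimal usage sketch, compiled only when TP_EXAMPLE_MAIN is defined so the
 * translation unit is unchanged by default. It assumes the callback type
 * declared in threadpool.h receives the user argument, the job index and the
 * index of the executing thread, i.e.
 *     void (*TPExecuteCallback)(void *arg, unsigned int job_idx,
 *                               unsigned int thread_idx);
 * that signature is an assumption, not confirmed by this file; adjust
 * square() below if the real declaration differs.
 */
#ifdef TP_EXAMPLE_MAIN
#include <stdio.h>

static void square(void *arg, unsigned int job_idx, unsigned int thread_idx)
{
    /* each job writes to its own slot, so no synchronization is needed */
    int *results = arg;
    results[job_idx] = (int)(job_idx * job_idx);
    (void)thread_idx;
}

int main(void)
{
    TPContext *tp = NULL;
    int results[16] = { 0 };

    /* create a pool of 4 worker threads */
    if (tp_init(&tp, 4) < 0)
        return 1;

    /* run 16 jobs; tp_execute() returns once all batches are completed */
    tp_execute(tp, 16, square, results);

    for (int i = 0; i < 16; i++)
        printf("%d ", results[i]);
    printf("\n");

    tp_free(&tp);
    return 0;
}
#endif /* TP_EXAMPLE_MAIN */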