From 50b1708e140742e87d69c6fd07599bdb100a1517 Mon Sep 17 00:00:00 2001
From: Anton Khirnov
Date: Mon, 19 Jun 2023 15:49:24 +0200
Subject: WIP

---
 Makefile        |   5 +-
 libthreadpool.v |   2 +-
 threadpool.c    | 252 +++++++++++++++++++++++++++++++++++++++++++++-----------
 threadpool.h    |   5 ++
 worker.c        |  84 ++++++++++++++++---
 worker.h        |   4 +-
 6 files changed, 291 insertions(+), 61 deletions(-)

diff --git a/Makefile b/Makefile
index d2c4b7d..4c6e883 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,13 @@
-CFLAGS = -std=c11 -D_XOPEN_SOURCE=700 -fPIC -g -O3 -Wall -fsanitize=thread
+CFLAGS = -std=c11 -D_XOPEN_SOURCE=700 -D_GNU_SOURCE -fPIC -g -O3 -Wall
 LIBS=-lm
-LDFLAGS = -Wl,--version-script=libthreadpool.v -shared -pthread $(LIBS) -fsanitize=thread
+LDFLAGS = -Wl,--version-script=libthreadpool.v -shared -pthread $(LIBS)
 CC = cc
 TARGET = libthreadpool.so

 OBJECTS = \
     threadpool.o \
+    timer.o \
     worker.o \

 all: $(TARGET)
diff --git a/libthreadpool.v b/libthreadpool.v
index e8fe7b5..6597847 100644
--- a/libthreadpool.v
+++ b/libthreadpool.v
@@ -1,4 +1,4 @@
-LIBTHREADPOOL_1 {
+LIBTHREADPOOL_2 {
     global: tp_*;
     local: *;
 };
diff --git a/threadpool.c b/threadpool.c
index 50d8cb7..049f406 100644
--- a/threadpool.c
+++ b/threadpool.c
@@ -23,39 +23,66 @@
 #include

 #include "threadpool.h"
+#include "timer.h"
 #include "worker.h"

+#include
 #include

+#define MAX(x, y) ((x) < (y) ? (y) : (x))
 #define MIN(x, y) ((x) < (y) ? (x) : (y))

 struct TPContext {
     struct {
         Worker *w;

         WorkBatch wb;
-        int have_work;
+        atomic_int have_work;

         pthread_mutex_t work_lock;
         pthread_cond_t work_cond;

-        atomic_int work_completed;
         atomic_int queue_empty;
+        atomic_int being_robbed;
+
+        unsigned int steal_iterations;
+        unsigned int steals;

         unsigned int rand_seed;
     } *workers;
     unsigned int nb_workers;

+    pthread_mutex_t init_lock;
+    pthread_cond_t init_cond;
+    atomic_int init_done;

     atomic_int finish;

     pthread_mutex_t execute_lock;

+    pthread_mutex_t submit_lock;
+    pthread_cond_t submit_cond;
+
     pthread_mutex_t progress_lock;
     pthread_cond_t progress_cond;

     atomic_int nb_workers_finished;
     atomic_int nb_workers_queue_empty;
+
+    Timer timer_execute;
+    Timer timer_submit;
+    Timer timer_all;
 };

+void tpi_worksource_queue_empty(TPContext *ctx, unsigned int worker_idx)
+{
+    atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
+    atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+}
+
+void pause(void)
+{
+    __asm__ volatile("rep; nop" ::: "memory");
+}
+
 void tp_free(TPContext **pctx)
 {
     TPContext *ctx = *pctx;
@@ -63,21 +90,35 @@ void tp_free(TPContext **pctx)
     if (!ctx)
         return;

+    fprintf(stderr, "%ld executes: all %g us; submit %g us; execute %g us\n",
+            ctx->timer_all.nb_runs, ctx->timer_all.time_nsec / 1e3 / ctx->timer_all.nb_runs,
+            ctx->timer_submit.time_nsec / 1e3 / ctx->timer_submit.nb_runs,
+            ctx->timer_execute.time_nsec / 1e3 / ctx->timer_execute.nb_runs);
+
     atomic_store(&ctx->finish, 1);
+    for (unsigned int i = 0; i < ctx->nb_workers; i++)
+        atomic_store(&ctx->workers[i].have_work, 1);

-    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
-        pthread_mutex_lock(&ctx->workers[i].work_lock);
-        pthread_cond_signal(&ctx->workers[i].work_cond);
-        pthread_mutex_unlock(&ctx->workers[i].work_lock);
+    pthread_mutex_lock(&ctx->submit_lock);
+    pthread_cond_broadcast(&ctx->submit_cond);
+    pthread_mutex_unlock(&ctx->submit_lock);

+    for (unsigned int i = 0; i < ctx->nb_workers; i++) {
+        fprintf(stderr, "%d %d steals %g iterations/steal\n", i, ctx->workers[i].steals, (double)ctx->workers[i].steal_iterations / ctx->workers[i].steals);
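+        /* steals may be 0 for a worker that never had to steal, in which case the ratio above prints inf or nan */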
         tpi_worker_destroy(&ctx->workers[i].w);
-        pthread_mutex_destroy(&ctx->workers[i].work_lock);
-        pthread_cond_destroy(&ctx->workers[i].work_cond);
     }

     free(ctx->workers);
     ctx->nb_workers = 0;

     pthread_mutex_destroy(&ctx->execute_lock);
+    pthread_mutex_destroy(&ctx->init_lock);
+    pthread_cond_destroy(&ctx->init_cond);
+
+    pthread_mutex_destroy(&ctx->progress_lock);
+    pthread_cond_destroy(&ctx->progress_cond);
+
+    pthread_mutex_destroy(&ctx->submit_lock);
+    pthread_cond_destroy(&ctx->submit_cond);

     free(ctx);
     *pctx = NULL;
@@ -86,11 +127,24 @@ void tp_free(TPContext **pctx)
 int tp_init(TPContext **pctx, unsigned int nb_threads)
 {
     TPContext *ctx = NULL;
+    cpu_set_t cpus;
+    int *cpu_indices;
     int ret;

-    if (!nb_threads) {
-        ret = -EINVAL;
-        goto fail;
+    if (!nb_threads)
+        return -EINVAL;
+
+    ret = sched_getaffinity(0, sizeof(cpus), &cpus);
+    if (ret != 0)
+        return -errno;
+
+    cpu_indices = malloc(CPU_COUNT(&cpus) * sizeof(*cpu_indices));
+    if (!cpu_indices)
+        return -ENOMEM;
+
+    for (int cur = 0, i = 0; i < CPU_SETSIZE; i++) {
+        if (CPU_ISSET(i, &cpus))
+            cpu_indices[cur++] = i;
     }

     ctx = calloc(1, sizeof(*ctx));
@@ -100,7 +154,21 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
     }

     pthread_mutex_init(&ctx->execute_lock, NULL);
-    atomic_store(&ctx->finish, 0);
+    atomic_store(&ctx->finish, 0);
+
+    pthread_mutex_init(&ctx->init_lock, NULL);
+    pthread_cond_init(&ctx->init_cond, NULL);
+    atomic_store(&ctx->init_done, 0);
+
+    pthread_mutex_init(&ctx->progress_lock, NULL);
+    pthread_cond_init(&ctx->progress_cond, NULL);
+
+    pthread_mutex_init(&ctx->submit_lock, NULL);
+    pthread_cond_init(&ctx->submit_cond, NULL);
+
+    tpi_timer_init(&ctx->timer_all);
+    tpi_timer_init(&ctx->timer_execute);
+    tpi_timer_init(&ctx->timer_submit);

     ctx->workers = calloc(nb_threads, sizeof(*ctx->workers));
     if (!ctx->workers) {
@@ -108,25 +176,44 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
         goto fail;
     }

+    /* spawn the workers */
     for (unsigned int i = 0; i < nb_threads; i++) {
         struct timeval tv;

-        pthread_mutex_init(&ctx->workers[i].work_lock, NULL);
-        pthread_cond_init(&ctx->workers[i].work_cond, NULL);
-
         gettimeofday(&tv, NULL);
         ctx->workers[i].rand_seed = tv.tv_usec;

-        ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx);
+        ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx, cpu_indices[i % CPU_COUNT(&cpus)]);
         if (ret < 0)
-            goto fail;
+            goto fail_spawn;

         ctx->nb_workers++;
     }

+    /* signal to the workers that we are done initializing */
+    pthread_mutex_lock(&ctx->init_lock);
+    atomic_store(&ctx->init_done, 1);
+    pthread_cond_broadcast(&ctx->init_cond);
+    pthread_mutex_unlock(&ctx->init_lock);
+
+    /* wait until all the workers are settled */
+    pthread_mutex_lock(&ctx->progress_lock);
+    while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+        pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+    pthread_mutex_unlock(&ctx->progress_lock);
+
     *pctx = ctx;
+    free(cpu_indices);
+
     return 0;
+
+fail_spawn:
+    pthread_mutex_lock(&ctx->init_lock);
+    atomic_store(&ctx->init_done, 1);
+    pthread_cond_broadcast(&ctx->init_cond);
+    pthread_mutex_unlock(&ctx->init_lock);
+
 fail:
+    free(cpu_indices);
     tp_free(&ctx);
     *pctx = NULL;
     return ret;
@@ -135,22 +222,40 @@ fail:

 int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
                              WorkBatch *wb)
 {
-    pthread_mutex_lock(&ctx->progress_lock);
-    atomic_store(&ctx->workers[worker_idx].work_completed, 1);
-    atomic_fetch_add(&ctx->nb_workers_finished, 1);
-    pthread_cond_signal(&ctx->progress_cond);
-    pthread_mutex_unlock(&ctx->progress_lock);
+    /* if the context is still spawning threads, we wait until it is done */
+    if (!atomic_load(&ctx->init_done)) {
+        pthread_mutex_lock(&ctx->init_lock);
+        while (!atomic_load(&ctx->init_done))
+            pthread_cond_wait(&ctx->init_cond, &ctx->init_lock);
+        pthread_mutex_unlock(&ctx->init_lock);
+    }

-    pthread_mutex_lock(&ctx->workers[worker_idx].work_lock);
+    /* signal to the context that we are done */
+    if (atomic_fetch_add(&ctx->nb_workers_finished, 1) + 1 == ctx->nb_workers) {
+        pthread_mutex_lock(&ctx->progress_lock);
+        pthread_cond_signal(&ctx->progress_cond);
+        pthread_mutex_unlock(&ctx->progress_lock);
+    }

-    while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work)
-        pthread_cond_wait(&ctx->workers[worker_idx].work_cond, &ctx->workers[worker_idx].work_lock);
+    for (int i = 0; i < (1 << 0); i++) {
+        if (atomic_load(&ctx->workers[worker_idx].have_work))
+            break;
+        if (!(i & ((1 << 8) - 1)))
+            sched_yield();
+        pause();
+    }
+    if (!atomic_load(&ctx->workers[worker_idx].have_work)) {
+        pthread_mutex_lock(&ctx->submit_lock);
+
+        while (!atomic_load(&ctx->workers[worker_idx].have_work))
+            pthread_cond_wait(&ctx->submit_cond, &ctx->submit_lock);
+
+        pthread_mutex_unlock(&ctx->submit_lock);
+    }

     memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb));
     wb->finish = atomic_load(&ctx->finish);
-    ctx->workers[worker_idx].have_work = 0;
-
-    pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock);
+    atomic_store(&ctx->workers[worker_idx].have_work, 0);

     return 0;
 }
@@ -158,58 +263,113 @@ int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
 int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx)
 {
     int ret = -1;
+    unsigned int w_victim_idx, iterations = 0;

-    atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
-    atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
-
+    w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
     while (atomic_load(&ctx->nb_workers_queue_empty) < ctx->nb_workers) {
-        unsigned int w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
+        int expected = 0;
+
+        iterations++;

-        if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty))
+        if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty)) {
+            w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
             continue;
+        }
+
+        if (!atomic_compare_exchange_strong(&ctx->workers[w_victim_idx].being_robbed, &expected, 1)) {
+            w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
+            continue;
+        }

         ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w);
+        atomic_store(&ctx->workers[w_victim_idx].being_robbed, 0);
         if (ret >= 0)
             break;
     }

+    ctx->workers[worker_idx].steals++;
+    ctx->workers[worker_idx].steal_iterations += iterations;
+
     return ret;
 }

+static int execute_serial(unsigned int nb_jobs, TPExecuteCallback func,
+                          void *func_arg)
+{
+    for (unsigned int i = 0; i < nb_jobs; i++)
+        func(func_arg, i, 0);
+
+    return 0;
+}
+
 int tp_execute(TPContext *ctx, unsigned int nb_jobs,
                TPExecuteCallback func, void *func_arg)
 {
-    unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers;
+    unsigned int max_batch_size = MAX(1, nb_jobs / ctx->nb_workers);
+
+    if (nb_jobs == 1 || ctx->nb_workers == 1)
+        return execute_serial(nb_jobs, func, func_arg);

+    tpi_timer_start(&ctx->timer_all);
     pthread_mutex_lock(&ctx->execute_lock);

-    atomic_store(&ctx->nb_workers_finished, 0);
+    atomic_store(&ctx->nb_workers_finished, 0);
     atomic_store(&ctx->nb_workers_queue_empty, 0);

+    tpi_timer_start(&ctx->timer_submit);
     for (unsigned int i = 0; i < ctx->nb_workers; i++) {
         WorkBatch *wb = &ctx->workers[i].wb;
+        unsigned int job_start = i * max_batch_size;
+        unsigned int job_end = (i == ctx->nb_workers - 1) ? nb_jobs : MIN((i + 1) * max_batch_size, nb_jobs);

-        pthread_mutex_lock(&ctx->workers[i].work_lock);
+        if (job_start >= job_end) {
+            atomic_fetch_add(&ctx->nb_workers_finished, 1);
+            atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+            continue;
+        }

         wb->job_func = func;
         wb->func_arg = func_arg;
-        wb->job_idx_start = i * max_batch_size;
-        wb->job_idx_end = MIN((i + 1) * max_batch_size, nb_jobs);
-        ctx->workers[i].have_work = 1;
-        atomic_store(&ctx->workers[i].work_completed, 0);
+        wb->job_idx_start = job_start;
+        wb->job_idx_end = job_end;
+        atomic_store(&ctx->workers[i].have_work, 1);
         atomic_store(&ctx->workers[i].queue_empty, 0);
+        atomic_store(&ctx->workers[i].being_robbed, 0);
+
+    }

-        pthread_cond_signal(&ctx->workers[i].work_cond);
+    pthread_mutex_lock(&ctx->submit_lock);
+    pthread_cond_broadcast(&ctx->submit_cond);
+    pthread_mutex_unlock(&ctx->submit_lock);
+    tpi_timer_stop(&ctx->timer_submit);

-        pthread_mutex_unlock(&ctx->workers[i].work_lock);
+    tpi_timer_start(&ctx->timer_execute);
+
+    for (int i = 0; i < (1 << 0); i++) {
+        if (atomic_load(&ctx->nb_workers_finished) >= ctx->nb_workers)
+            break;
+        if (!(i & ((1 << 8) - 1)))
+            sched_yield();
+        pause();
     }

-    pthread_mutex_lock(&ctx->progress_lock);
-    while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
-        pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
-    pthread_mutex_unlock(&ctx->progress_lock);
+    if (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) {
+        pthread_mutex_lock(&ctx->progress_lock);
+        while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+            pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+        pthread_mutex_unlock(&ctx->progress_lock);
+    }
+
+    tpi_timer_stop(&ctx->timer_execute);

     pthread_mutex_unlock(&ctx->execute_lock);

+    tpi_timer_stop(&ctx->timer_all);
+
     return 0;
 }
+
+unsigned int tp_get_nb_threads(TPContext *ctx)
+{
+    return ctx->nb_workers;
+}
diff --git a/threadpool.h b/threadpool.h
index 17bf3f2..0f120ab 100644
--- a/threadpool.h
+++ b/threadpool.h
@@ -63,4 +63,9 @@ void tp_free(TPContext **ctx);
 int tp_execute(TPContext *ctx, unsigned int nb_jobs,
                TPExecuteCallback func, void *func_arg);

+/**
+ * Get the number of threads in this thread pool.
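+ *
+ * @return the number of worker threads owned by this pool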
+ */
+unsigned int tp_get_nb_threads(TPContext *ctx);
+
 #endif /* TP_THREADPOOL_H */
diff --git a/worker.c b/worker.c
index 306bc10..8686ec1 100644
--- a/worker.c
+++ b/worker.c
@@ -20,28 +20,58 @@
 #include
 #include

+#include
+#include
+
 #include "worker.h"

+#define ENABLE_WORK_STEAL 1
+
 struct Worker {
     WorkSource *ws;
     unsigned int worker_idx;
+    int cpu_idx;

     pthread_t thread_id;

     pthread_mutex_t wb_lock;
     atomic_int job_idx_start;
     atomic_int job_idx_end;
+    atomic_int queue_empty;
+
+    int batches;
+    int steal_races;
 };

+static void queue_empty(Worker *w)
+{
+    atomic_store(&w->queue_empty, 1);
+    tpi_worksource_queue_empty(w->ws, w->worker_idx);
+}
+
 int tpi_worker_steal_job(Worker *w)
 {
     int ret = -1;

-    pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
+    int job_idx_start, job_idx_end;

-    if (w->job_idx_start < w->job_idx_end)
-        ret = --w->job_idx_end;
+    if (atomic_load(&w->queue_empty))
+        return -1;
+
+    //pthread_mutex_lock(&w->wb_lock);
+
+    job_idx_end = atomic_fetch_add(&w->job_idx_end, -1);
+    job_idx_start = atomic_load(&w->job_idx_start);
+
+    if (job_idx_start < job_idx_end) {
+        ret = job_idx_end - 1;
+        if (job_idx_start + 1 >= job_idx_end)
+            queue_empty(w);
+    } else
+        atomic_fetch_add(&w->job_idx_end, 1);

-    pthread_mutex_unlock(&w->wb_lock);
+    //pthread_mutex_unlock(&w->wb_lock);
+#endif

     return ret;
 }
@@ -50,15 +80,31 @@
 static int worker_get_next_job(Worker *w)
 {
     int ret = -1;

-    pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
+    while (!atomic_load(&w->queue_empty)) {
+        int job_idx_start, job_idx_end;

-    if (w->job_idx_start < w->job_idx_end)
-        ret = w->job_idx_start++;
+        job_idx_start = atomic_fetch_add(&w->job_idx_start, 1);
+        job_idx_end = atomic_load(&w->job_idx_end);
+
+        if (job_idx_start < job_idx_end) {
+            if (job_idx_start + 1 >= job_idx_end)
+                queue_empty(w);
+            return job_idx_start;
+        }

-    pthread_mutex_unlock(&w->wb_lock);
+        // we got preempted by a thief, unroll our changes and try again
+        atomic_fetch_add(&w->job_idx_start, -1);
+        w->steal_races++;
+    }

     if (ret < 0)
         ret = tpi_worksource_steal_job(w->ws, w->worker_idx);
+#else
+    if (w->job_idx_start < w->job_idx_end)
+        return w->job_idx_start++;
+    queue_empty(w);
+#endif

     return ret;
 }
@@ -68,6 +114,17 @@ static void *worker_thread(void *arg)
     Worker *w = arg;
     int ret;

+    if (w->cpu_idx >= 0) {
+        cpu_set_t cpumask;
+
+        CPU_ZERO(&cpumask);
+        CPU_SET(w->cpu_idx, &cpumask);
+
+        ret = sched_setaffinity(0, sizeof(cpumask), &cpumask);
+        if (ret != 0)
+            fprintf(stderr, "setaffinity(%d) failed\n", w->cpu_idx);
+    }
+
     while (1) {
         WorkBatch wb;
@@ -75,19 +132,22 @@ static void *worker_thread(void *arg)
         if (ret < 0 || wb.finish)
             break;

-        pthread_mutex_lock(&w->wb_lock);
+        w->batches++;
+
         atomic_store(&w->job_idx_start, wb.job_idx_start);
         atomic_store(&w->job_idx_end, wb.job_idx_end);
-        pthread_mutex_unlock(&w->wb_lock);
+        atomic_store(&w->queue_empty, 0);

         while ((ret = worker_get_next_job(w)) >= 0)
             wb.job_func(wb.func_arg, ret, w->worker_idx);
     }

+    fprintf(stderr, "%d %d batches %d (%2.2f) steal races\n", w->worker_idx, w->batches, w->steal_races, (double)w->batches / w->steal_races);
+
     return NULL;
 }

-int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
+int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws, int cpu_idx)
 {
     Worker *w;
     int ret;
@@ -104,9 +164,11 @@ int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
     w->worker_idx = worker_idx;
     w->ws = ws;
+    w->cpu_idx = cpu_idx;

     atomic_store(&w->job_idx_start, -1);
     atomic_store(&w->job_idx_end, -1);
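+    /* the queue starts out empty; worker_thread() clears this flag once the first batch is assigned */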
+    atomic_store(&w->queue_empty, 1);

     ret = pthread_create(&w->thread_id, NULL, worker_thread, w);
     if (ret != 0) {
diff --git a/worker.h b/worker.h
index 1fa87d8..33a760c 100644
--- a/worker.h
+++ b/worker.h
@@ -50,6 +50,8 @@ int tpi_worksource_wait_jobs(WorkSource *ws, unsigned int worker_idx,
  */
 int tpi_worksource_steal_job(WorkSource *ws, unsigned int worker_idx);

+void tpi_worksource_queue_empty(WorkSource *ws, unsigned int worker_idx);
+
 typedef struct Worker Worker;

 /**
@@ -57,7 +59,7 @@ typedef struct Worker Worker;
  *
  * @return 0 if the worker has been launched, a negative error code otherwise
  */
-int tpi_worker_spawn(Worker **w, unsigned int worker_idx, WorkSource *ws);
+int tpi_worker_spawn(Worker **w, unsigned int worker_idx, WorkSource *ws, int cpu_idx);
 void tpi_worker_destroy(Worker **w);

 /**
--
cgit v1.2.3
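
Not part of the patch: a minimal usage sketch of the public API touched above, for reference. It assumes the TPExecuteCallback typedef (in the unchanged part of threadpool.h) matches how job_func is invoked in worker.c, i.e. void (*)(void *arg, unsigned int job_idx, unsigned int thread_idx); the callback name and job count below are illustrative only.

#include <stdio.h>
#include "threadpool.h"

/* assumed callback shape: (arg, job_idx, thread_idx), matching
 * wb.job_func(wb.func_arg, ret, w->worker_idx) in worker.c */
static void square_job(void *arg, unsigned int job_idx, unsigned int thread_idx)
{
    double *out = arg;
    out[job_idx] = (double)job_idx * job_idx;
    (void)thread_idx;
}

int main(void)
{
    TPContext *tp = NULL;
    double results[256];

    /* spawn 4 workers, each pinned to a CPU taken from the process affinity mask */
    if (tp_init(&tp, 4) < 0)
        return 1;

    fprintf(stderr, "pool runs %u threads\n", tp_get_nb_threads(tp));

    /* jobs 0..255 are split into per-worker batches; workers that finish early steal from the rest */
    tp_execute(tp, 256, square_job, results);

    tp_free(&tp);
    return 0;
}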