diff options
Diffstat (limited to 'threadpool.c')
-rw-r--r-- | threadpool.c | 252 |
1 files changed, 206 insertions, 46 deletions
diff --git a/threadpool.c b/threadpool.c index 50d8cb7..049f406 100644 --- a/threadpool.c +++ b/threadpool.c @@ -23,39 +23,66 @@ #include <sys/time.h> #include "threadpool.h" +#include "timer.h" #include "worker.h" +#include <sched.h> #include <stdio.h> +#define MAX(x, y) ((x) < (y) ? (y) : (x)) #define MIN(x, y) ((x) < (y) ? (x) : (y)) struct TPContext { struct { Worker *w; WorkBatch wb; - int have_work; + atomic_int have_work; pthread_mutex_t work_lock; pthread_cond_t work_cond; - atomic_int work_completed; atomic_int queue_empty; + atomic_int being_robbed; + + unsigned int steal_iterations; + unsigned int steals; unsigned int rand_seed; } *workers; unsigned int nb_workers; + pthread_mutex_t init_lock; + pthread_cond_t init_cond; + atomic_int init_done; atomic_int finish; pthread_mutex_t execute_lock; + pthread_mutex_t submit_lock; + pthread_cond_t submit_cond; + pthread_mutex_t progress_lock; pthread_cond_t progress_cond; atomic_int nb_workers_finished; atomic_int nb_workers_queue_empty; + + Timer timer_execute; + Timer timer_submit; + Timer timer_all; }; +void tpi_worksource_queue_empty(TPContext *ctx, unsigned int worker_idx) +{ + atomic_store(&ctx->workers[worker_idx].queue_empty, 1); + atomic_fetch_add(&ctx->nb_workers_queue_empty, 1); +} + +void pause(void) +{ + __asm__ volatile("rep; nop" ::: "memory"); +} + void tp_free(TPContext **pctx) { TPContext *ctx = *pctx; @@ -63,21 +90,35 @@ void tp_free(TPContext **pctx) if (!ctx) return; + fprintf(stderr, "%ld executes: all %g us; submit %g us; execute %g us\n", + ctx->timer_all.nb_runs, ctx->timer_all.time_nsec / 1e3 / ctx->timer_all.nb_runs, + ctx->timer_submit.time_nsec / 1e3 / ctx->timer_submit.nb_runs, + ctx->timer_execute.time_nsec / 1e3 / ctx->timer_execute.nb_runs); + atomic_store(&ctx->finish, 1); + for (unsigned int i = 0; i < ctx->nb_workers; i++) + atomic_store(&ctx->workers[i].have_work, 1); - for (unsigned int i = 0; i < ctx->nb_workers; i++) { - pthread_mutex_lock(&ctx->workers[i].work_lock); - pthread_cond_signal(&ctx->workers[i].work_cond); - pthread_mutex_unlock(&ctx->workers[i].work_lock); + pthread_mutex_lock(&ctx->submit_lock); + pthread_cond_broadcast(&ctx->submit_cond); + pthread_mutex_unlock(&ctx->submit_lock); + for (unsigned int i = 0; i < ctx->nb_workers; i++) { + fprintf(stderr, "%d %d steals %g iterations/steal\n", i, ctx->workers[i].steals, (double)ctx->workers[i].steal_iterations / ctx->workers[i].steals); tpi_worker_destroy(&ctx->workers[i].w); - pthread_mutex_destroy(&ctx->workers[i].work_lock); - pthread_cond_destroy(&ctx->workers[i].work_cond); } free(ctx->workers); ctx->nb_workers = 0; pthread_mutex_destroy(&ctx->execute_lock); + pthread_mutex_destroy(&ctx->init_lock); + pthread_cond_destroy(&ctx->init_cond); + + pthread_mutex_destroy(&ctx->progress_lock); + pthread_cond_destroy(&ctx->progress_cond); + + pthread_mutex_destroy(&ctx->submit_lock); + pthread_cond_destroy(&ctx->submit_cond); free(ctx); *pctx = NULL; @@ -86,11 +127,24 @@ void tp_free(TPContext **pctx) int tp_init(TPContext **pctx, unsigned int nb_threads) { TPContext *ctx = NULL; + cpu_set_t cpus; + int *cpu_indices; int ret; - if (!nb_threads) { - ret = -EINVAL; - goto fail; + if (!nb_threads) + return -EINVAL; + + ret = sched_getaffinity(0, sizeof(cpus), &cpus); + if (ret != 0) + return errno; + + cpu_indices = malloc(CPU_COUNT(&cpus) * sizeof(*cpu_indices)); + if (!cpu_indices) + return -ENOMEM; + + for (int cur = 0, i = 0; i < CPU_SETSIZE; i++) { + if (CPU_ISSET(i, &cpus)) + cpu_indices[cur++] = i; } ctx = calloc(1, sizeof(*ctx)); @@ -100,7 +154,21 @@ int tp_init(TPContext **pctx, unsigned int nb_threads) } pthread_mutex_init(&ctx->execute_lock, NULL); - atomic_store(&ctx->finish, 0); + atomic_store(&ctx->finish, 0); + + pthread_mutex_init(&ctx->init_lock, NULL); + pthread_cond_init(&ctx->init_cond, NULL); + atomic_store(&ctx->init_done, 0); + + pthread_mutex_init(&ctx->progress_lock, NULL); + pthread_cond_init(&ctx->progress_cond, NULL); + + pthread_mutex_init(&ctx->submit_lock, NULL); + pthread_cond_init(&ctx->submit_cond, NULL); + + tpi_timer_init(&ctx->timer_all); + tpi_timer_init(&ctx->timer_execute); + tpi_timer_init(&ctx->timer_submit); ctx->workers = calloc(nb_threads, sizeof(*ctx->workers)); if (!ctx->workers) { @@ -108,25 +176,44 @@ int tp_init(TPContext **pctx, unsigned int nb_threads) goto fail; } + /* spawn the workers */ for (unsigned int i = 0; i < nb_threads; i++) { struct timeval tv; - pthread_mutex_init(&ctx->workers[i].work_lock, NULL); - pthread_cond_init(&ctx->workers[i].work_cond, NULL); - gettimeofday(&tv, NULL); ctx->workers[i].rand_seed = tv.tv_usec; - ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx); + ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx, cpu_indices[i % CPU_COUNT(&cpus)]); if (ret < 0) - goto fail; + goto fail_spawn; ctx->nb_workers++; } + /* signal to the workers that we are done initializing */ + pthread_mutex_lock(&ctx->init_lock); + atomic_store(&ctx->init_done, 1); + pthread_cond_broadcast(&ctx->init_cond); + pthread_mutex_unlock(&ctx->init_lock); + + /* wait until all the workers are settled */ + pthread_mutex_lock(&ctx->progress_lock); + while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) + pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock); + pthread_mutex_unlock(&ctx->progress_lock); + *pctx = ctx; + free(cpu_indices); + return 0; +fail_spawn: + pthread_mutex_lock(&ctx->init_lock); + atomic_store(&ctx->init_done, 1); + pthread_cond_broadcast(&ctx->init_cond); + pthread_mutex_unlock(&ctx->init_lock); + fail: + free(cpu_indices); tp_free(&ctx); *pctx = NULL; return ret; @@ -135,22 +222,40 @@ fail: int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx, WorkBatch *wb) { - pthread_mutex_lock(&ctx->progress_lock); - atomic_store(&ctx->workers[worker_idx].work_completed, 1); - atomic_fetch_add(&ctx->nb_workers_finished, 1); - pthread_cond_signal(&ctx->progress_cond); - pthread_mutex_unlock(&ctx->progress_lock); + /* if the context is still spawning threads, we wait until it is done */ + if (!atomic_load(&ctx->init_done)) { + pthread_mutex_lock(&ctx->init_lock); + while (!atomic_load(&ctx->init_done)) + pthread_cond_wait(&ctx->init_cond, &ctx->init_lock); + pthread_mutex_unlock(&ctx->init_lock); + } - pthread_mutex_lock(&ctx->workers[worker_idx].work_lock); + /* signal to the context that we are done */ + if (atomic_fetch_add(&ctx->nb_workers_finished, 1) + 1 == ctx->nb_workers) { + pthread_mutex_lock(&ctx->progress_lock); + pthread_cond_signal(&ctx->progress_cond); + pthread_mutex_unlock(&ctx->progress_lock); + } - while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work) - pthread_cond_wait(&ctx->workers[worker_idx].work_cond, &ctx->workers[worker_idx].work_lock); + for (int i = 0; i < (1 << 0); i++) { + if (atomic_load(&ctx->workers[worker_idx].have_work)) + break; + if (!(i & ((1 << 8) - 1))) + sched_yield(); + pause(); + } + if (!atomic_load(&ctx->workers[worker_idx].have_work)) { + pthread_mutex_lock(&ctx->submit_lock); + + while (!atomic_load(&ctx->workers[worker_idx].have_work)) + pthread_cond_wait(&ctx->submit_cond, &ctx->submit_lock); + + pthread_mutex_unlock(&ctx->submit_lock); + } memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb)); wb->finish = atomic_load(&ctx->finish); - ctx->workers[worker_idx].have_work = 0; - - pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock); + atomic_store(&ctx->workers[worker_idx].have_work, 0); return 0; } @@ -158,58 +263,113 @@ int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx, int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx) { int ret = -1; + unsigned int w_victim_idx, iterations = 0; - atomic_store(&ctx->workers[worker_idx].queue_empty, 1); - atomic_fetch_add(&ctx->nb_workers_queue_empty, 1); - + w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers; while (atomic_load(&ctx->nb_workers_queue_empty) < ctx->nb_workers) { - unsigned int w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers; + int expected = 0; + + iterations++; - if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty)) + if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty)) { + w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers; continue; + } + + if (!atomic_compare_exchange_strong(&ctx->workers[w_victim_idx].being_robbed, &expected, 1)) { + w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers; + continue; + } ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w); + atomic_store(&ctx->workers[w_victim_idx].being_robbed, 0); if (ret >= 0) break; } + ctx->workers[worker_idx].steals++; + ctx->workers[worker_idx].steal_iterations += iterations; + return ret; } +static int execute_serial(unsigned int nb_jobs, TPExecuteCallback func, + void *func_arg) +{ + for (unsigned int i = 0; i < nb_jobs; i++) + func(func_arg, i, 0); + + return 0; +} + int tp_execute(TPContext *ctx, unsigned int nb_jobs, TPExecuteCallback func, void *func_arg) { - unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers; + unsigned int max_batch_size = MAX(1, nb_jobs / ctx->nb_workers); + + if (nb_jobs == 1 || ctx->nb_workers == 1) + return execute_serial(nb_jobs, func, func_arg); + tpi_timer_start(&ctx->timer_all); pthread_mutex_lock(&ctx->execute_lock); - atomic_store(&ctx->nb_workers_finished, 0); + atomic_store(&ctx->nb_workers_finished, 0); atomic_store(&ctx->nb_workers_queue_empty, 0); + tpi_timer_start(&ctx->timer_submit); for (unsigned int i = 0; i < ctx->nb_workers; i++) { WorkBatch *wb = &ctx->workers[i].wb; + unsigned int job_start = i * max_batch_size; + unsigned int job_end = (i == ctx->nb_workers - 1) ? nb_jobs : MIN((i + 1) * max_batch_size, nb_jobs); - pthread_mutex_lock(&ctx->workers[i].work_lock); + if (job_start >= job_end) { + atomic_fetch_add(&ctx->nb_workers_finished, 1); + atomic_fetch_add(&ctx->nb_workers_queue_empty, 1); + continue; + } wb->job_func = func; wb->func_arg = func_arg; - wb->job_idx_start = i * max_batch_size; - wb->job_idx_end = MIN((i + 1) * max_batch_size, nb_jobs); - ctx->workers[i].have_work = 1; - atomic_store(&ctx->workers[i].work_completed, 0); + wb->job_idx_start = job_start; + wb->job_idx_end = job_end; + atomic_store(&ctx->workers[i].have_work, 1); atomic_store(&ctx->workers[i].queue_empty, 0); + atomic_store(&ctx->workers[i].being_robbed, 0); + + } - pthread_cond_signal(&ctx->workers[i].work_cond); + pthread_mutex_lock(&ctx->submit_lock); + pthread_cond_broadcast(&ctx->submit_cond); + pthread_mutex_unlock(&ctx->submit_lock); + tpi_timer_stop(&ctx->timer_submit); - pthread_mutex_unlock(&ctx->workers[i].work_lock); + tpi_timer_start(&ctx->timer_execute); + + for (int i = 0; i < (1 << 0); i++) { + if (atomic_load(&ctx->nb_workers_finished) >= ctx->nb_workers) + break; + if (!(i & ((1 << 8) - 1))) + sched_yield(); + pause(); } - pthread_mutex_lock(&ctx->progress_lock); - while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) - pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock); - pthread_mutex_unlock(&ctx->progress_lock); + if (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) { + pthread_mutex_lock(&ctx->progress_lock); + while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) + pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock); + pthread_mutex_unlock(&ctx->progress_lock); + } + + tpi_timer_stop(&ctx->timer_execute); pthread_mutex_unlock(&ctx->execute_lock); + tpi_timer_stop(&ctx->timer_all); + return 0; } + +unsigned int tp_get_nb_threads(TPContext *ctx) +{ + return ctx->nb_workers; +} |