Diffstat (limited to 'threadpool.c')
 threadpool.c | 252 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 206 insertions(+), 46 deletions(-)
diff --git a/threadpool.c b/threadpool.c
index 50d8cb7..049f406 100644
--- a/threadpool.c
+++ b/threadpool.c
@@ -23,39 +23,66 @@
#include <sys/time.h>
#include "threadpool.h"
+#include "timer.h"
#include "worker.h"
+#include <sched.h>
#include <stdio.h>
+#define MAX(x, y) ((x) < (y) ? (y) : (x))
#define MIN(x, y) ((x) < (y) ? (x) : (y))
struct TPContext {
struct {
Worker *w;
WorkBatch wb;
- int have_work;
+ atomic_int have_work;
pthread_mutex_t work_lock;
pthread_cond_t work_cond;
- atomic_int work_completed;
atomic_int queue_empty;
+ atomic_int being_robbed;
+
+ unsigned int steal_iterations;
+ unsigned int steals;
unsigned int rand_seed;
} *workers;
unsigned int nb_workers;
+ pthread_mutex_t init_lock;
+ pthread_cond_t init_cond;
+ atomic_int init_done;
atomic_int finish;
pthread_mutex_t execute_lock;
+ pthread_mutex_t submit_lock;
+ pthread_cond_t submit_cond;
+
pthread_mutex_t progress_lock;
pthread_cond_t progress_cond;
atomic_int nb_workers_finished;
atomic_int nb_workers_queue_empty;
+
+ Timer timer_execute;
+ Timer timer_submit;
+ Timer timer_all;
};
+void tpi_worksource_queue_empty(TPContext *ctx, unsigned int worker_idx)
+{
+ atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
+ atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+}
+
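+/* x86 PAUSE hint ("rep; nop") plus a compiler barrier; used by the spin-wait loops below */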
+void pause(void)
+{
+ __asm__ volatile("rep; nop" ::: "memory");
+}
+
void tp_free(TPContext **pctx)
{
TPContext *ctx = *pctx;
@@ -63,21 +90,35 @@ void tp_free(TPContext **pctx)
if (!ctx)
return;
+ fprintf(stderr, "%ld executes: all %g us; submit %g us; execute %g us\n",
+ ctx->timer_all.nb_runs, ctx->timer_all.time_nsec / 1e3 / ctx->timer_all.nb_runs,
+ ctx->timer_submit.time_nsec / 1e3 / ctx->timer_submit.nb_runs,
+ ctx->timer_execute.time_nsec / 1e3 / ctx->timer_execute.nb_runs);
+
atomic_store(&ctx->finish, 1);
+ for (unsigned int i = 0; i < ctx->nb_workers; i++)
+ atomic_store(&ctx->workers[i].have_work, 1);
- for (unsigned int i = 0; i < ctx->nb_workers; i++) {
- pthread_mutex_lock(&ctx->workers[i].work_lock);
- pthread_cond_signal(&ctx->workers[i].work_cond);
- pthread_mutex_unlock(&ctx->workers[i].work_lock);
+ pthread_mutex_lock(&ctx->submit_lock);
+ pthread_cond_broadcast(&ctx->submit_cond);
+ pthread_mutex_unlock(&ctx->submit_lock);
+ for (unsigned int i = 0; i < ctx->nb_workers; i++) {
+ fprintf(stderr, "%d %d steals %g iterations/steal\n", i, ctx->workers[i].steals, (double)ctx->workers[i].steal_iterations / ctx->workers[i].steals);
tpi_worker_destroy(&ctx->workers[i].w);
- pthread_mutex_destroy(&ctx->workers[i].work_lock);
- pthread_cond_destroy(&ctx->workers[i].work_cond);
}
free(ctx->workers);
ctx->nb_workers = 0;
pthread_mutex_destroy(&ctx->execute_lock);
+ pthread_mutex_destroy(&ctx->init_lock);
+ pthread_cond_destroy(&ctx->init_cond);
+
+ pthread_mutex_destroy(&ctx->progress_lock);
+ pthread_cond_destroy(&ctx->progress_cond);
+
+ pthread_mutex_destroy(&ctx->submit_lock);
+ pthread_cond_destroy(&ctx->submit_cond);
free(ctx);
*pctx = NULL;
@@ -86,11 +127,24 @@ void tp_free(TPContext **pctx)
int tp_init(TPContext **pctx, unsigned int nb_threads)
{
TPContext *ctx = NULL;
+ cpu_set_t cpus;
+ int *cpu_indices;
int ret;
- if (!nb_threads) {
- ret = -EINVAL;
- goto fail;
+ if (!nb_threads)
+ return -EINVAL;
+
+ ret = sched_getaffinity(0, sizeof(cpus), &cpus);
+ if (ret != 0)
+ return errno;
+
+ cpu_indices = malloc(CPU_COUNT(&cpus) * sizeof(*cpu_indices));
+ if (!cpu_indices)
+ return -ENOMEM;
+
+ for (int cur = 0, i = 0; i < CPU_SETSIZE; i++) {
+ if (CPU_ISSET(i, &cpus))
+ cpu_indices[cur++] = i;
}
ctx = calloc(1, sizeof(*ctx));
@@ -100,7 +154,21 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
}
pthread_mutex_init(&ctx->execute_lock, NULL);
- atomic_store(&ctx->finish, 0);
+ atomic_store(&ctx->finish, 0);
+
+ pthread_mutex_init(&ctx->init_lock, NULL);
+ pthread_cond_init(&ctx->init_cond, NULL);
+ atomic_store(&ctx->init_done, 0);
+
+ pthread_mutex_init(&ctx->progress_lock, NULL);
+ pthread_cond_init(&ctx->progress_cond, NULL);
+
+ pthread_mutex_init(&ctx->submit_lock, NULL);
+ pthread_cond_init(&ctx->submit_cond, NULL);
+
+ tpi_timer_init(&ctx->timer_all);
+ tpi_timer_init(&ctx->timer_execute);
+ tpi_timer_init(&ctx->timer_submit);
ctx->workers = calloc(nb_threads, sizeof(*ctx->workers));
if (!ctx->workers) {
@@ -108,25 +176,44 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
goto fail;
}
+ /* spawn the workers */
for (unsigned int i = 0; i < nb_threads; i++) {
struct timeval tv;
- pthread_mutex_init(&ctx->workers[i].work_lock, NULL);
- pthread_cond_init(&ctx->workers[i].work_cond, NULL);
-
gettimeofday(&tv, NULL);
ctx->workers[i].rand_seed = tv.tv_usec;
- ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx);
+ ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx, cpu_indices[i % CPU_COUNT(&cpus)]);
if (ret < 0)
- goto fail;
+ goto fail_spawn;
ctx->nb_workers++;
}
+ /* signal to the workers that we are done initializing */
+ pthread_mutex_lock(&ctx->init_lock);
+ atomic_store(&ctx->init_done, 1);
+ pthread_cond_broadcast(&ctx->init_cond);
+ pthread_mutex_unlock(&ctx->init_lock);
+
+ /* wait until all the workers are settled */
+ pthread_mutex_lock(&ctx->progress_lock);
+ while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+ pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+ pthread_mutex_unlock(&ctx->progress_lock);
+
*pctx = ctx;
+ free(cpu_indices);
+
return 0;
+fail_spawn:
+ pthread_mutex_lock(&ctx->init_lock);
+ atomic_store(&ctx->init_done, 1);
+ pthread_cond_broadcast(&ctx->init_cond);
+ pthread_mutex_unlock(&ctx->init_lock);
+
fail:
+ free(cpu_indices);
tp_free(&ctx);
*pctx = NULL;
return ret;
@@ -135,22 +222,40 @@ fail:
int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
WorkBatch *wb)
{
- pthread_mutex_lock(&ctx->progress_lock);
- atomic_store(&ctx->workers[worker_idx].work_completed, 1);
- atomic_fetch_add(&ctx->nb_workers_finished, 1);
- pthread_cond_signal(&ctx->progress_cond);
- pthread_mutex_unlock(&ctx->progress_lock);
+ /* if the context is still spawning threads, we wait until it is done */
+ if (!atomic_load(&ctx->init_done)) {
+ pthread_mutex_lock(&ctx->init_lock);
+ while (!atomic_load(&ctx->init_done))
+ pthread_cond_wait(&ctx->init_cond, &ctx->init_lock);
+ pthread_mutex_unlock(&ctx->init_lock);
+ }
- pthread_mutex_lock(&ctx->workers[worker_idx].work_lock);
+ /* signal to the context that we are done */
+ if (atomic_fetch_add(&ctx->nb_workers_finished, 1) + 1 == ctx->nb_workers) {
+ pthread_mutex_lock(&ctx->progress_lock);
+ pthread_cond_signal(&ctx->progress_cond);
+ pthread_mutex_unlock(&ctx->progress_lock);
+ }
- while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work)
- pthread_cond_wait(&ctx->workers[worker_idx].work_cond, &ctx->workers[worker_idx].work_lock);
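+ /* spin briefly for new work before falling back to the submit condvar */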
+ for (int i = 0; i < (1 << 0); i++) {
+ if (atomic_load(&ctx->workers[worker_idx].have_work))
+ break;
+ if (!(i & ((1 << 8) - 1)))
+ sched_yield();
+ pause();
+ }
+ if (!atomic_load(&ctx->workers[worker_idx].have_work)) {
+ pthread_mutex_lock(&ctx->submit_lock);
+
+ while (!atomic_load(&ctx->workers[worker_idx].have_work))
+ pthread_cond_wait(&ctx->submit_cond, &ctx->submit_lock);
+
+ pthread_mutex_unlock(&ctx->submit_lock);
+ }
memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb));
wb->finish = atomic_load(&ctx->finish);
- ctx->workers[worker_idx].have_work = 0;
-
- pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock);
+ atomic_store(&ctx->workers[worker_idx].have_work, 0);
return 0;
}
@@ -158,58 +263,113 @@ int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx)
{
int ret = -1;
+ unsigned int w_victim_idx, iterations = 0;
- atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
- atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
-
+ w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
while (atomic_load(&ctx->nb_workers_queue_empty) < ctx->nb_workers) {
- unsigned int w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
+ int expected = 0;
+
+ iterations++;
- if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty))
+ if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty)) {
+ w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
continue;
+ }
+
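+ /* claim the victim with a CAS so only one thief steals from it at a time */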
+ if (!atomic_compare_exchange_strong(&ctx->workers[w_victim_idx].being_robbed, &expected, 1)) {
+ w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
+ continue;
+ }
ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w);
+ atomic_store(&ctx->workers[w_victim_idx].being_robbed, 0);
if (ret >= 0)
break;
}
+ ctx->workers[worker_idx].steals++;
+ ctx->workers[worker_idx].steal_iterations += iterations;
+
return ret;
}
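+/* fallback: run every job inline on the calling thread */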
+static int execute_serial(unsigned int nb_jobs, TPExecuteCallback func,
+ void *func_arg)
+{
+ for (unsigned int i = 0; i < nb_jobs; i++)
+ func(func_arg, i, 0);
+
+ return 0;
+}
+
int tp_execute(TPContext *ctx, unsigned int nb_jobs,
TPExecuteCallback func, void *func_arg)
{
- unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers;
+ unsigned int max_batch_size = MAX(1, nb_jobs / ctx->nb_workers);
+
+ if (nb_jobs == 1 || ctx->nb_workers == 1)
+ return execute_serial(nb_jobs, func, func_arg);
+ tpi_timer_start(&ctx->timer_all);
pthread_mutex_lock(&ctx->execute_lock);
- atomic_store(&ctx->nb_workers_finished, 0);
+ atomic_store(&ctx->nb_workers_finished, 0);
atomic_store(&ctx->nb_workers_queue_empty, 0);
+ tpi_timer_start(&ctx->timer_submit);
for (unsigned int i = 0; i < ctx->nb_workers; i++) {
WorkBatch *wb = &ctx->workers[i].wb;
+ unsigned int job_start = i * max_batch_size;
+ unsigned int job_end = (i == ctx->nb_workers - 1) ? nb_jobs : MIN((i + 1) * max_batch_size, nb_jobs);
- pthread_mutex_lock(&ctx->workers[i].work_lock);
+ if (job_start >= job_end) {
+ atomic_fetch_add(&ctx->nb_workers_finished, 1);
+ atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+ continue;
+ }
wb->job_func = func;
wb->func_arg = func_arg;
- wb->job_idx_start = i * max_batch_size;
- wb->job_idx_end = MIN((i + 1) * max_batch_size, nb_jobs);
- ctx->workers[i].have_work = 1;
- atomic_store(&ctx->workers[i].work_completed, 0);
+ wb->job_idx_start = job_start;
+ wb->job_idx_end = job_end;
+ atomic_store(&ctx->workers[i].have_work, 1);
atomic_store(&ctx->workers[i].queue_empty, 0);
+ atomic_store(&ctx->workers[i].being_robbed, 0);
+
+ }
- pthread_cond_signal(&ctx->workers[i].work_cond);
+ pthread_mutex_lock(&ctx->submit_lock);
+ pthread_cond_broadcast(&ctx->submit_cond);
+ pthread_mutex_unlock(&ctx->submit_lock);
+ tpi_timer_stop(&ctx->timer_submit);
- pthread_mutex_unlock(&ctx->workers[i].work_lock);
+ tpi_timer_start(&ctx->timer_execute);
+
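+ /* spin briefly waiting for completion before blocking on the progress condvar */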
+ for (int i = 0; i < (1 << 0); i++) {
+ if (atomic_load(&ctx->nb_workers_finished) >= ctx->nb_workers)
+ break;
+ if (!(i & ((1 << 8) - 1)))
+ sched_yield();
+ pause();
}
- pthread_mutex_lock(&ctx->progress_lock);
- while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
- pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
- pthread_mutex_unlock(&ctx->progress_lock);
+ if (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) {
+ pthread_mutex_lock(&ctx->progress_lock);
+ while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+ pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+ pthread_mutex_unlock(&ctx->progress_lock);
+ }
+
+ tpi_timer_stop(&ctx->timer_execute);
pthread_mutex_unlock(&ctx->execute_lock);
+ tpi_timer_stop(&ctx->timer_all);
+
return 0;
}
+
+unsigned int tp_get_nb_threads(TPContext *ctx)
+{
+ return ctx->nb_workers;
+}
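
A minimal usage sketch (not part of the commit): the callback signature below is an assumption inferred from the func(func_arg, i, 0) call in execute_serial() above, where the third argument appears to be the executing thread index; the real TPExecuteCallback typedef lives in threadpool.h.

    #include <stdio.h>
    #include "threadpool.h"

    /* hypothetical job: squares its index into a shared output array */
    static void square_job(void *arg, unsigned int job_idx, unsigned int thread_idx)
    {
        int *out = arg;
        (void)thread_idx;
        out[job_idx] = (int)(job_idx * job_idx);
    }

    int main(void)
    {
        TPContext *tp = NULL;
        int results[64] = { 0 };

        /* spawn one worker per requested thread; 0 threads is rejected with -EINVAL */
        if (tp_init(&tp, 4) < 0)
            return 1;

        /* run 64 jobs; single-job or single-worker calls take the serial path */
        tp_execute(tp, 64, square_job, results);

        printf("threads: %u results[10]=%d\n", tp_get_nb_threads(tp), results[10]);

        tp_free(&tp);
        return 0;
    }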