diff options
Diffstat (limited to 'worker.c')
-rw-r--r-- | worker.c | 84 |
1 file changed, 73 insertions(+), 11 deletions(-)
@@ -20,28 +20,58 @@ #include <stdatomic.h> #include <stdlib.h> +#include <stdio.h> +#include <sched.h> + #include "worker.h" +#define ENABLE_WORK_STEAL 1 + struct Worker { WorkSource *ws; unsigned int worker_idx; + int cpu_idx; pthread_t thread_id; pthread_mutex_t wb_lock; atomic_int job_idx_start; atomic_int job_idx_end; + atomic_int queue_empty; + + int batches; + int steal_races; }; +static void queue_empty(Worker *w) +{ + atomic_store(&w->queue_empty, 1); + tpi_worksource_queue_empty(w->ws, w->worker_idx); +} + int tpi_worker_steal_job(Worker *w) { int ret = -1; - pthread_mutex_lock(&w->wb_lock); +#if ENABLE_WORK_STEAL + int job_idx_start, job_idx_end; - if (w->job_idx_start < w->job_idx_end) - ret = --w->job_idx_end; + if (atomic_load(&w->queue_empty)) + return -1; + + //pthread_mutex_lock(&w->wb_lock); + + job_idx_end = atomic_fetch_add(&w->job_idx_end, -1); + job_idx_start = atomic_load(&w->job_idx_start); + + if (job_idx_start < job_idx_end) { + ret = job_idx_end - 1; + if (job_idx_start + 1 >= job_idx_end) + queue_empty(w); + } else + atomic_fetch_add(&w->job_idx_end, 1); - pthread_mutex_unlock(&w->wb_lock); + //pthread_mutex_unlock(&w->wb_lock); +#endif return ret; } @@ -50,15 +80,31 @@ static int worker_get_next_job(Worker *w) { int ret = -1; - pthread_mutex_lock(&w->wb_lock); +#if ENABLE_WORK_STEAL + while (!atomic_load(&w->queue_empty)) { + int job_idx_start, job_idx_end; - if (w->job_idx_start < w->job_idx_end) - ret = w->job_idx_start++; + job_idx_start = atomic_fetch_add(&w->job_idx_start, 1); + job_idx_end = atomic_load(&w->job_idx_end); + + if (job_idx_start < job_idx_end) { + if (job_idx_start + 1 >= job_idx_end) + queue_empty(w); + return job_idx_start; + } - pthread_mutex_unlock(&w->wb_lock); + // we got preempted by a thief, unroll our changes and try again + atomic_fetch_add(&w->job_idx_start, -1); + w->steal_races++; + } if (ret < 0) ret = tpi_worksource_steal_job(w->ws, w->worker_idx); +#else + if (w->job_idx_start < w->job_idx_end) + 
return w->job_idx_start++; + queue_empty(w); +#endif return ret; } @@ -68,6 +114,17 @@ static void *worker_thread(void *arg) Worker *w = arg; int ret; + if (w->cpu_idx >= 0) { + cpu_set_t cpumask; + + CPU_ZERO(&cpumask); + CPU_SET(w->cpu_idx, &cpumask); + + ret = sched_setaffinity(0, sizeof(cpumask), &cpumask); + if (ret != 0) + fprintf(stderr, "setaffinity(%d) failed\n", w->cpu_idx); + } + while (1) { WorkBatch wb; @@ -75,19 +132,22 @@ static void *worker_thread(void *arg) if (ret < 0 || wb.finish) break; - pthread_mutex_lock(&w->wb_lock); + w->batches++; + atomic_store(&w->job_idx_start, wb.job_idx_start); atomic_store(&w->job_idx_end, wb.job_idx_end); - pthread_mutex_unlock(&w->wb_lock); + atomic_store(&w->queue_empty, 0); while ((ret = worker_get_next_job(w)) >= 0) wb.job_func(wb.func_arg, ret, w->worker_idx); } + fprintf(stderr, "%d %d batches %d (%2.2f) steal races\n", w->worker_idx, w->batches, w->steal_races, (double)w->batches / w->steal_races ); + return NULL; } -int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws) +int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws, int cpu_idx) { Worker *w; int ret; @@ -104,9 +164,11 @@ int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws) w->worker_idx = worker_idx; w->ws = ws; + w->cpu_idx = cpu_idx; atomic_store(&w->job_idx_start, -1); atomic_store(&w->job_idx_end, -1); + atomic_store(&w->queue_empty, 1); ret = pthread_create(&w->thread_id, NULL, worker_thread, w); if (ret != 0) { |