Diffstat (limited to 'worker.c')
 worker.c | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 73 insertions(+), 11 deletions(-)
diff --git a/worker.c b/worker.c
index 306bc10..8686ec1 100644
--- a/worker.c
+++ b/worker.c
@@ -20,28 +20,58 @@
#include <stdatomic.h>
#include <stdlib.h>
+#include <stdio.h>
+#include <sched.h>
+
#include "worker.h"
+#define ENABLE_WORK_STEAL 1
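+/* When non-zero, a worker's job range is managed with atomics so that the
+ * tail of its batch can be stolen via tpi_worker_steal_job(); when zero,
+ * only the owning thread consumes its batch. */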
+
struct Worker {
WorkSource *ws;
unsigned int worker_idx;
+ int cpu_idx;
pthread_t thread_id;
pthread_mutex_t wb_lock;
atomic_int job_idx_start;
atomic_int job_idx_end;
+ atomic_int queue_empty;
+
+ int batches;
+ int steal_races;
};
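+/* Flag this worker's batch as drained, both locally and towards the shared
+ * WorkSource. */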
+static void queue_empty(Worker *w)
+{
+ atomic_store(&w->queue_empty, 1);
+ tpi_worksource_queue_empty(w->ws, w->worker_idx);
+}
+
int tpi_worker_steal_job(Worker *w)
{
int ret = -1;
- pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
+ int job_idx_start, job_idx_end;
- if (w->job_idx_start < w->job_idx_end)
- ret = --w->job_idx_end;
+ if (atomic_load(&w->queue_empty))
+ return -1;
+
+ //pthread_mutex_lock(&w->wb_lock);
+
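+ /* Claim the tail job by moving the end index down first, then check the
+ * owner's start index; if the batch was already exhausted, the decrement
+ * is rolled back below. */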
+ job_idx_end = atomic_fetch_add(&w->job_idx_end, -1);
+ job_idx_start = atomic_load(&w->job_idx_start);
+
+ if (job_idx_start < job_idx_end) {
+ ret = job_idx_end - 1;
+ if (job_idx_start + 1 >= job_idx_end)
+ queue_empty(w);
+ } else
+ atomic_fetch_add(&w->job_idx_end, 1);
- pthread_mutex_unlock(&w->wb_lock);
+ //pthread_mutex_unlock(&w->wb_lock);
+#endif
return ret;
}
@@ -50,15 +80,31 @@ static int worker_get_next_job(Worker *w)
{
int ret = -1;
- pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
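+ /* The owning worker consumes jobs from the head of its batch while thieves
+ * take from the tail; if the optimistic increment overshoots the end index,
+ * it is rolled back and the loop retries. */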
+ while (!atomic_load(&w->queue_empty)) {
+ int job_idx_start, job_idx_end;
- if (w->job_idx_start < w->job_idx_end)
- ret = w->job_idx_start++;
+ job_idx_start = atomic_fetch_add(&w->job_idx_start, 1);
+ job_idx_end = atomic_load(&w->job_idx_end);
+
+ if (job_idx_start < job_idx_end) {
+ if (job_idx_start + 1 >= job_idx_end)
+ queue_empty(w);
+ return job_idx_start;
+ }
- pthread_mutex_unlock(&w->wb_lock);
+ // a thief got here first; roll back our increment and try again
+ atomic_fetch_add(&w->job_idx_start, -1);
+ w->steal_races++;
+ }
if (ret < 0)
ret = tpi_worksource_steal_job(w->ws, w->worker_idx);
+#else
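+ /* Stealing compiled out: only the owning thread touches its job range, so
+ * the indices are read and advanced directly. */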
+ if (w->job_idx_start < w->job_idx_end)
+ return w->job_idx_start++;
+ queue_empty(w);
+#endif
return ret;
}
@@ -68,6 +114,17 @@ static void *worker_thread(void *arg)
Worker *w = arg;
int ret;
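+ /* Pin this thread to the requested CPU; a negative cpu_idx keeps the
+ * scheduler's default placement. */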
+ if (w->cpu_idx >= 0) {
+ cpu_set_t cpumask;
+
+ CPU_ZERO(&cpumask);
+ CPU_SET(w->cpu_idx, &cpumask);
+
+ ret = sched_setaffinity(0, sizeof(cpumask), &cpumask);
+ if (ret != 0)
+ fprintf(stderr, "setaffinity(%d) failed\n", w->cpu_idx);
+ }
+
while (1) {
WorkBatch wb;
@@ -75,19 +132,22 @@ static void *worker_thread(void *arg)
if (ret < 0 || wb.finish)
break;
- pthread_mutex_lock(&w->wb_lock);
+ w->batches++;
+
atomic_store(&w->job_idx_start, wb.job_idx_start);
atomic_store(&w->job_idx_end, wb.job_idx_end);
- pthread_mutex_unlock(&w->wb_lock);
+ atomic_store(&w->queue_empty, 0);
while ((ret = worker_get_next_job(w)) >= 0)
wb.job_func(wb.func_arg, ret, w->worker_idx);
}
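+ /* Per-worker debug statistics: number of batches processed and how often a
+ * steal race forced a retry. */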
+ fprintf(stderr, "worker %d: %d batches, %d steal races (%.2f batches per race)\n", w->worker_idx, w->batches, w->steal_races, (double)w->batches / w->steal_races);
+
return NULL;
}
-int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
+int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws, int cpu_idx)
{
Worker *w;
int ret;
@@ -104,9 +164,11 @@ int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
w->worker_idx = worker_idx;
w->ws = ws;
+ w->cpu_idx = cpu_idx;
atomic_store(&w->job_idx_start, -1);
atomic_store(&w->job_idx_end, -1);
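+ /* Start in the "empty" state so nothing is stolen before the first batch
+ * arrives. */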
+ atomic_store(&w->queue_empty, 1);
ret = pthread_create(&w->thread_id, NULL, worker_thread, w);
if (ret != 0) {