author     Anton Khirnov <anton@khirnov.net>   2023-06-19 15:49:24 +0200
committer  Anton Khirnov <anton@khirnov.net>   2023-06-19 15:49:24 +0200
commit     50b1708e140742e87d69c6fd07599bdb100a1517 (patch)
tree       ffa9a03dd4ba31b160911a6dc64d687444624137
parent     153529400c2267c796bfc720ec7e8bfcddd98f43 (diff)
WIP
-rw-r--r--  Makefile           5
-rw-r--r--  libthreadpool.v    2
-rw-r--r--  threadpool.c     252
-rw-r--r--  threadpool.h       5
-rw-r--r--  worker.c          84
-rw-r--r--  worker.h           4
6 files changed, 291 insertions, 61 deletions
diff --git a/Makefile b/Makefile
index d2c4b7d..4c6e883 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,13 @@
-CFLAGS = -std=c11 -D_XOPEN_SOURCE=700 -fPIC -g -O3 -Wall -fsanitize=thread
+CFLAGS = -std=c11 -D_XOPEN_SOURCE=700 -D_GNU_SOURCE -fPIC -g -O3 -Wall
LIBS=-lm
-LDFLAGS = -Wl,--version-script=libthreadpool.v -shared -pthread $(LIBS) -fsanitize=thread
+LDFLAGS = -Wl,--version-script=libthreadpool.v -shared -pthread $(LIBS)
CC = cc
TARGET = libthreadpool.so
OBJECTS = \
threadpool.o \
+ timer.o \
worker.o \
all: $(TARGET)
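
[editor's note] The new timer.o object refers to a timer.c/timer.h pair that is not included in this diff. Below is a minimal sketch of what that interface might look like, inferred purely from the call sites in threadpool.c (tpi_timer_init/start/stop, and the time_nsec and nb_runs fields read in tp_free). The clock choice, field types, and implementation are assumptions, not the actual code added by the commit.

    /* Hypothetical timer.h/timer.c sketch -- NOT part of this commit.
     * Names and fields are taken from how threadpool.c uses them;
     * everything else is guessed. */
    #include <stdint.h>
    #include <time.h>

    typedef struct Timer {
        struct timespec start;     /* timestamp of the last tpi_timer_start() */
        int64_t         time_nsec; /* accumulated time over all runs, in ns */
        long            nb_runs;   /* completed start/stop pairs (printed with %ld) */
    } Timer;

    void tpi_timer_init(Timer *t)
    {
        t->time_nsec = 0;
        t->nb_runs   = 0;
    }

    void tpi_timer_start(Timer *t)
    {
        clock_gettime(CLOCK_MONOTONIC, &t->start);
    }

    void tpi_timer_stop(Timer *t)
    {
        struct timespec end;
        clock_gettime(CLOCK_MONOTONIC, &end);
        t->time_nsec += (int64_t)(end.tv_sec - t->start.tv_sec) * 1000000000 +
                        (end.tv_nsec - t->start.tv_nsec);
        t->nb_runs++;
    }
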
diff --git a/libthreadpool.v b/libthreadpool.v
index e8fe7b5..6597847 100644
--- a/libthreadpool.v
+++ b/libthreadpool.v
@@ -1,4 +1,4 @@
-LIBTHREADPOOL_1 {
+LIBTHREADPOOL_2 {
global: tp_*;
local: *;
};
diff --git a/threadpool.c b/threadpool.c
index 50d8cb7..049f406 100644
--- a/threadpool.c
+++ b/threadpool.c
@@ -23,39 +23,66 @@
#include <sys/time.h>
#include "threadpool.h"
+#include "timer.h"
#include "worker.h"
+#include <sched.h>
#include <stdio.h>
+#define MAX(x, y) ((x) < (y) ? (y) : (x))
#define MIN(x, y) ((x) < (y) ? (x) : (y))
struct TPContext {
struct {
Worker *w;
WorkBatch wb;
- int have_work;
+ atomic_int have_work;
pthread_mutex_t work_lock;
pthread_cond_t work_cond;
- atomic_int work_completed;
atomic_int queue_empty;
+ atomic_int being_robbed;
+
+ unsigned int steal_iterations;
+ unsigned int steals;
unsigned int rand_seed;
} *workers;
unsigned int nb_workers;
+ pthread_mutex_t init_lock;
+ pthread_cond_t init_cond;
+ atomic_int init_done;
atomic_int finish;
pthread_mutex_t execute_lock;
+ pthread_mutex_t submit_lock;
+ pthread_cond_t submit_cond;
+
pthread_mutex_t progress_lock;
pthread_cond_t progress_cond;
atomic_int nb_workers_finished;
atomic_int nb_workers_queue_empty;
+
+ Timer timer_execute;
+ Timer timer_submit;
+ Timer timer_all;
};
+void tpi_worksource_queue_empty(TPContext *ctx, unsigned int worker_idx)
+{
+ atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
+ atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+}
+
+void pause(void)
+{
+ __asm__ volatile("rep; nop" ::: "memory");
+}
+
void tp_free(TPContext **pctx)
{
TPContext *ctx = *pctx;
@@ -63,21 +90,35 @@ void tp_free(TPContext **pctx)
if (!ctx)
return;
+ fprintf(stderr, "%ld executes: all %g us; submit %g us; execute %g us\n",
+ ctx->timer_all.nb_runs, ctx->timer_all.time_nsec / 1e3 / ctx->timer_all.nb_runs,
+ ctx->timer_submit.time_nsec / 1e3 / ctx->timer_submit.nb_runs,
+ ctx->timer_execute.time_nsec / 1e3 / ctx->timer_execute.nb_runs);
+
atomic_store(&ctx->finish, 1);
+ for (unsigned int i = 0; i < ctx->nb_workers; i++)
+ atomic_store(&ctx->workers[i].have_work, 1);
- for (unsigned int i = 0; i < ctx->nb_workers; i++) {
- pthread_mutex_lock(&ctx->workers[i].work_lock);
- pthread_cond_signal(&ctx->workers[i].work_cond);
- pthread_mutex_unlock(&ctx->workers[i].work_lock);
+ pthread_mutex_lock(&ctx->submit_lock);
+ pthread_cond_broadcast(&ctx->submit_cond);
+ pthread_mutex_unlock(&ctx->submit_lock);
+ for (unsigned int i = 0; i < ctx->nb_workers; i++) {
+ fprintf(stderr, "%d %d steals %g iterations/steal\n", i, ctx->workers[i].steals, (double)ctx->workers[i].steal_iterations / ctx->workers[i].steals);
tpi_worker_destroy(&ctx->workers[i].w);
- pthread_mutex_destroy(&ctx->workers[i].work_lock);
- pthread_cond_destroy(&ctx->workers[i].work_cond);
}
free(ctx->workers);
ctx->nb_workers = 0;
pthread_mutex_destroy(&ctx->execute_lock);
+ pthread_mutex_destroy(&ctx->init_lock);
+ pthread_cond_destroy(&ctx->init_cond);
+
+ pthread_mutex_destroy(&ctx->progress_lock);
+ pthread_cond_destroy(&ctx->progress_cond);
+
+ pthread_mutex_destroy(&ctx->submit_lock);
+ pthread_cond_destroy(&ctx->submit_cond);
free(ctx);
*pctx = NULL;
@@ -86,11 +127,24 @@ void tp_free(TPContext **pctx)
int tp_init(TPContext **pctx, unsigned int nb_threads)
{
TPContext *ctx = NULL;
+ cpu_set_t cpus;
+ int *cpu_indices;
int ret;
- if (!nb_threads) {
- ret = -EINVAL;
- goto fail;
+ if (!nb_threads)
+ return -EINVAL;
+
+ ret = sched_getaffinity(0, sizeof(cpus), &cpus);
+ if (ret != 0)
+ return errno;
+
+ cpu_indices = malloc(CPU_COUNT(&cpus) * sizeof(*cpu_indices));
+ if (!cpu_indices)
+ return -ENOMEM;
+
+ for (int cur = 0, i = 0; i < CPU_SETSIZE; i++) {
+ if (CPU_ISSET(i, &cpus))
+ cpu_indices[cur++] = i;
}
ctx = calloc(1, sizeof(*ctx));
@@ -100,7 +154,21 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
}
pthread_mutex_init(&ctx->execute_lock, NULL);
- atomic_store(&ctx->finish, 0);
+ atomic_store(&ctx->finish, 0);
+
+ pthread_mutex_init(&ctx->init_lock, NULL);
+ pthread_cond_init(&ctx->init_cond, NULL);
+ atomic_store(&ctx->init_done, 0);
+
+ pthread_mutex_init(&ctx->progress_lock, NULL);
+ pthread_cond_init(&ctx->progress_cond, NULL);
+
+ pthread_mutex_init(&ctx->submit_lock, NULL);
+ pthread_cond_init(&ctx->submit_cond, NULL);
+
+ tpi_timer_init(&ctx->timer_all);
+ tpi_timer_init(&ctx->timer_execute);
+ tpi_timer_init(&ctx->timer_submit);
ctx->workers = calloc(nb_threads, sizeof(*ctx->workers));
if (!ctx->workers) {
@@ -108,25 +176,44 @@ int tp_init(TPContext **pctx, unsigned int nb_threads)
goto fail;
}
+ /* spawn the workers */
for (unsigned int i = 0; i < nb_threads; i++) {
struct timeval tv;
- pthread_mutex_init(&ctx->workers[i].work_lock, NULL);
- pthread_cond_init(&ctx->workers[i].work_cond, NULL);
-
gettimeofday(&tv, NULL);
ctx->workers[i].rand_seed = tv.tv_usec;
- ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx);
+ ret = tpi_worker_spawn(&ctx->workers[i].w, i, ctx, cpu_indices[i % CPU_COUNT(&cpus)]);
if (ret < 0)
- goto fail;
+ goto fail_spawn;
ctx->nb_workers++;
}
+ /* signal to the workers that we are done initializing */
+ pthread_mutex_lock(&ctx->init_lock);
+ atomic_store(&ctx->init_done, 1);
+ pthread_cond_broadcast(&ctx->init_cond);
+ pthread_mutex_unlock(&ctx->init_lock);
+
+ /* wait until all the workers are settled */
+ pthread_mutex_lock(&ctx->progress_lock);
+ while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+ pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+ pthread_mutex_unlock(&ctx->progress_lock);
+
*pctx = ctx;
+ free(cpu_indices);
+
return 0;
+fail_spawn:
+ pthread_mutex_lock(&ctx->init_lock);
+ atomic_store(&ctx->init_done, 1);
+ pthread_cond_broadcast(&ctx->init_cond);
+ pthread_mutex_unlock(&ctx->init_lock);
+
fail:
+ free(cpu_indices);
tp_free(&ctx);
*pctx = NULL;
return ret;
@@ -135,22 +222,40 @@ fail:
int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
WorkBatch *wb)
{
- pthread_mutex_lock(&ctx->progress_lock);
- atomic_store(&ctx->workers[worker_idx].work_completed, 1);
- atomic_fetch_add(&ctx->nb_workers_finished, 1);
- pthread_cond_signal(&ctx->progress_cond);
- pthread_mutex_unlock(&ctx->progress_lock);
+ /* if the context is still spawning threads, we wait until it is done */
+ if (!atomic_load(&ctx->init_done)) {
+ pthread_mutex_lock(&ctx->init_lock);
+ while (!atomic_load(&ctx->init_done))
+ pthread_cond_wait(&ctx->init_cond, &ctx->init_lock);
+ pthread_mutex_unlock(&ctx->init_lock);
+ }
- pthread_mutex_lock(&ctx->workers[worker_idx].work_lock);
+ /* signal to the context that we are done */
+ if (atomic_fetch_add(&ctx->nb_workers_finished, 1) + 1 == ctx->nb_workers) {
+ pthread_mutex_lock(&ctx->progress_lock);
+ pthread_cond_signal(&ctx->progress_cond);
+ pthread_mutex_unlock(&ctx->progress_lock);
+ }
- while (!atomic_load(&ctx->finish) && !ctx->workers[worker_idx].have_work)
- pthread_cond_wait(&ctx->workers[worker_idx].work_cond, &ctx->workers[worker_idx].work_lock);
+ for (int i = 0; i < (1 << 0); i++) {
+ if (atomic_load(&ctx->workers[worker_idx].have_work))
+ break;
+ if (!(i & ((1 << 8) - 1)))
+ sched_yield();
+ pause();
+ }
+ if (!atomic_load(&ctx->workers[worker_idx].have_work)) {
+ pthread_mutex_lock(&ctx->submit_lock);
+
+ while (!atomic_load(&ctx->workers[worker_idx].have_work))
+ pthread_cond_wait(&ctx->submit_cond, &ctx->submit_lock);
+
+ pthread_mutex_unlock(&ctx->submit_lock);
+ }
memcpy(wb, &ctx->workers[worker_idx].wb, sizeof(*wb));
wb->finish = atomic_load(&ctx->finish);
- ctx->workers[worker_idx].have_work = 0;
-
- pthread_mutex_unlock(&ctx->workers[worker_idx].work_lock);
+ atomic_store(&ctx->workers[worker_idx].have_work, 0);
return 0;
}
@@ -158,58 +263,113 @@ int tpi_worksource_wait_jobs(TPContext *ctx, unsigned int worker_idx,
int tpi_worksource_steal_job(TPContext *ctx, unsigned int worker_idx)
{
int ret = -1;
+ unsigned int w_victim_idx, iterations = 0;
- atomic_store(&ctx->workers[worker_idx].queue_empty, 1);
- atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
-
+ w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
while (atomic_load(&ctx->nb_workers_queue_empty) < ctx->nb_workers) {
- unsigned int w_victim_idx = rand_r(&ctx->workers[worker_idx].rand_seed) % ctx->nb_workers;
+ int expected = 0;
+
+ iterations++;
- if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty))
+ if (w_victim_idx == worker_idx || atomic_load(&ctx->workers[w_victim_idx].queue_empty)) {
+ w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
continue;
+ }
+
+ if (!atomic_compare_exchange_strong(&ctx->workers[w_victim_idx].being_robbed, &expected, 1)) {
+ w_victim_idx = (w_victim_idx + 1) % ctx->nb_workers;
+ continue;
+ }
ret = tpi_worker_steal_job(ctx->workers[w_victim_idx].w);
+ atomic_store(&ctx->workers[w_victim_idx].being_robbed, 0);
if (ret >= 0)
break;
}
+ ctx->workers[worker_idx].steals++;
+ ctx->workers[worker_idx].steal_iterations += iterations;
+
return ret;
}
+static int execute_serial(unsigned int nb_jobs, TPExecuteCallback func,
+ void *func_arg)
+{
+ for (unsigned int i = 0; i < nb_jobs; i++)
+ func(func_arg, i, 0);
+
+ return 0;
+}
+
int tp_execute(TPContext *ctx, unsigned int nb_jobs,
TPExecuteCallback func, void *func_arg)
{
- unsigned int max_batch_size = (nb_jobs + ctx->nb_workers - 1) / ctx->nb_workers;
+ unsigned int max_batch_size = MAX(1, nb_jobs / ctx->nb_workers);
+
+ if (nb_jobs == 1 || ctx->nb_workers == 1)
+ return execute_serial(nb_jobs, func, func_arg);
+ tpi_timer_start(&ctx->timer_all);
pthread_mutex_lock(&ctx->execute_lock);
- atomic_store(&ctx->nb_workers_finished, 0);
+ atomic_store(&ctx->nb_workers_finished, 0);
atomic_store(&ctx->nb_workers_queue_empty, 0);
+ tpi_timer_start(&ctx->timer_submit);
for (unsigned int i = 0; i < ctx->nb_workers; i++) {
WorkBatch *wb = &ctx->workers[i].wb;
+ unsigned int job_start = i * max_batch_size;
+ unsigned int job_end = (i == ctx->nb_workers - 1) ? nb_jobs : MIN((i + 1) * max_batch_size, nb_jobs);
- pthread_mutex_lock(&ctx->workers[i].work_lock);
+ if (job_start >= job_end) {
+ atomic_fetch_add(&ctx->nb_workers_finished, 1);
+ atomic_fetch_add(&ctx->nb_workers_queue_empty, 1);
+ continue;
+ }
wb->job_func = func;
wb->func_arg = func_arg;
- wb->job_idx_start = i * max_batch_size;
- wb->job_idx_end = MIN((i + 1) * max_batch_size, nb_jobs);
- ctx->workers[i].have_work = 1;
- atomic_store(&ctx->workers[i].work_completed, 0);
+ wb->job_idx_start = job_start;
+ wb->job_idx_end = job_end;
+ atomic_store(&ctx->workers[i].have_work, 1);
atomic_store(&ctx->workers[i].queue_empty, 0);
+ atomic_store(&ctx->workers[i].being_robbed, 0);
+
+ }
- pthread_cond_signal(&ctx->workers[i].work_cond);
+ pthread_mutex_lock(&ctx->submit_lock);
+ pthread_cond_broadcast(&ctx->submit_cond);
+ pthread_mutex_unlock(&ctx->submit_lock);
+ tpi_timer_stop(&ctx->timer_submit);
- pthread_mutex_unlock(&ctx->workers[i].work_lock);
+ tpi_timer_start(&ctx->timer_execute);
+
+ for (int i = 0; i < (1 << 0); i++) {
+ if (atomic_load(&ctx->nb_workers_finished) >= ctx->nb_workers)
+ break;
+ if (!(i & ((1 << 8) - 1)))
+ sched_yield();
+ pause();
}
- pthread_mutex_lock(&ctx->progress_lock);
- while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
- pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
- pthread_mutex_unlock(&ctx->progress_lock);
+ if (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers) {
+ pthread_mutex_lock(&ctx->progress_lock);
+ while (atomic_load(&ctx->nb_workers_finished) < ctx->nb_workers)
+ pthread_cond_wait(&ctx->progress_cond, &ctx->progress_lock);
+ pthread_mutex_unlock(&ctx->progress_lock);
+ }
+
+ tpi_timer_stop(&ctx->timer_execute);
pthread_mutex_unlock(&ctx->execute_lock);
+ tpi_timer_stop(&ctx->timer_all);
+
return 0;
}
+
+unsigned int tp_get_nb_threads(TPContext *ctx)
+{
+ return ctx->nb_workers;
+}
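
[editor's note] Both tpi_worksource_wait_jobs() and tp_execute() above use the same two-phase wait: spin briefly on an atomic flag, periodically yielding the CPU and issuing the PAUSE hint (the pause() helper), then fall back to blocking on submit_cond/progress_cond. Note that in this WIP state the spin bound is (1 << 0), so the spin loop body runs at most once and the code effectively goes straight to the blocking path. The sketch below is illustrative only; SPIN_LIMIT and the function names are placeholders, not symbols from the commit.

    /* Illustrative two-phase wait, assuming an x86 target for the PAUSE hint. */
    #include <pthread.h>
    #include <sched.h>
    #include <stdatomic.h>

    static inline void cpu_relax(void)
    {
        __asm__ volatile("rep; nop" ::: "memory");   /* PAUSE instruction */
    }

    static void wait_for_flag(atomic_int *flag, pthread_mutex_t *lock,
                              pthread_cond_t *cond)
    {
        enum { SPIN_LIMIT = 1 << 14 };   /* placeholder bound */

        /* phase 1: bounded busy-wait, cheap when the flag flips quickly */
        for (int i = 0; i < SPIN_LIMIT; i++) {
            if (atomic_load(flag))
                return;
            if (!(i & ((1 << 8) - 1)))   /* every 256 iterations, give up the CPU */
                sched_yield();
            cpu_relax();
        }

        /* phase 2: block on the condition variable until a submitter signals */
        pthread_mutex_lock(lock);
        while (!atomic_load(flag))
            pthread_cond_wait(cond, lock);
        pthread_mutex_unlock(lock);
    }
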
diff --git a/threadpool.h b/threadpool.h
index 17bf3f2..0f120ab 100644
--- a/threadpool.h
+++ b/threadpool.h
@@ -63,4 +63,9 @@ void tp_free(TPContext **ctx);
int tp_execute(TPContext *ctx, unsigned int nb_jobs,
TPExecuteCallback func, void *func_arg);
+/**
+ * Get the number of threads in this thread pool.
+ */
+unsigned int tp_get_nb_threads(TPContext *ctx);
+
#endif /* TP_THREADPOOL_H */
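
[editor's note] A minimal caller sketch for the public API, including the newly exported tp_get_nb_threads(). The exact TPExecuteCallback signature is not visible in this diff; the (arg, job_idx, thread_idx) form below is inferred from how the callback is invoked in threadpool.c and worker.c, so treat it as an assumption.

    /* Hypothetical usage example -- not taken from the repository. */
    #include <stdio.h>
    #include "threadpool.h"

    /* assumed callback shape: one invocation per job index */
    static void square_job(void *arg, unsigned int job_idx, unsigned int thread_idx)
    {
        int *out = arg;
        out[job_idx] = (int)(job_idx * job_idx);
        (void)thread_idx;
    }

    int main(void)
    {
        TPContext *tp = NULL;
        int results[256] = { 0 };

        if (tp_init(&tp, 4) != 0)            /* pool with 4 worker threads */
            return 1;

        printf("pool has %u threads\n", tp_get_nb_threads(tp));

        tp_execute(tp, 256, square_job, results);   /* 256 jobs across the pool */

        tp_free(&tp);
        return 0;
    }
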
diff --git a/worker.c b/worker.c
index 306bc10..8686ec1 100644
--- a/worker.c
+++ b/worker.c
@@ -20,28 +20,58 @@
#include <stdatomic.h>
#include <stdlib.h>
+#include <stdio.h>
+#include <sched.h>
+
#include "worker.h"
+#define ENABLE_WORK_STEAL 1
+
struct Worker {
WorkSource *ws;
unsigned int worker_idx;
+ int cpu_idx;
pthread_t thread_id;
pthread_mutex_t wb_lock;
atomic_int job_idx_start;
atomic_int job_idx_end;
+ atomic_int queue_empty;
+
+ int batches;
+ int steal_races;
};
+static void queue_empty(Worker *w)
+{
+ atomic_store(&w->queue_empty, 1);
+ tpi_worksource_queue_empty(w->ws, w->worker_idx);
+}
+
int tpi_worker_steal_job(Worker *w)
{
int ret = -1;
- pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
+ int job_idx_start, job_idx_end;
- if (w->job_idx_start < w->job_idx_end)
- ret = --w->job_idx_end;
+ if (atomic_load(&w->queue_empty))
+ return -1;
+
+ //pthread_mutex_lock(&w->wb_lock);
+
+ job_idx_end = atomic_fetch_add(&w->job_idx_end, -1);
+ job_idx_start = atomic_load(&w->job_idx_start);
+
+ if (job_idx_start < job_idx_end) {
+ ret = job_idx_end - 1;
+ if (job_idx_start + 1 >= job_idx_end)
+ queue_empty(w);
+ } else
+ atomic_fetch_add(&w->job_idx_end, 1);
- pthread_mutex_unlock(&w->wb_lock);
+ //pthread_mutex_unlock(&w->wb_lock);
+#endif
return ret;
}
@@ -50,15 +80,31 @@ static int worker_get_next_job(Worker *w)
{
int ret = -1;
- pthread_mutex_lock(&w->wb_lock);
+#if ENABLE_WORK_STEAL
+ while (!atomic_load(&w->queue_empty)) {
+ int job_idx_start, job_idx_end;
- if (w->job_idx_start < w->job_idx_end)
- ret = w->job_idx_start++;
+ job_idx_start = atomic_fetch_add(&w->job_idx_start, 1);
+ job_idx_end = atomic_load(&w->job_idx_end);
+
+ if (job_idx_start < job_idx_end) {
+ if (job_idx_start + 1 >= job_idx_end)
+ queue_empty(w);
+ return job_idx_start;
+ }
- pthread_mutex_unlock(&w->wb_lock);
+ // we got preempted by a thief, unroll our changes and try again
+ atomic_fetch_add(&w->job_idx_start, -1);
+ w->steal_races++;
+ }
if (ret < 0)
ret = tpi_worksource_steal_job(w->ws, w->worker_idx);
+#else
+ if (w->job_idx_start < w->job_idx_end)
+ return w->job_idx_start++;
+ queue_empty(w);
+#endif
return ret;
}
@@ -68,6 +114,17 @@ static void *worker_thread(void *arg)
Worker *w = arg;
int ret;
+ if (w->cpu_idx >= 0) {
+ cpu_set_t cpumask;
+
+ CPU_ZERO(&cpumask);
+ CPU_SET(w->cpu_idx, &cpumask);
+
+ ret = sched_setaffinity(0, sizeof(cpumask), &cpumask);
+ if (ret != 0)
+ fprintf(stderr, "setaffinity(%d) failed\n", w->cpu_idx);
+ }
+
while (1) {
WorkBatch wb;
@@ -75,19 +132,22 @@ static void *worker_thread(void *arg)
if (ret < 0 || wb.finish)
break;
- pthread_mutex_lock(&w->wb_lock);
+ w->batches++;
+
atomic_store(&w->job_idx_start, wb.job_idx_start);
atomic_store(&w->job_idx_end, wb.job_idx_end);
- pthread_mutex_unlock(&w->wb_lock);
+ atomic_store(&w->queue_empty, 0);
while ((ret = worker_get_next_job(w)) >= 0)
wb.job_func(wb.func_arg, ret, w->worker_idx);
}
+ fprintf(stderr, "%d %d batches %d (%2.2f) steal races\n", w->worker_idx, w->batches, w->steal_races, (double)w->batches / w->steal_races );
+
return NULL;
}
-int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
+int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws, int cpu_idx)
{
Worker *w;
int ret;
@@ -104,9 +164,11 @@ int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws)
w->worker_idx = worker_idx;
w->ws = ws;
+ w->cpu_idx = cpu_idx;
atomic_store(&w->job_idx_start, -1);
atomic_store(&w->job_idx_end, -1);
+ atomic_store(&w->queue_empty, 1);
ret = pthread_create(&w->thread_id, NULL, worker_thread, w);
if (ret != 0) {
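
[editor's note] The hunks above replace the wb_lock critical sections with a lock-free job range shared between the owning worker and at most one thief at a time (thieves are serialized by the being_robbed compare-and-swap in threadpool.c). Below is a condensed illustration of that protocol with placeholder names and without the queue_empty bookkeeping; it mirrors the WIP approach rather than a hardened work-stealing deque.

    /* Illustrative sketch of the owner/thief counters -- not code from the commit. */
    #include <stdatomic.h>

    typedef struct JobRange {
        atomic_int start;   /* next job for the owning worker */
        atomic_int end;     /* one past the last job; thieves decrement this */
    } JobRange;

    /* owner side: take the next job from the front, or -1 when exhausted */
    static int range_pop_front(JobRange *r)
    {
        int start = atomic_fetch_add(&r->start, 1);
        if (start < atomic_load(&r->end))
            return start;
        atomic_fetch_add(&r->start, -1);   /* range was empty, undo the claim */
        return -1;
    }

    /* thief side: take one job from the back, or -1 if nothing to steal */
    static int range_pop_back(JobRange *r)
    {
        int end = atomic_fetch_add(&r->end, -1);
        if (atomic_load(&r->start) < end)
            return end - 1;
        atomic_fetch_add(&r->end, 1);      /* nothing left, undo the claim */
        return -1;
    }
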
diff --git a/worker.h b/worker.h
index 1fa87d8..33a760c 100644
--- a/worker.h
+++ b/worker.h
@@ -50,6 +50,8 @@ int tpi_worksource_wait_jobs(WorkSource *ws, unsigned int worker_idx,
*/
int tpi_worksource_steal_job(WorkSource *ws, unsigned int worker_idx);
+void tpi_worksource_queue_empty(WorkSource *ws, unsigned int worker_idx);
+
typedef struct Worker Worker;
/**
@@ -57,7 +59,7 @@ typedef struct Worker Worker;
*
* @return 0 if the worker has been launched, a negative error code otherwise
*/
-int tpi_worker_spawn(Worker **w, unsigned int worker_idx, WorkSource *ws);
+int tpi_worker_spawn(Worker **w, unsigned int worker_idx, WorkSource *ws, int cpu_idx);
void tpi_worker_destroy(Worker **w);
/**