/* * Copyright 2018 Anton Khirnov * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include "worker.h" struct Worker { WorkSource *ws; unsigned int worker_idx; pthread_t thread_id; pthread_mutex_t wb_lock; atomic_int job_idx_start; atomic_int job_idx_end; }; int tpi_worker_steal_job(Worker *w) { int ret = -1; pthread_mutex_lock(&w->wb_lock); if (w->job_idx_start < w->job_idx_end) ret = --w->job_idx_end; pthread_mutex_unlock(&w->wb_lock); return ret; } static int worker_get_next_job(Worker *w) { int ret = -1; pthread_mutex_lock(&w->wb_lock); if (w->job_idx_start < w->job_idx_end) ret = w->job_idx_start++; pthread_mutex_unlock(&w->wb_lock); if (ret < 0) ret = tpi_worksource_steal_job(w->ws, w->worker_idx); return ret; } static void *worker_thread(void *arg) { Worker *w = arg; int ret; while (1) { WorkBatch wb; ret = tpi_worksource_wait_jobs(w->ws, w->worker_idx, &wb); if (ret < 0 || wb.finish) break; pthread_mutex_lock(&w->wb_lock); atomic_store(&w->job_idx_start, wb.job_idx_start); atomic_store(&w->job_idx_end, wb.job_idx_end); pthread_mutex_unlock(&w->wb_lock); while ((ret = worker_get_next_job(w)) >= 0) wb.job_func(wb.func_arg, ret, w->worker_idx); } return NULL; } int tpi_worker_spawn(Worker **pw, unsigned int worker_idx, WorkSource *ws) { Worker *w; int ret; w = calloc(1, sizeof(*w)); if (!w) return -ENOMEM; ret = pthread_mutex_init(&w->wb_lock, NULL); if (ret != 0) { ret = -ret; goto fail_free; } w->worker_idx = worker_idx; w->ws = ws; atomic_store(&w->job_idx_start, -1); atomic_store(&w->job_idx_end, -1); ret = pthread_create(&w->thread_id, NULL, worker_thread, w); if (ret != 0) { ret = -ret; goto fail_free; } *pw = w; return 0; fail_free: free(w); return ret; } void tpi_worker_destroy(Worker **pw) { Worker *w = *pw; if (!w) return; pthread_join(w->thread_id, NULL); pthread_mutex_destroy(&w->wb_lock); free(w); *pw = NULL; }