From 1f77253e52439a9441daa5748cb4e4f4a98bd18d Mon Sep 17 00:00:00 2001 From: John Hawthorn Date: Sat, 14 Jan 2017 22:10:16 -0800 Subject: Merge partially sorted lists in parallel --- src/choices.c | 115 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 56 insertions(+), 59 deletions(-) (limited to 'src') diff --git a/src/choices.c b/src/choices.c index 85403b4..6e34fd6 100644 --- a/src/choices.c +++ b/src/choices.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "choices.h" #include "match.h" @@ -150,14 +151,14 @@ struct search_job { choices_t *choices; const char *search; size_t processed; + struct worker *workers; }; struct worker { pthread_t thread_id; struct search_job *job; - size_t worker_num; - struct scored_result *results; - size_t available; + unsigned int worker_num; + struct result_list result; }; static void worker_get_next_batch(struct search_job *job, size_t *start, size_t *end) { @@ -175,35 +176,6 @@ static void worker_get_next_batch(struct search_job *job, size_t *start, size_t pthread_mutex_unlock(&job->lock); } -static void *choices_search_worker(void *data) { - struct worker *w = (struct worker *)data; - struct search_job *job = w->job; - const choices_t *c = job->choices; - - size_t start, end; - - for(;;) { - worker_get_next_batch(job, &start, &end); - - if(start == end) { - break; - } - - for(size_t i = start; i < end; i++) { - if (has_match(job->search, c->strings[i])) { - w->results[w->available].str = c->strings[i]; - w->results[w->available].score = match(job->search, c->strings[i]); - w->available++; - } - } - } - - /* Sort the partial result */ - qsort(w->results, w->available, sizeof(struct scored_result), cmpchoice); - - return w; -} - static struct result_list merge2(struct result_list list1, struct result_list list2) { size_t result_index = 0, index1 = 0, index2 = 0; @@ -236,25 +208,51 @@ static struct result_list merge2(struct result_list list1, struct result_list li return result; } -static void merge_step(struct search_job *job, struct worker *workers) { - /* Merge our sorted partial-results */ - choices_t *c = job->choices; +static void *choices_search_worker(void *data) { + struct worker *w = (struct worker *)data; + struct search_job *job = w->job; + const choices_t *c = job->choices; + struct result_list *result = &w->result; - struct result_list result = {NULL, 0}; + size_t start, end; + + for(;;) { + worker_get_next_batch(job, &start, &end); + + if(start == end) { + break; + } + + for(size_t i = start; i < end; i++) { + if (has_match(job->search, c->strings[i])) { + result->list[result->size].str = c->strings[i]; + result->list[result->size].score = match(job->search, c->strings[i]); + result->size++; + } + } + } - for (unsigned int w = 0; w < c->worker_count; w++) { - struct result_list new_result; - struct worker *worker = &workers[w]; + /* Sort the partial result */ + qsort(result->list, result->size, sizeof(struct scored_result), cmpchoice); + + /* Fan-in, merging results */ + for(unsigned int step = 0;; step++) { + if (w->worker_num % (2 << step)) + break; - struct result_list worker_result = {worker->results, worker->available}; - new_result = merge2(result, worker_result); + unsigned int next_worker = w->worker_num | (1 << step); + if (next_worker >= c->worker_count) + break; + + if ((errno = pthread_join(job->workers[next_worker].thread_id, NULL))) { + perror("pthread_join"); + exit(EXIT_FAILURE); + } - free(result.list); - result = new_result; + w->result = merge2(w->result, job->workers[next_worker].result); } - c->results = result.list; - c->available = result.size; + return NULL; } void choices_search(choices_t *c, const char *search) { @@ -267,31 +265,30 @@ void choices_search(choices_t *c, const char *search) { fprintf(stderr, "Error: pthread_mutex_init failed\n"); abort(); } + job->workers = calloc(c->worker_count, sizeof(struct worker)); - struct worker *workers = calloc(c->worker_count, sizeof(struct worker)); - for (unsigned int i = 0; i < c->worker_count; i++) { + struct worker *workers = job->workers; + for (int i = c->worker_count - 1; i >= 0; i--) { workers[i].job = job; workers[i].worker_num = i; - workers[i].results = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */ - if (pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i])) { + workers[i].result.size = 0; + workers[i].result.list = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */ + + /* These must be created last-to-first to avoid a race condition when fanning in */ + if ((errno = pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i]))) { perror("pthread_create"); exit(EXIT_FAILURE); } } - for (unsigned int i = 0; i < c->worker_count; i++) { - if (pthread_join(workers[i].thread_id, NULL)) { - perror("pthread_join"); - exit(EXIT_FAILURE); - } - + if (pthread_join(workers[0].thread_id, NULL)) { + perror("pthread_join"); + exit(EXIT_FAILURE); } - merge_step(job, workers); + c->results = workers[0].result.list; + c->available = workers[0].result.size; - for (unsigned int i = 0; i < c->worker_count; i++) { - free(workers[i].results); - } free(workers); pthread_mutex_destroy(&job->lock); free(job); -- cgit v1.2.3