summaryrefslogtreecommitdiff
path: root/src/choices.c
diff options
context:
space:
mode:
authorJohn Hawthorn <john.hawthorn@gmail.com>2017-01-14 22:10:16 -0800
committerJohn Hawthorn <john.hawthorn@gmail.com>2017-01-26 22:09:16 -0800
commit1f77253e52439a9441daa5748cb4e4f4a98bd18d (patch)
treea2a331b7abba0bd0b5a3f2a04834f367aed23a0f /src/choices.c
parent9bd9d1f3d9077b9ba7baf339fe268823c002b08f (diff)
Merge partially sorted lists in parallel
Diffstat (limited to 'src/choices.c')
-rw-r--r--src/choices.c115
1 files changed, 56 insertions, 59 deletions
diff --git a/src/choices.c b/src/choices.c
index 85403b4..6e34fd6 100644
--- a/src/choices.c
+++ b/src/choices.c
@@ -3,6 +3,7 @@
#include <string.h>
#include <pthread.h>
#include <unistd.h>
+#include <errno.h>
#include "choices.h"
#include "match.h"
@@ -150,14 +151,14 @@ struct search_job {
choices_t *choices;
const char *search;
size_t processed;
+ struct worker *workers;
};
struct worker {
pthread_t thread_id;
struct search_job *job;
- size_t worker_num;
- struct scored_result *results;
- size_t available;
+ unsigned int worker_num;
+ struct result_list result;
};
static void worker_get_next_batch(struct search_job *job, size_t *start, size_t *end) {
@@ -175,35 +176,6 @@ static void worker_get_next_batch(struct search_job *job, size_t *start, size_t
pthread_mutex_unlock(&job->lock);
}
-static void *choices_search_worker(void *data) {
- struct worker *w = (struct worker *)data;
- struct search_job *job = w->job;
- const choices_t *c = job->choices;
-
- size_t start, end;
-
- for(;;) {
- worker_get_next_batch(job, &start, &end);
-
- if(start == end) {
- break;
- }
-
- for(size_t i = start; i < end; i++) {
- if (has_match(job->search, c->strings[i])) {
- w->results[w->available].str = c->strings[i];
- w->results[w->available].score = match(job->search, c->strings[i]);
- w->available++;
- }
- }
- }
-
- /* Sort the partial result */
- qsort(w->results, w->available, sizeof(struct scored_result), cmpchoice);
-
- return w;
-}
-
static struct result_list merge2(struct result_list list1, struct result_list list2) {
size_t result_index = 0, index1 = 0, index2 = 0;
@@ -236,25 +208,51 @@ static struct result_list merge2(struct result_list list1, struct result_list li
return result;
}
-static void merge_step(struct search_job *job, struct worker *workers) {
- /* Merge our sorted partial-results */
- choices_t *c = job->choices;
+static void *choices_search_worker(void *data) {
+ struct worker *w = (struct worker *)data;
+ struct search_job *job = w->job;
+ const choices_t *c = job->choices;
+ struct result_list *result = &w->result;
- struct result_list result = {NULL, 0};
+ size_t start, end;
+
+ for(;;) {
+ worker_get_next_batch(job, &start, &end);
+
+ if(start == end) {
+ break;
+ }
+
+ for(size_t i = start; i < end; i++) {
+ if (has_match(job->search, c->strings[i])) {
+ result->list[result->size].str = c->strings[i];
+ result->list[result->size].score = match(job->search, c->strings[i]);
+ result->size++;
+ }
+ }
+ }
- for (unsigned int w = 0; w < c->worker_count; w++) {
- struct result_list new_result;
- struct worker *worker = &workers[w];
+ /* Sort the partial result */
+ qsort(result->list, result->size, sizeof(struct scored_result), cmpchoice);
+
+ /* Fan-in, merging results */
+ for(unsigned int step = 0;; step++) {
+ if (w->worker_num % (2 << step))
+ break;
- struct result_list worker_result = {worker->results, worker->available};
- new_result = merge2(result, worker_result);
+ unsigned int next_worker = w->worker_num | (1 << step);
+ if (next_worker >= c->worker_count)
+ break;
+
+ if ((errno = pthread_join(job->workers[next_worker].thread_id, NULL))) {
+ perror("pthread_join");
+ exit(EXIT_FAILURE);
+ }
- free(result.list);
- result = new_result;
+ w->result = merge2(w->result, job->workers[next_worker].result);
}
- c->results = result.list;
- c->available = result.size;
+ return NULL;
}
void choices_search(choices_t *c, const char *search) {
@@ -267,31 +265,30 @@ void choices_search(choices_t *c, const char *search) {
fprintf(stderr, "Error: pthread_mutex_init failed\n");
abort();
}
+ job->workers = calloc(c->worker_count, sizeof(struct worker));
- struct worker *workers = calloc(c->worker_count, sizeof(struct worker));
- for (unsigned int i = 0; i < c->worker_count; i++) {
+ struct worker *workers = job->workers;
+ for (int i = c->worker_count - 1; i >= 0; i--) {
workers[i].job = job;
workers[i].worker_num = i;
- workers[i].results = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */
- if (pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i])) {
+ workers[i].result.size = 0;
+ workers[i].result.list = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */
+
+ /* These must be created last-to-first to avoid a race condition when fanning in */
+ if ((errno = pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i]))) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
- for (unsigned int i = 0; i < c->worker_count; i++) {
- if (pthread_join(workers[i].thread_id, NULL)) {
- perror("pthread_join");
- exit(EXIT_FAILURE);
- }
-
+ if (pthread_join(workers[0].thread_id, NULL)) {
+ perror("pthread_join");
+ exit(EXIT_FAILURE);
}
- merge_step(job, workers);
+ c->results = workers[0].result.list;
+ c->available = workers[0].result.size;
- for (unsigned int i = 0; i < c->worker_count; i++) {
- free(workers[i].results);
- }
free(workers);
pthread_mutex_destroy(&job->lock);
free(job);