#include #include #include #include #include #include #include "common.h" #include "options.h" #include "choices.h" #include "match.h" /* Initial size of buffer for storing input in memory */ #define INITIAL_BUFFER_CAPACITY 4096 /* Initial size of choices array */ #define INITIAL_CHOICE_CAPACITY 128 static int cmpchoice(const void *_idx1, const void *_idx2) { const struct scored_result *a = _idx1; const struct scored_result *b = _idx2; if (a->score == b->score) { /* To ensure a stable sort, we must also sort by the line * indices. */ if (a->idx < b->idx) { return -1; } else { return 1; } } else if (a->score < b->score) { return 1; } else { return -1; } } static void *safe_realloc(void *buffer, size_t size) { buffer = realloc(buffer, size); if (!buffer) { fprintf(stderr, "Error: Can't allocate memory (%zu bytes)\n", size); abort(); } return buffer; } static char *line_split(const char *line, const char *delim, const FieldSelector *fs) { int nb_fields; struct { int start; int len; } *fields = NULL; char *ret = NULL; size_t offset, ret_len; const char *p; /* split line into fields */ p = line; nb_fields = 0; while (*p) { p += strspn(p, delim); if (!*p) break; fields = safe_realloc(fields, (nb_fields + 1) * sizeof(*fields)); fields[nb_fields].start = p - line; fields[nb_fields].len = strcspn(p, delim); p += fields[nb_fields].len; nb_fields++; } /* count output length */ ret_len = offset = 0; for (size_t i = 0; i < fs->nb_ranges; i++) { size_t range_len; int start = fs->ranges[i].start; int end = fs->ranges[i].end; if (start < 0) start = MAX(nb_fields + start, 0); start = MIN(nb_fields - 1, start); if (end < 0) end = MAX(nb_fields + end, 0); end = MIN(nb_fields - 1, end); if (end < start) continue; range_len = fields[end].start + fields[end].len - fields[start].start; ret = safe_realloc(ret, ret_len + range_len + 1); strncpy(ret + offset, line + fields[start].start, range_len); offset += range_len; ret[offset] = 0; } return ret; } void choices_fread(choices_t *c, FILE *file, char input_delimiter) { /* Save current position for parsing later */ size_t buffer_start = c->buffer_size; /* Resize buffer to at least one byte more capacity than our current * size. This uses a power of two of INITIAL_BUFFER_CAPACITY. * This must work even when c->buffer is NULL and c->buffer_size is 0 */ size_t capacity = INITIAL_BUFFER_CAPACITY; while (capacity <= c->buffer_size) capacity *= 2; c->buffer = safe_realloc(c->buffer, capacity); /* Continue reading until we get a "short" read, indicating EOF */ while ((c->buffer_size += fread(c->buffer + c->buffer_size, 1, capacity - c->buffer_size, file)) == capacity) { capacity *= 2; c->buffer = safe_realloc(c->buffer, capacity); } c->buffer = safe_realloc(c->buffer, c->buffer_size + 1); c->buffer[c->buffer_size++] = '\0'; /* Truncate buffer to used size, (maybe) freeing some memory for * future allocations. */ /* Tokenize input and add to choices */ const char *line_end = c->buffer + c->buffer_size; char *line = c->buffer + buffer_start; do { char *nl = strchr(line, input_delimiter); if (nl) *nl++ = '\0'; /* Skip empty lines */ if (*line) choices_add(c, line); line = nl; } while (line && line < line_end); } static void choices_resize(choices_t *c, size_t new_capacity) { c->input_items = safe_realloc(c->input_items, new_capacity * sizeof(*c->input_items)); memset(c->input_items + c->capacity, 0, (new_capacity - c->capacity) * sizeof(*c->input_items)); c->capacity = new_capacity; } static void choices_reset_search(choices_t *c) { free(c->results); c->selection = c->available = 0; c->results = NULL; } void choices_init(choices_t *c, options_t *options) { c->input_items = NULL; c->results = NULL; c->buffer_size = 0; c->buffer = NULL; c->capacity = c->size = 0; c->delimiters = options->delimiters; c->search_fields = &options->search_fields; c->output_fields = &options->output_fields; choices_resize(c, INITIAL_CHOICE_CAPACITY); if (options->workers) { c->worker_count = options->workers; } else { c->worker_count = (int)sysconf(_SC_NPROCESSORS_ONLN); } choices_reset_search(c); } void choices_destroy(choices_t *c) { free(c->buffer); c->buffer = NULL; c->buffer_size = 0; for (size_t i = 0; i < c->size; i++) free(c->input_items[i].allocated); free(c->input_items); c->input_items = NULL; c->capacity = c->size = 0; free(c->results); c->results = NULL; c->available = c->selection = 0; } void choices_add(choices_t *c, const char *line) { InputItem *it; if (c->size == c->capacity) { choices_resize(c, c->capacity * 2); } it = &c->input_items[c->size++]; it->input_line = line; it->search_buf = it->input_line; /* extract the fields to be searched, if requested */ if (c->search_fields->nb_ranges) { it->allocated = line_split(line, c->delimiters, c->search_fields); it->search_buf = it->allocated; } /* Previous search is now invalid */ choices_reset_search(c); } size_t choices_available(choices_t *c) { return c->available; } #define BATCH_SIZE 512 struct result_list { struct scored_result *list; size_t size; }; struct search_job { pthread_mutex_t lock; choices_t *choices; const char *search; size_t processed; struct worker *workers; }; struct worker { pthread_t thread_id; struct search_job *job; unsigned int worker_num; struct result_list result; }; static void worker_get_next_batch(struct search_job *job, size_t *start, size_t *end) { pthread_mutex_lock(&job->lock); *start = job->processed; job->processed += BATCH_SIZE; if (job->processed > job->choices->size) { job->processed = job->choices->size; } *end = job->processed; pthread_mutex_unlock(&job->lock); } static struct result_list merge2(struct result_list list1, struct result_list list2) { size_t result_index = 0, index1 = 0, index2 = 0; struct result_list result; result.size = list1.size + list2.size; result.list = malloc(result.size * sizeof(struct scored_result)); if (!result.list) { fprintf(stderr, "Error: Can't allocate memory\n"); abort(); } while(index1 < list1.size && index2 < list2.size) { if (cmpchoice(&list1.list[index1], &list2.list[index2]) < 0) { result.list[result_index++] = list1.list[index1++]; } else { result.list[result_index++] = list2.list[index2++]; } } while(index1 < list1.size) { result.list[result_index++] = list1.list[index1++]; } while(index2 < list2.size) { result.list[result_index++] = list2.list[index2++]; } free(list1.list); free(list2.list); return result; } static void *choices_search_worker(void *data) { struct worker *w = (struct worker *)data; struct search_job *job = w->job; const choices_t *c = job->choices; struct result_list *result = &w->result; size_t start, end; for(;;) { worker_get_next_batch(job, &start, &end); if(start == end) { break; } for(size_t i = start; i < end; i++) { InputItem *it = &c->input_items[i]; const char *str = it->search_buf; if (has_match(job->search, str)) { result->list[result->size].idx = i; result->list[result->size].score = match(job->search, str); result->size++; } } } /* Sort the partial result */ qsort(result->list, result->size, sizeof(struct scored_result), cmpchoice); /* Fan-in, merging results */ for(unsigned int step = 0;; step++) { if (w->worker_num % (2 << step)) break; unsigned int next_worker = w->worker_num | (1 << step); if (next_worker >= c->worker_count) break; if ((errno = pthread_join(job->workers[next_worker].thread_id, NULL))) { perror("pthread_join"); exit(EXIT_FAILURE); } w->result = merge2(w->result, job->workers[next_worker].result); } return NULL; } void choices_search(choices_t *c, const char *search) { choices_reset_search(c); struct search_job *job = calloc(1, sizeof(struct search_job)); job->search = search; job->choices = c; if (pthread_mutex_init(&job->lock, NULL) != 0) { fprintf(stderr, "Error: pthread_mutex_init failed\n"); abort(); } job->workers = calloc(c->worker_count, sizeof(struct worker)); struct worker *workers = job->workers; for (int i = c->worker_count - 1; i >= 0; i--) { workers[i].job = job; workers[i].worker_num = i; workers[i].result.size = 0; workers[i].result.list = malloc(c->size * sizeof(struct scored_result)); /* FIXME: This is overkill */ /* These must be created last-to-first to avoid a race condition when fanning in */ if ((errno = pthread_create(&workers[i].thread_id, NULL, &choices_search_worker, &workers[i]))) { perror("pthread_create"); exit(EXIT_FAILURE); } } if (pthread_join(workers[0].thread_id, NULL)) { perror("pthread_join"); exit(EXIT_FAILURE); } c->results = workers[0].result.list; c->available = workers[0].result.size; free(workers); pthread_mutex_destroy(&job->lock); free(job); } const char *choices_get_search(choices_t *c, size_t n) { if (n < c->available) return c->input_items[c->results[n].idx].search_buf; return NULL; } char *choices_get_output(choices_t *c, size_t n) { const char *line; if (n >= c->available) return NULL; line = c->input_items[c->results[n].idx].input_line; if (c->output_fields->nb_ranges) return line_split(line, c->delimiters, c->output_fields); return strdup(line); } score_t choices_getscore(choices_t *c, size_t n) { return c->results[n].score; } void choices_prev(choices_t *c) { if (c->available) c->selection = (c->selection + c->available - 1) % c->available; } void choices_next(choices_t *c) { if (c->available) c->selection = (c->selection + 1) % c->available; }