diff options
-rw-r--r-- | alot/buffers/search.py | 46 | ||||
-rw-r--r-- | alot/db/manager.py | 130 | ||||
-rw-r--r-- | tests/db/test_manager.py | 1 |
3 files changed, 17 insertions, 160 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py index c5b6bbcd..8cd76a87 100644 --- a/alot/buffers/search.py +++ b/alot/buffers/search.py @@ -8,16 +8,16 @@ from .buffer import Buffer from ..settings.const import settings from ..widgets.search import ThreadlineWidget -class PipeWalker(urwid.ListWalker): +class IterWalker(urwid.ListWalker): """ - urwid.ListWalker that reads next items from a pipe and wraps them in + urwid.ListWalker that reads next items from a generator and wraps them in ThreadlineWidget widgets for displaying """ _pipe = None _dbman = None - _pipe_eof = False + _iter_done = False # list of the thread IDs _tids = None @@ -26,9 +26,9 @@ class PipeWalker(urwid.ListWalker): _focus = None - def __init__(self, pipe, dbman): - self._pipe = pipe - self._dbman = dbman + def __init__(self, threads, dbman): + self._threads = threads + self._dbman = dbman self._tids = [] self._wgts = {} @@ -38,7 +38,7 @@ class PipeWalker(urwid.ListWalker): super().__init__() def __len__(self): - while not self._pipe_eof: + while not self._iter_done: self._get_next_item() return len(self._tids) @@ -58,7 +58,7 @@ class PipeWalker(urwid.ListWalker): if pos < 0: raise IndexError - while not self._pipe_eof and pos >= len(self._tids): + while not self._iter_done and pos >= len(self._tids): self._get_next_item() if pos >= len(self._tids): @@ -80,13 +80,13 @@ class PipeWalker(urwid.ListWalker): self._modified() def _get_next_item(self): - if self._pipe_eof: + if self._iter_done: return None try: - self._tids.append(self._pipe.recv()) - except EOFError: - self._pipe_eof = True + self._tids.append(next(self._threads)) + except StopIteration: + self._iter_done = True class SearchBuffer(Buffer): """shows a result list of threads for a query""" @@ -102,7 +102,6 @@ class SearchBuffer(Buffer): self.querystring = initialquery default_order = settings.get('search_threads_sort_order') self.sort_order = sort_order or default_order - self.proc = None # process that fills our pipe self.rebuild() super().__init__() @@ -132,21 +131,7 @@ class SearchBuffer(Buffer): info['result_count_positive'] = 's' if info['result_count'] > 1 else '' return info - def cleanup(self): - self.kill_filler_process() - - def kill_filler_process(self): - """ - terminates the process that fills this buffers - :class:`~alot.walker.PipeWalker`. - """ - if self.proc: - if self.proc.is_alive(): - self.proc.terminate() - def rebuild(self): - self.kill_filler_process() - exclude_tags = settings.get_notmuch_setting('search', 'exclude_tags') if exclude_tags: exclude_tags = frozenset([t for t in exclude_tags.split(';') if t]) @@ -157,9 +142,8 @@ class SearchBuffer(Buffer): self._thread_count_val = None try: - self.pipe, self.proc = self.dbman.get_threads(self.querystring, - self.sort_order, - exclude_tags) + threads = self.dbman.get_threads(self.querystring, self.sort_order, + exclude_tags) except NotmuchError: self.ui.notify('malformed query string: %s' % self.querystring, 'error') @@ -167,7 +151,7 @@ class SearchBuffer(Buffer): self.body = self.listbox return - self.threadlist = PipeWalker(self.pipe, self.dbman) + self.threadlist = IterWalker(threads, self.dbman) self.listbox = urwid.ListBox(self.threadlist) self.body = self.listbox diff --git a/alot/db/manager.py b/alot/db/manager.py index cf452c42..c902eb54 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -2,13 +2,8 @@ # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file from collections import deque -import errno import logging -import multiprocessing import os -import signal -import sys -import threading from notmuch2 import Database, NotmuchError, XapianError @@ -28,47 +23,6 @@ def _is_subdir_of(subpath, superpath): # e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b return os.path.commonprefix([subpath, superpath]) == superpath -class FillPipeProcess(multiprocessing.Process): - - def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): - multiprocessing.Process.__init__(self) - self.it = it - self.pipe = pipe[1] - self.fun = fun - self.keep_going = True - self.stdout = stdout - self.stderr = stderr - - def handle_sigterm(self, signo, frame): - # this is used to suppress any EINTR errors at interpreter - # shutdown - self.keep_going = False - - # raises SystemExit to shut down the interpreter from the - # signal handler - sys.exit() - - def run(self): - # replace filedescriptors 1 and 2 (stdout and stderr) with - # pipes to the parent process - os.dup2(self.stdout, 1) - os.dup2(self.stderr, 2) - - # register a signal handler for SIGTERM - signal.signal(signal.SIGTERM, self.handle_sigterm) - - for a in self.it: - try: - self.pipe.send(self.fun(a)) - except IOError as e: - # suppress spurious EINTR errors at interpreter - # shutdown - if e.errno != errno.EINTR or self.keep_going: - raise - - self.pipe.close() - - class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -188,15 +142,6 @@ class DBManager: raise e logging.debug('flush finished') - def kill_search_processes(self): - """ - terminate all search processes that originate from - this managers :meth:`get_threads`. - """ - for p in self.processes: - p.terminate() - self.processes = [] - def tag(self, querystring, tags, afterwards=None, remove_rest=False): """ add tags to messages matching `querystring`. @@ -287,73 +232,6 @@ class DBManager: queries = filter(lambda k: k.startswith(q_prefix), db.config) return { q[len(q_prefix):] : db.config[q] for q in queries } - def async_(self, seq, fun): - """ - return a pair (pipe, process) so that the process writes - `fun(a)` to the pipe for each element `a` in the iterable returned - by the callable `seq`. - - :param fun: an unary translation function - :type fun: callable - :rtype: (:class:`multiprocessing.Pipe`, - :class:`multiprocessing.Process`) - """ - # create two unix pipes to redirect the workers stdout and - # stderr - stdout = os.pipe() - stderr = os.pipe() - - # create a multiprocessing pipe for the results - pipe = multiprocessing.Pipe(False) - receiver, sender = pipe - - process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun) - process.start() - self.processes.append(process) - logging.debug('Worker process %s spawned', process.pid) - - def threaded_wait(): - # wait(2) for the process to die - process.join() - - if process.exitcode < 0: - msg = 'received signal {0}'.format(-process.exitcode) - elif process.exitcode > 0: - msg = 'returned error code {0}'.format(process.exitcode) - else: - msg = 'exited successfully' - - logging.debug('Worker process %s %s', process.pid, msg) - self.processes.remove(process) - - # XXX: it would be much nicer to run this as a coroutine than a thread, - # except that this code is executed before the eventloop is started. - # - # spawn a thread to collect the worker process once it dies - # preventing it from hanging around as zombie - threading.Thread(target=threaded_wait).start() - - # TODO: avoid this if logging level > debug - def threaded_reader(prefix, fd): - with os.fdopen(fd) as handle: - for line in handle: - logging.debug('Worker process %s said on %s: %s', - process.pid, prefix, line.rstrip()) - - # spawn two threads that read from the stdout and stderr pipes - # and write anything that appears there to the log - threading.Thread(target=threaded_reader, - args=('stdout', stdout[0])).start() - os.close(stdout[1]) - threading.Thread(target=threaded_reader, - args=('stderr', stderr[0])).start() - os.close(stderr[1]) - - # closing the sending end in this (receiving) process guarantees - # that here the appropriate EOFError is raised upon .recv in the walker - sender.close() - return receiver, process - def get_threads(self, querystring, sort='newest_first', exclude_tags = frozenset()): """ asynchronously look up thread ids matching `querystring`. @@ -366,10 +244,7 @@ class DBManager: :param exclude_tags: Tags to exclude by default unless included in the search :type exclude_tags: set of str - :returns: a pipe together with the process that asynchronously - writes to it. - :rtype: (:class:`multiprocessing.Pipe`, - :class:`multiprocessing.Process`) + :returns: iterator over thread ids """ # TODO: use a symbolic constant for this assert sort in self._sort_orders @@ -378,8 +253,7 @@ class DBManager: sort = self._sort_orders[sort] exclude_tags = self._exclude_tags | exclude_tags - return self.async_(db.threads(querystring, sort = sort, exclude_tags = exclude_tags), - lambda t: t.threadid) + return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags)) def add_message(self, path, tags): """ diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py index 0f3a64ac..dbef6c6c 100644 --- a/tests/db/test_manager.py +++ b/tests/db/test_manager.py @@ -71,7 +71,6 @@ class TestDBManager(utilities.TestCaseClassCleanup): cls.manager = manager.DBManager(cls.dbpath) # clean up temporary database - cls.addClassCleanup(cls.manager.kill_search_processes) cls.addClassCleanup(shutil.rmtree, cls.dbpath) # let global settings manager read our temporary notmuch config |