diff options
-rw-r--r-- | alot/buffers/search.py | 27 | ||||
-rw-r--r-- | alot/db/manager.py | 129 | ||||
-rw-r--r-- | alot/walker.py | 30 | ||||
-rw-r--r-- | tests/db/test_manager.py | 1 |
4 files changed, 158 insertions, 29 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py index d64e857f..203ef483 100644 --- a/alot/buffers/search.py +++ b/alot/buffers/search.py @@ -6,7 +6,7 @@ from notmuch import NotmuchError from .buffer import Buffer from ..settings.const import settings -from ..walker import IterableWalker +from ..walker import PipeWalker from ..widgets.search import ThreadlineWidget @@ -26,6 +26,7 @@ class SearchBuffer(Buffer): self.sort_order = sort_order or default_order self.result_count = 0 self.isinitialized = False + self.proc = None # process that fills our pipe self.rebuild() Buffer.__init__(self, ui, self.body) @@ -41,9 +42,22 @@ class SearchBuffer(Buffer): info['result_count_positive'] = 's' if self.result_count > 1 else '' return info + def cleanup(self): + self.kill_filler_process() + + def kill_filler_process(self): + """ + terminates the process that fills this buffers + :class:`~alot.walker.PipeWalker`. + """ + if self.proc: + if self.proc.is_alive(): + self.proc.terminate() + def rebuild(self, reverse=False): self.isinitialized = True self.reversed = reverse + self.kill_filler_process() if reverse: order = self._REVERSE[self.sort_order] @@ -56,8 +70,9 @@ class SearchBuffer(Buffer): try: self.result_count = self.dbman.count_messages(self.querystring) - threads = self.dbman.get_threads( - self.querystring, order, exclude_tags) + self.pipe, self.proc = self.dbman.get_threads(self.querystring, + order, + exclude_tags) except NotmuchError: self.ui.notify('malformed query string: %s' % self.querystring, 'error') @@ -65,9 +80,9 @@ class SearchBuffer(Buffer): self.body = self.listbox return - self.threadlist = IterableWalker(threads, ThreadlineWidget, - dbman=self.dbman, - reverse=reverse) + self.threadlist = PipeWalker(self.pipe, ThreadlineWidget, + dbman=self.dbman, + reverse=reverse) self.listbox = urwid.ListBox(self.threadlist) self.body = self.listbox diff --git a/alot/db/manager.py b/alot/db/manager.py index d95ea332..956ddcbc 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -1,9 +1,14 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> -# Copyright © Dylan Baker # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file from collections import deque +import errno import logging +import multiprocessing +import os +import signal +import sys +import threading from notmuch import Database, NotmuchError, XapianError import notmuch @@ -18,6 +23,47 @@ from .utils import is_subdir_of from ..settings.const import settings +class FillPipeProcess(multiprocessing.Process): + + def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): + multiprocessing.Process.__init__(self) + self.it = it + self.pipe = pipe[1] + self.fun = fun + self.keep_going = True + self.stdout = stdout + self.stderr = stderr + + def handle_sigterm(self, signo, frame): + # this is used to suppress any EINTR errors at interpreter + # shutdown + self.keep_going = False + + # raises SystemExit to shut down the interpreter from the + # signal handler + sys.exit() + + def run(self): + # replace filedescriptors 1 and 2 (stdout and stderr) with + # pipes to the parent process + os.dup2(self.stdout, 1) + os.dup2(self.stderr, 2) + + # register a signal handler for SIGTERM + signal.signal(signal.SIGTERM, self.handle_sigterm) + + for a in self.it: + try: + self.pipe.send(self.fun(a)) + except IOError as e: + # suppress spurious EINTR errors at interpreter + # shutdown + if e.errno != errno.EINTR or self.keep_going: + raise + + self.pipe.close() + + class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -152,6 +198,15 @@ class DBManager: raise e logging.debug('flush finished') + def kill_search_processes(self): + """ + terminate all search processes that originate from + this managers :meth:`get_threads`. + """ + for p in self.processes: + p.terminate() + self.processes = [] + def tag(self, querystring, tags, afterwards=None, remove_rest=False): """ add tags to messages matching `querystring`. @@ -254,6 +309,75 @@ class DBManager: db = Database(path=self.path) return {k[6:]: v for k, v in db.get_configs('query.')} + def async_(self, cbl, fun): + """ + return a pair (pipe, process) so that the process writes + `fun(a)` to the pipe for each element `a` in the iterable returned + by the callable `cbl`. + + :param cbl: a function returning something iterable + :type cbl: callable + :param fun: an unary translation function + :type fun: callable + :rtype: (:class:`multiprocessing.Pipe`, + :class:`multiprocessing.Process`) + """ + # create two unix pipes to redirect the workers stdout and + # stderr + stdout = os.pipe() + stderr = os.pipe() + + # create a multiprocessing pipe for the results + pipe = multiprocessing.Pipe(False) + receiver, sender = pipe + + process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) + process.start() + self.processes.append(process) + logging.debug('Worker process %s spawned', process.pid) + + def threaded_wait(): + # wait(2) for the process to die + process.join() + + if process.exitcode < 0: + msg = 'received signal {0}'.format(-process.exitcode) + elif process.exitcode > 0: + msg = 'returned error code {0}'.format(process.exitcode) + else: + msg = 'exited successfully' + + logging.debug('Worker process %s %s', process.pid, msg) + self.processes.remove(process) + + # XXX: it would be much nicer to run this as a coroutine than a thread, + # except that this code is executed before the eventloop is started. + # + # spawn a thread to collect the worker process once it dies + # preventing it from hanging around as zombie + threading.Thread(target=threaded_wait).start() + + # TODO: avoid this if logging level > debug + def threaded_reader(prefix, fd): + with os.fdopen(fd) as handle: + for line in handle: + logging.debug('Worker process %s said on %s: %s', + process.pid, prefix, line.rstrip()) + + # spawn two threads that read from the stdout and stderr pipes + # and write anything that appears there to the log + threading.Thread(target=threaded_reader, + args=('stdout', stdout[0])).start() + os.close(stdout[1]) + threading.Thread(target=threaded_reader, + args=('stderr', stderr[0])).start() + os.close(stderr[1]) + + # closing the sending end in this (receiving) process guarantees + # that here the appropriate EOFError is raised upon .recv in the walker + sender.close() + return receiver, process + def get_threads(self, querystring, sort='newest_first', exclude_tags=None): """ asynchronously look up thread ids matching `querystring`. @@ -277,8 +401,7 @@ class DBManager: if exclude_tags: for tag in exclude_tags: q.exclude_tag(tag) - for t in q.search_threads(): - yield t.get_thread_id() + return self.async_(q.search_threads, (lambda a: a.get_thread_id())) def query(self, querystring): """ diff --git a/alot/walker.py b/alot/walker.py index cfefc9e6..e4264103 100644 --- a/alot/walker.py +++ b/alot/walker.py @@ -1,30 +1,20 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> -# Copyright © 2018 Dylan Baker # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file import logging import urwid -class IterableWalker(urwid.ListWalker): +class PipeWalker(urwid.ListWalker): + """urwid.ListWalker that reads next items from a pipe and wraps them in + `containerclass` widgets for displaying - """An urwid walker for iterables. - - Works like ListWalker, except it takes an iterable object instead of a - concrete type. This allows for lazy operations of very large sequences of - data, such as a sequences of threads with certain notmuch tags. - - :param iterable: An iterator of objects to walk over - :type iterable: Iterable[T] - :param containerclass: An urwid widget to wrap each object in - :type containerclass: urwid.Widget - :param reverse: Reverse the order of the iterable - :type reverse: bool - :param **kwargs: Forwarded to container class. + Attributes that should be considered publicly readable: + :attr lines: the lines obtained from the pipe + :type lines: list(`containerclass`) """ - - def __init__(self, iterable, containerclass, reverse=False, **kwargs): - self.iterable = iterable + def __init__(self, pipe, containerclass, reverse=False, **kwargs): + self.pipe = pipe self.kwargs = kwargs self.containerclass = containerclass self.lines = [] @@ -81,10 +71,10 @@ class IterableWalker(urwid.ListWalker): try: # the next line blocks until it can read from the pipe or # EOFError is raised. No races here. - next_obj = next(self.iterable) + next_obj = self.pipe.recv() next_widget = self.containerclass(next_obj, **self.kwargs) self.lines.append(next_widget) - except StopIteration: + except EOFError: logging.debug('EMPTY PIPE') next_widget = None self.empty = True diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py index f7a47a58..e675aed4 100644 --- a/tests/db/test_manager.py +++ b/tests/db/test_manager.py @@ -37,6 +37,7 @@ class TestDBManager(utilities.TestCaseClassCleanup): cls.manager = DBManager(cls.dbpath) # clean up temporary database + cls.addClassCleanup(cls.manager.kill_search_processes) cls.addClassCleanup(shutil.rmtree, cls.dbpath) # let global settings manager read our temporary notmuch config |