diff options
-rw-r--r-- | alot/buffers/search.py | 27 | ||||
-rw-r--r-- | alot/db/manager.py | 130 | ||||
-rw-r--r-- | alot/walker.py | 30 | ||||
-rw-r--r-- | tests/db/test_manager.py | 1 |
4 files changed, 30 insertions, 158 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py index 203ef483..d64e857f 100644 --- a/alot/buffers/search.py +++ b/alot/buffers/search.py @@ -6,7 +6,7 @@ from notmuch import NotmuchError from .buffer import Buffer from ..settings.const import settings -from ..walker import PipeWalker +from ..walker import IterableWalker from ..widgets.search import ThreadlineWidget @@ -26,7 +26,6 @@ class SearchBuffer(Buffer): self.sort_order = sort_order or default_order self.result_count = 0 self.isinitialized = False - self.proc = None # process that fills our pipe self.rebuild() Buffer.__init__(self, ui, self.body) @@ -42,22 +41,9 @@ class SearchBuffer(Buffer): info['result_count_positive'] = 's' if self.result_count > 1 else '' return info - def cleanup(self): - self.kill_filler_process() - - def kill_filler_process(self): - """ - terminates the process that fills this buffers - :class:`~alot.walker.PipeWalker`. - """ - if self.proc: - if self.proc.is_alive(): - self.proc.terminate() - def rebuild(self, reverse=False): self.isinitialized = True self.reversed = reverse - self.kill_filler_process() if reverse: order = self._REVERSE[self.sort_order] @@ -70,9 +56,8 @@ class SearchBuffer(Buffer): try: self.result_count = self.dbman.count_messages(self.querystring) - self.pipe, self.proc = self.dbman.get_threads(self.querystring, - order, - exclude_tags) + threads = self.dbman.get_threads( + self.querystring, order, exclude_tags) except NotmuchError: self.ui.notify('malformed query string: %s' % self.querystring, 'error') @@ -80,9 +65,9 @@ class SearchBuffer(Buffer): self.body = self.listbox return - self.threadlist = PipeWalker(self.pipe, ThreadlineWidget, - dbman=self.dbman, - reverse=reverse) + self.threadlist = IterableWalker(threads, ThreadlineWidget, + dbman=self.dbman, + reverse=reverse) self.listbox = urwid.ListBox(self.threadlist) self.body = self.listbox diff --git a/alot/db/manager.py b/alot/db/manager.py index 956ddcbc..af42a1f5 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -1,14 +1,10 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> +# Copyright © Dylan Baker # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file from collections import deque -import errno +import asyncio import logging -import multiprocessing -import os -import signal -import sys -import threading from notmuch import Database, NotmuchError, XapianError import notmuch @@ -23,47 +19,6 @@ from .utils import is_subdir_of from ..settings.const import settings -class FillPipeProcess(multiprocessing.Process): - - def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): - multiprocessing.Process.__init__(self) - self.it = it - self.pipe = pipe[1] - self.fun = fun - self.keep_going = True - self.stdout = stdout - self.stderr = stderr - - def handle_sigterm(self, signo, frame): - # this is used to suppress any EINTR errors at interpreter - # shutdown - self.keep_going = False - - # raises SystemExit to shut down the interpreter from the - # signal handler - sys.exit() - - def run(self): - # replace filedescriptors 1 and 2 (stdout and stderr) with - # pipes to the parent process - os.dup2(self.stdout, 1) - os.dup2(self.stderr, 2) - - # register a signal handler for SIGTERM - signal.signal(signal.SIGTERM, self.handle_sigterm) - - for a in self.it: - try: - self.pipe.send(self.fun(a)) - except IOError as e: - # suppress spurious EINTR errors at interpreter - # shutdown - if e.errno != errno.EINTR or self.keep_going: - raise - - self.pipe.close() - - class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -198,15 +153,6 @@ class DBManager: raise e logging.debug('flush finished') - def kill_search_processes(self): - """ - terminate all search processes that originate from - this managers :meth:`get_threads`. - """ - for p in self.processes: - p.terminate() - self.processes = [] - def tag(self, querystring, tags, afterwards=None, remove_rest=False): """ add tags to messages matching `querystring`. @@ -309,75 +255,6 @@ class DBManager: db = Database(path=self.path) return {k[6:]: v for k, v in db.get_configs('query.')} - def async_(self, cbl, fun): - """ - return a pair (pipe, process) so that the process writes - `fun(a)` to the pipe for each element `a` in the iterable returned - by the callable `cbl`. - - :param cbl: a function returning something iterable - :type cbl: callable - :param fun: an unary translation function - :type fun: callable - :rtype: (:class:`multiprocessing.Pipe`, - :class:`multiprocessing.Process`) - """ - # create two unix pipes to redirect the workers stdout and - # stderr - stdout = os.pipe() - stderr = os.pipe() - - # create a multiprocessing pipe for the results - pipe = multiprocessing.Pipe(False) - receiver, sender = pipe - - process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) - process.start() - self.processes.append(process) - logging.debug('Worker process %s spawned', process.pid) - - def threaded_wait(): - # wait(2) for the process to die - process.join() - - if process.exitcode < 0: - msg = 'received signal {0}'.format(-process.exitcode) - elif process.exitcode > 0: - msg = 'returned error code {0}'.format(process.exitcode) - else: - msg = 'exited successfully' - - logging.debug('Worker process %s %s', process.pid, msg) - self.processes.remove(process) - - # XXX: it would be much nicer to run this as a coroutine than a thread, - # except that this code is executed before the eventloop is started. - # - # spawn a thread to collect the worker process once it dies - # preventing it from hanging around as zombie - threading.Thread(target=threaded_wait).start() - - # TODO: avoid this if logging level > debug - def threaded_reader(prefix, fd): - with os.fdopen(fd) as handle: - for line in handle: - logging.debug('Worker process %s said on %s: %s', - process.pid, prefix, line.rstrip()) - - # spawn two threads that read from the stdout and stderr pipes - # and write anything that appears there to the log - threading.Thread(target=threaded_reader, - args=('stdout', stdout[0])).start() - os.close(stdout[1]) - threading.Thread(target=threaded_reader, - args=('stderr', stderr[0])).start() - os.close(stderr[1]) - - # closing the sending end in this (receiving) process guarantees - # that here the appropriate EOFError is raised upon .recv in the walker - sender.close() - return receiver, process - def get_threads(self, querystring, sort='newest_first', exclude_tags=None): """ asynchronously look up thread ids matching `querystring`. @@ -401,7 +278,8 @@ class DBManager: if exclude_tags: for tag in exclude_tags: q.exclude_tag(tag) - return self.async_(q.search_threads, (lambda a: a.get_thread_id())) + for t in q.search_threads(): + yield t.get_thread_id() def query(self, querystring): """ diff --git a/alot/walker.py b/alot/walker.py index e4264103..cfefc9e6 100644 --- a/alot/walker.py +++ b/alot/walker.py @@ -1,20 +1,30 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> +# Copyright © 2018 Dylan Baker # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file import logging import urwid -class PipeWalker(urwid.ListWalker): - """urwid.ListWalker that reads next items from a pipe and wraps them in - `containerclass` widgets for displaying +class IterableWalker(urwid.ListWalker): - Attributes that should be considered publicly readable: - :attr lines: the lines obtained from the pipe - :type lines: list(`containerclass`) + """An urwid walker for iterables. + + Works like ListWalker, except it takes an iterable object instead of a + concrete type. This allows for lazy operations of very large sequences of + data, such as a sequences of threads with certain notmuch tags. + + :param iterable: An iterator of objects to walk over + :type iterable: Iterable[T] + :param containerclass: An urwid widget to wrap each object in + :type containerclass: urwid.Widget + :param reverse: Reverse the order of the iterable + :type reverse: bool + :param **kwargs: Forwarded to container class. """ - def __init__(self, pipe, containerclass, reverse=False, **kwargs): - self.pipe = pipe + + def __init__(self, iterable, containerclass, reverse=False, **kwargs): + self.iterable = iterable self.kwargs = kwargs self.containerclass = containerclass self.lines = [] @@ -71,10 +81,10 @@ class PipeWalker(urwid.ListWalker): try: # the next line blocks until it can read from the pipe or # EOFError is raised. No races here. - next_obj = self.pipe.recv() + next_obj = next(self.iterable) next_widget = self.containerclass(next_obj, **self.kwargs) self.lines.append(next_widget) - except EOFError: + except StopIteration: logging.debug('EMPTY PIPE') next_widget = None self.empty = True diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py index e675aed4..f7a47a58 100644 --- a/tests/db/test_manager.py +++ b/tests/db/test_manager.py @@ -37,7 +37,6 @@ class TestDBManager(utilities.TestCaseClassCleanup): cls.manager = DBManager(cls.dbpath) # clean up temporary database - cls.addClassCleanup(cls.manager.kill_search_processes) cls.addClassCleanup(shutil.rmtree, cls.dbpath) # let global settings manager read our temporary notmuch config |