author    Anton Khirnov <anton@khirnov.net>    2019-12-21 09:29:49 +0100
committer Anton Khirnov <anton@khirnov.net>    2020-01-02 17:18:56 +0100
commit    09d7db6dc591fbe44058b0e01acae331c52f0b28 (patch)
tree      c1ec33174778ae0e593453d510a030fe2b469f5d
parent    f1ceccaa58bb36cac73e6886a0b14230e5518fda (diff)
Revert "db/manager: Drop async method"
This reverts commit e7e0c52db9093a9ecd9dcaa0766e66515a546a75.
-rw-r--r--  alot/buffers/search.py     27
-rw-r--r--  alot/db/manager.py        129
-rw-r--r--  alot/walker.py             30
-rw-r--r--  tests/db/test_manager.py    1
4 files changed, 158 insertions, 29 deletions
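
The patch below reinstates the multiprocessing-based search path: a FillPipeProcess worker streams results through a multiprocessing.Pipe while the UI consumes them lazily via PipeWalker. As a refresher on the underlying pattern, here is a standalone sketch, not code from this patch; the produce() helper and the sample data are illustrative:

import multiprocessing


def produce(sender, items):
    # worker process: push each item into the pipe, then close our end so
    # that the reader eventually gets EOFError instead of blocking forever
    for item in items:
        sender.send(item)
    sender.close()


if __name__ == '__main__':
    receiver, sender = multiprocessing.Pipe(False)   # one-way pipe
    worker = multiprocessing.Process(target=produce,
                                     args=(sender, ['id1', 'id2', 'id3']))
    worker.start()
    sender.close()   # the parent must drop its copy, or recv() never sees EOF
    while True:
        try:
            print(receiver.recv())   # blocks until data arrives or EOF
        except EOFError:
            break
    worker.join()
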
diff --git a/alot/buffers/search.py b/alot/buffers/search.py
index d64e857f..203ef483 100644
--- a/alot/buffers/search.py
+++ b/alot/buffers/search.py
@@ -6,7 +6,7 @@ from notmuch import NotmuchError
from .buffer import Buffer
from ..settings.const import settings
-from ..walker import IterableWalker
+from ..walker import PipeWalker
from ..widgets.search import ThreadlineWidget
@@ -26,6 +26,7 @@ class SearchBuffer(Buffer):
self.sort_order = sort_order or default_order
self.result_count = 0
self.isinitialized = False
+ self.proc = None # process that fills our pipe
self.rebuild()
Buffer.__init__(self, ui, self.body)
@@ -41,9 +42,22 @@ class SearchBuffer(Buffer):
info['result_count_positive'] = 's' if self.result_count > 1 else ''
return info
+ def cleanup(self):
+ self.kill_filler_process()
+
+ def kill_filler_process(self):
+ """
+ terminates the process that fills this buffer's
+ :class:`~alot.walker.PipeWalker`.
+ """
+ if self.proc:
+ if self.proc.is_alive():
+ self.proc.terminate()
+
def rebuild(self, reverse=False):
self.isinitialized = True
self.reversed = reverse
+ self.kill_filler_process()
if reverse:
order = self._REVERSE[self.sort_order]
@@ -56,8 +70,9 @@ class SearchBuffer(Buffer):
try:
self.result_count = self.dbman.count_messages(self.querystring)
- threads = self.dbman.get_threads(
- self.querystring, order, exclude_tags)
+ self.pipe, self.proc = self.dbman.get_threads(self.querystring,
+ order,
+ exclude_tags)
except NotmuchError:
self.ui.notify('malformed query string: %s' % self.querystring,
'error')
@@ -65,9 +80,9 @@ class SearchBuffer(Buffer):
self.body = self.listbox
return
- self.threadlist = IterableWalker(threads, ThreadlineWidget,
- dbman=self.dbman,
- reverse=reverse)
+ self.threadlist = PipeWalker(self.pipe, ThreadlineWidget,
+ dbman=self.dbman,
+ reverse=reverse)
self.listbox = urwid.ListBox(self.threadlist)
self.body = self.listbox
diff --git a/alot/db/manager.py b/alot/db/manager.py
index d95ea332..956ddcbc 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,9 +1,14 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
-# Copyright © Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
+import errno
import logging
+import multiprocessing
+import os
+import signal
+import sys
+import threading
from notmuch import Database, NotmuchError, XapianError
import notmuch
@@ -18,6 +23,47 @@ from .utils import is_subdir_of
from ..settings.const import settings
+class FillPipeProcess(multiprocessing.Process):
+
+ def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
+ multiprocessing.Process.__init__(self)
+ self.it = it
+ self.pipe = pipe[1]
+ self.fun = fun
+ self.keep_going = True
+ self.stdout = stdout
+ self.stderr = stderr
+
+ def handle_sigterm(self, signo, frame):
+ # this is used to suppress any EINTR errors at interpreter
+ # shutdown
+ self.keep_going = False
+
+ # raises SystemExit to shut down the interpreter from the
+ # signal handler
+ sys.exit()
+
+ def run(self):
+ # replace filedescriptors 1 and 2 (stdout and stderr) with
+ # pipes to the parent process
+ os.dup2(self.stdout, 1)
+ os.dup2(self.stderr, 2)
+
+ # register a signal handler for SIGTERM
+ signal.signal(signal.SIGTERM, self.handle_sigterm)
+
+ for a in self.it:
+ try:
+ self.pipe.send(self.fun(a))
+ except IOError as e:
+ # suppress spurious EINTR errors at interpreter
+ # shutdown
+ if e.errno != errno.EINTR or self.keep_going:
+ raise
+
+ self.pipe.close()
+
+
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -152,6 +198,15 @@ class DBManager:
raise e
logging.debug('flush finished')
+ def kill_search_processes(self):
+ """
+ terminate all search processes that originate from
+ this manager's :meth:`get_threads`.
+ """
+ for p in self.processes:
+ p.terminate()
+ self.processes = []
+
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -254,6 +309,75 @@ class DBManager:
db = Database(path=self.path)
return {k[6:]: v for k, v in db.get_configs('query.')}
+ def async_(self, cbl, fun):
+ """
+ return a pair (pipe, process) so that the process writes
+ `fun(a)` to the pipe for each element `a` in the iterable returned
+ by the callable `cbl`.
+
+ :param cbl: a function returning something iterable
+ :type cbl: callable
+ :param fun: a unary translation function
+ :type fun: callable
+ :rtype: (:class:`multiprocessing.Pipe`,
+ :class:`multiprocessing.Process`)
+ """
+ # create two unix pipes to redirect the worker's stdout and
+ # stderr
+ stdout = os.pipe()
+ stderr = os.pipe()
+
+ # create a multiprocessing pipe for the results
+ pipe = multiprocessing.Pipe(False)
+ receiver, sender = pipe
+
+ process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
+ process.start()
+ self.processes.append(process)
+ logging.debug('Worker process %s spawned', process.pid)
+
+ def threaded_wait():
+ # wait(2) for the process to die
+ process.join()
+
+ if process.exitcode < 0:
+ msg = 'received signal {0}'.format(-process.exitcode)
+ elif process.exitcode > 0:
+ msg = 'returned error code {0}'.format(process.exitcode)
+ else:
+ msg = 'exited successfully'
+
+ logging.debug('Worker process %s %s', process.pid, msg)
+ self.processes.remove(process)
+
+ # XXX: it would be much nicer to run this as a coroutine than a thread,
+ # except that this code is executed before the eventloop is started.
+ #
+ # spawn a thread to collect the worker process once it dies
+ # preventing it from hanging around as zombie
+ threading.Thread(target=threaded_wait).start()
+
+ # TODO: avoid this if logging level > debug
+ def threaded_reader(prefix, fd):
+ with os.fdopen(fd) as handle:
+ for line in handle:
+ logging.debug('Worker process %s said on %s: %s',
+ process.pid, prefix, line.rstrip())
+
+ # spawn two threads that read from the stdout and stderr pipes
+ # and write anything that appears there to the log
+ threading.Thread(target=threaded_reader,
+ args=('stdout', stdout[0])).start()
+ os.close(stdout[1])
+ threading.Thread(target=threaded_reader,
+ args=('stderr', stderr[0])).start()
+ os.close(stderr[1])
+
+ # closing the sending end in this (receiving) process guarantees
+ # that the appropriate EOFError is raised upon .recv in the walker
+ sender.close()
+ return receiver, process
+
def get_threads(self, querystring, sort='newest_first', exclude_tags=None):
"""
asynchronously look up thread ids matching `querystring`.
@@ -277,8 +401,7 @@ class DBManager:
if exclude_tags:
for tag in exclude_tags:
q.exclude_tag(tag)
- for t in q.search_threads():
- yield t.get_thread_id()
+ return self.async_(q.search_threads, (lambda a: a.get_thread_id()))
def query(self, querystring):
"""
diff --git a/alot/walker.py b/alot/walker.py
index cfefc9e6..e4264103 100644
--- a/alot/walker.py
+++ b/alot/walker.py
@@ -1,30 +1,20 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
-# Copyright © 2018 Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import logging
import urwid
-class IterableWalker(urwid.ListWalker):
+class PipeWalker(urwid.ListWalker):
+ """urwid.ListWalker that reads successive items from a pipe and wraps them
+ in `containerclass` widgets for display
- """An urwid walker for iterables.
-
- Works like ListWalker, except it takes an iterable object instead of a
- concrete type. This allows for lazy operations of very large sequences of
- data, such as a sequences of threads with certain notmuch tags.
-
- :param iterable: An iterator of objects to walk over
- :type iterable: Iterable[T]
- :param containerclass: An urwid widget to wrap each object in
- :type containerclass: urwid.Widget
- :param reverse: Reverse the order of the iterable
- :type reverse: bool
- :param **kwargs: Forwarded to container class.
+ Attributes that should be considered publicly readable:
+ :attr lines: the lines obtained from the pipe
+ :type lines: list(`containerclass`)
"""
-
- def __init__(self, iterable, containerclass, reverse=False, **kwargs):
- self.iterable = iterable
+ def __init__(self, pipe, containerclass, reverse=False, **kwargs):
+ self.pipe = pipe
self.kwargs = kwargs
self.containerclass = containerclass
self.lines = []
@@ -81,10 +71,10 @@ class IterableWalker(urwid.ListWalker):
try:
# the next line blocks until it can read from the pipe or
# EOFError is raised. No races here.
- next_obj = next(self.iterable)
+ next_obj = self.pipe.recv()
next_widget = self.containerclass(next_obj, **self.kwargs)
self.lines.append(next_widget)
- except StopIteration:
+ except EOFError:
logging.debug('EMPTY PIPE')
next_widget = None
self.empty = True
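
The walker's only behavioural change is the item source: instead of calling next() on an iterable and stopping on StopIteration, it calls recv() on the pipe's receiving end and stops on EOFError. A minimal sketch of that fetch step, with source, containerclass and lines standing in for the walker's attributes (the names are illustrative, not alot's API):

def fetch_next(source, containerclass, lines, **kwargs):
    # pull one more object from the pipe and wrap it in a widget; report
    # exhaustion once the sending end has been closed
    try:
        obj = source.recv()            # blocks until data or EOF
    except EOFError:
        return None, True              # pipe drained: nothing more to show
    widget = containerclass(obj, **kwargs)
    lines.append(widget)
    return widget, False
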
diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py
index f7a47a58..e675aed4 100644
--- a/tests/db/test_manager.py
+++ b/tests/db/test_manager.py
@@ -37,6 +37,7 @@ class TestDBManager(utilities.TestCaseClassCleanup):
cls.manager = DBManager(cls.dbpath)
# clean up temporary database
+ cls.addClassCleanup(cls.manager.kill_search_processes)
cls.addClassCleanup(shutil.rmtree, cls.dbpath)
# let global settings manager read our temporary notmuch config