summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDylan Baker <dylan@pnwbakers.com>2018-07-31 09:08:48 -0700
committerPatrick Totzke <patricktotzke@gmail.com>2019-08-15 20:44:32 +0100
commite7e0c52db9093a9ecd9dcaa0766e66515a546a75 (patch)
treed669b366f43cc3cdbc7c57020deb8230ac69158a
parentcd44abdc4df887e1e9ed70e510560fc26e62f97e (diff)
db/manager: Drop async method
As far as I can tell using a separate process doesn't actually improve performance, it makes it worse. The work that we're passing off to the separate function isn't necessarily work that's well suited to being handed off, there isn't a lot of computation and the objects that need to be passed across the pipe are fairly large (at least when considering a pipe). Converting the function to a generator gives better performance and simplifies the implementation.
-rw-r--r--alot/buffers/search.py27
-rw-r--r--alot/db/manager.py130
-rw-r--r--alot/walker.py30
-rw-r--r--tests/db/test_manager.py1
4 files changed, 30 insertions, 158 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py
index 203ef483..d64e857f 100644
--- a/alot/buffers/search.py
+++ b/alot/buffers/search.py
@@ -6,7 +6,7 @@ from notmuch import NotmuchError
from .buffer import Buffer
from ..settings.const import settings
-from ..walker import PipeWalker
+from ..walker import IterableWalker
from ..widgets.search import ThreadlineWidget
@@ -26,7 +26,6 @@ class SearchBuffer(Buffer):
self.sort_order = sort_order or default_order
self.result_count = 0
self.isinitialized = False
- self.proc = None # process that fills our pipe
self.rebuild()
Buffer.__init__(self, ui, self.body)
@@ -42,22 +41,9 @@ class SearchBuffer(Buffer):
info['result_count_positive'] = 's' if self.result_count > 1 else ''
return info
- def cleanup(self):
- self.kill_filler_process()
-
- def kill_filler_process(self):
- """
- terminates the process that fills this buffers
- :class:`~alot.walker.PipeWalker`.
- """
- if self.proc:
- if self.proc.is_alive():
- self.proc.terminate()
-
def rebuild(self, reverse=False):
self.isinitialized = True
self.reversed = reverse
- self.kill_filler_process()
if reverse:
order = self._REVERSE[self.sort_order]
@@ -70,9 +56,8 @@ class SearchBuffer(Buffer):
try:
self.result_count = self.dbman.count_messages(self.querystring)
- self.pipe, self.proc = self.dbman.get_threads(self.querystring,
- order,
- exclude_tags)
+ threads = self.dbman.get_threads(
+ self.querystring, order, exclude_tags)
except NotmuchError:
self.ui.notify('malformed query string: %s' % self.querystring,
'error')
@@ -80,9 +65,9 @@ class SearchBuffer(Buffer):
self.body = self.listbox
return
- self.threadlist = PipeWalker(self.pipe, ThreadlineWidget,
- dbman=self.dbman,
- reverse=reverse)
+ self.threadlist = IterableWalker(threads, ThreadlineWidget,
+ dbman=self.dbman,
+ reverse=reverse)
self.listbox = urwid.ListBox(self.threadlist)
self.body = self.listbox
diff --git a/alot/db/manager.py b/alot/db/manager.py
index 956ddcbc..af42a1f5 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,14 +1,10 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# Copyright © Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
-import errno
+import asyncio
import logging
-import multiprocessing
-import os
-import signal
-import sys
-import threading
from notmuch import Database, NotmuchError, XapianError
import notmuch
@@ -23,47 +19,6 @@ from .utils import is_subdir_of
from ..settings.const import settings
-class FillPipeProcess(multiprocessing.Process):
-
- def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
- self.keep_going = True
- self.stdout = stdout
- self.stderr = stderr
-
- def handle_sigterm(self, signo, frame):
- # this is used to suppress any EINTR errors at interpreter
- # shutdown
- self.keep_going = False
-
- # raises SystemExit to shut down the interpreter from the
- # signal handler
- sys.exit()
-
- def run(self):
- # replace filedescriptors 1 and 2 (stdout and stderr) with
- # pipes to the parent process
- os.dup2(self.stdout, 1)
- os.dup2(self.stderr, 2)
-
- # register a signal handler for SIGTERM
- signal.signal(signal.SIGTERM, self.handle_sigterm)
-
- for a in self.it:
- try:
- self.pipe.send(self.fun(a))
- except IOError as e:
- # suppress spurious EINTR errors at interpreter
- # shutdown
- if e.errno != errno.EINTR or self.keep_going:
- raise
-
- self.pipe.close()
-
-
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -198,15 +153,6 @@ class DBManager:
raise e
logging.debug('flush finished')
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this managers :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -309,75 +255,6 @@ class DBManager:
db = Database(path=self.path)
return {k[6:]: v for k, v in db.get_configs('query.')}
- def async_(self, cbl, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `cbl`.
-
- :param cbl: a function returning something iterable
- :type cbl: callable
- :param fun: an unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- # create two unix pipes to redirect the workers stdout and
- # stderr
- stdout = os.pipe()
- stderr = os.pipe()
-
- # create a multiprocessing pipe for the results
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
-
- process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
- process.start()
- self.processes.append(process)
- logging.debug('Worker process %s spawned', process.pid)
-
- def threaded_wait():
- # wait(2) for the process to die
- process.join()
-
- if process.exitcode < 0:
- msg = 'received signal {0}'.format(-process.exitcode)
- elif process.exitcode > 0:
- msg = 'returned error code {0}'.format(process.exitcode)
- else:
- msg = 'exited successfully'
-
- logging.debug('Worker process %s %s', process.pid, msg)
- self.processes.remove(process)
-
- # XXX: it would be much nicer to run this as a coroutine than a thread,
- # except that this code is executed before the eventloop is started.
- #
- # spawn a thread to collect the worker process once it dies
- # preventing it from hanging around as zombie
- threading.Thread(target=threaded_wait).start()
-
- # TODO: avoid this if logging level > debug
- def threaded_reader(prefix, fd):
- with os.fdopen(fd) as handle:
- for line in handle:
- logging.debug('Worker process %s said on %s: %s',
- process.pid, prefix, line.rstrip())
-
- # spawn two threads that read from the stdout and stderr pipes
- # and write anything that appears there to the log
- threading.Thread(target=threaded_reader,
- args=('stdout', stdout[0])).start()
- os.close(stdout[1])
- threading.Thread(target=threaded_reader,
- args=('stderr', stderr[0])).start()
- os.close(stderr[1])
-
- # closing the sending end in this (receiving) process guarantees
- # that here the appropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
def get_threads(self, querystring, sort='newest_first', exclude_tags=None):
"""
asynchronously look up thread ids matching `querystring`.
@@ -401,7 +278,8 @@ class DBManager:
if exclude_tags:
for tag in exclude_tags:
q.exclude_tag(tag)
- return self.async_(q.search_threads, (lambda a: a.get_thread_id()))
+ for t in q.search_threads():
+ yield t.get_thread_id()
def query(self, querystring):
"""
diff --git a/alot/walker.py b/alot/walker.py
index e4264103..cfefc9e6 100644
--- a/alot/walker.py
+++ b/alot/walker.py
@@ -1,20 +1,30 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# Copyright © 2018 Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import logging
import urwid
-class PipeWalker(urwid.ListWalker):
- """urwid.ListWalker that reads next items from a pipe and wraps them in
- `containerclass` widgets for displaying
+class IterableWalker(urwid.ListWalker):
- Attributes that should be considered publicly readable:
- :attr lines: the lines obtained from the pipe
- :type lines: list(`containerclass`)
+ """An urwid walker for iterables.
+
+ Works like ListWalker, except it takes an iterable object instead of a
+ concrete type. This allows for lazy operations of very large sequences of
+ data, such as a sequences of threads with certain notmuch tags.
+
+ :param iterable: An iterator of objects to walk over
+ :type iterable: Iterable[T]
+ :param containerclass: An urwid widget to wrap each object in
+ :type containerclass: urwid.Widget
+ :param reverse: Reverse the order of the iterable
+ :type reverse: bool
+ :param **kwargs: Forwarded to container class.
"""
- def __init__(self, pipe, containerclass, reverse=False, **kwargs):
- self.pipe = pipe
+
+ def __init__(self, iterable, containerclass, reverse=False, **kwargs):
+ self.iterable = iterable
self.kwargs = kwargs
self.containerclass = containerclass
self.lines = []
@@ -71,10 +81,10 @@ class PipeWalker(urwid.ListWalker):
try:
# the next line blocks until it can read from the pipe or
# EOFError is raised. No races here.
- next_obj = self.pipe.recv()
+ next_obj = next(self.iterable)
next_widget = self.containerclass(next_obj, **self.kwargs)
self.lines.append(next_widget)
- except EOFError:
+ except StopIteration:
logging.debug('EMPTY PIPE')
next_widget = None
self.empty = True
diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py
index e675aed4..f7a47a58 100644
--- a/tests/db/test_manager.py
+++ b/tests/db/test_manager.py
@@ -37,7 +37,6 @@ class TestDBManager(utilities.TestCaseClassCleanup):
cls.manager = DBManager(cls.dbpath)
# clean up temporary database
- cls.addClassCleanup(cls.manager.kill_search_processes)
cls.addClassCleanup(shutil.rmtree, cls.dbpath)
# let global settings manager read our temporary notmuch config