author     Dylan Baker <dylan@pnwbakers.com>    2018-07-31 09:08:48 -0700
committer  Anton Khirnov <anton@khirnov.net>    2021-01-10 11:10:40 +0100
commit     c79196b8b1c5eb99a2f240aaf0069853d798bdd0 (patch)
tree       5852f43886237c3e245b687cabe8450fc8333fb5
parent     47ebd7656057fadb5b627dfc1844bc09dc171ff2 (diff)
db/manager: Drop async method
As far as I can tell, using a separate process doesn't actually improve performance; it makes it worse. The work we're handing off to the separate process isn't well suited to being handed off: there isn't a lot of computation, and the objects that need to be passed across the pipe are fairly large (at least for a pipe). Converting the function to a generator gives better performance and simplifies the implementation.
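For illustration only: a minimal, self-contained sketch of the pattern this commit adopts, using stand-in names (FakeThread, get_thread_ids, LazyList) that are not part of alot. The producer returns a lazy generator of thread ids and the consumer pulls items one at a time with next(), treating StopIteration as the end of results; these are the roles get_threads and IterWalker._get_next_item play in the diff below.

    from collections import namedtuple

    # stand-in for notmuch2 thread objects; only .threadid matters here
    FakeThread = namedtuple('FakeThread', ['threadid'])


    def get_thread_ids(threads):
        """Return a lazy generator of thread ids (the shape of the new get_threads)."""
        return (t.threadid for t in threads)


    class LazyList:
        """Pull items from an iterator on demand, the way IterWalker does."""

        def __init__(self, it):
            self._it = it
            self._items = []
            self._done = False

        def _get_next_item(self):
            if self._done:
                return
            try:
                self._items.append(next(self._it))
            except StopIteration:
                self._done = True

        def __getitem__(self, pos):
            # fetch lazily until the requested position is available or the
            # iterator is exhausted
            while not self._done and pos >= len(self._items):
                self._get_next_item()
            return self._items[pos]


    if __name__ == '__main__':
        ids = get_thread_ids(FakeThread(threadid=str(i)) for i in range(5))
        lazy = LazyList(ids)
        print(lazy[2])  # pulls items 0..2 from the generator, prints '2'

Because the generator is evaluated lazily, the consumer only pays for the threads it actually asks for, and nothing has to be serialized across a process boundary.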
-rw-r--r--    alot/buffers/search.py       46
-rw-r--r--    alot/db/manager.py          130
-rw-r--r--    tests/db/test_manager.py      1
3 files changed, 17 insertions, 160 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py
index c5b6bbcd..8cd76a87 100644
--- a/alot/buffers/search.py
+++ b/alot/buffers/search.py
@@ -8,16 +8,16 @@ from .buffer import Buffer
from ..settings.const import settings
from ..widgets.search import ThreadlineWidget
-class PipeWalker(urwid.ListWalker):
+class IterWalker(urwid.ListWalker):
"""
- urwid.ListWalker that reads next items from a pipe and wraps them in
+ urwid.ListWalker that reads next items from a generator and wraps them in
ThreadlineWidget widgets for displaying
"""
_pipe = None
_dbman = None
- _pipe_eof = False
+ _iter_done = False
# list of the thread IDs
_tids = None
@@ -26,9 +26,9 @@ class PipeWalker(urwid.ListWalker):
_focus = None
- def __init__(self, pipe, dbman):
- self._pipe = pipe
- self._dbman = dbman
+ def __init__(self, threads, dbman):
+ self._threads = threads
+ self._dbman = dbman
self._tids = []
self._wgts = {}
@@ -38,7 +38,7 @@ class PipeWalker(urwid.ListWalker):
super().__init__()
def __len__(self):
- while not self._pipe_eof:
+ while not self._iter_done:
self._get_next_item()
return len(self._tids)
@@ -58,7 +58,7 @@ class PipeWalker(urwid.ListWalker):
if pos < 0:
raise IndexError
- while not self._pipe_eof and pos >= len(self._tids):
+ while not self._iter_done and pos >= len(self._tids):
self._get_next_item()
if pos >= len(self._tids):
@@ -80,13 +80,13 @@ class PipeWalker(urwid.ListWalker):
self._modified()
def _get_next_item(self):
- if self._pipe_eof:
+ if self._iter_done:
return None
try:
- self._tids.append(self._pipe.recv())
- except EOFError:
- self._pipe_eof = True
+ self._tids.append(next(self._threads))
+ except StopIteration:
+ self._iter_done = True
class SearchBuffer(Buffer):
"""shows a result list of threads for a query"""
@@ -102,7 +102,6 @@ class SearchBuffer(Buffer):
self.querystring = initialquery
default_order = settings.get('search_threads_sort_order')
self.sort_order = sort_order or default_order
- self.proc = None # process that fills our pipe
self.rebuild()
super().__init__()
@@ -132,21 +131,7 @@ class SearchBuffer(Buffer):
info['result_count_positive'] = 's' if info['result_count'] > 1 else ''
return info
- def cleanup(self):
- self.kill_filler_process()
-
- def kill_filler_process(self):
- """
- terminates the process that fills this buffers
- :class:`~alot.walker.PipeWalker`.
- """
- if self.proc:
- if self.proc.is_alive():
- self.proc.terminate()
-
def rebuild(self):
- self.kill_filler_process()
-
exclude_tags = settings.get_notmuch_setting('search', 'exclude_tags')
if exclude_tags:
exclude_tags = frozenset([t for t in exclude_tags.split(';') if t])
@@ -157,9 +142,8 @@ class SearchBuffer(Buffer):
self._thread_count_val = None
try:
- self.pipe, self.proc = self.dbman.get_threads(self.querystring,
- self.sort_order,
- exclude_tags)
+ threads = self.dbman.get_threads(self.querystring, self.sort_order,
+ exclude_tags)
except NotmuchError:
self.ui.notify('malformed query string: %s' % self.querystring,
'error')
@@ -167,7 +151,7 @@ class SearchBuffer(Buffer):
self.body = self.listbox
return
- self.threadlist = PipeWalker(self.pipe, self.dbman)
+ self.threadlist = IterWalker(threads, self.dbman)
self.listbox = urwid.ListBox(self.threadlist)
self.body = self.listbox
diff --git a/alot/db/manager.py b/alot/db/manager.py
index cf452c42..c902eb54 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -2,13 +2,8 @@
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
-import errno
import logging
-import multiprocessing
import os
-import signal
-import sys
-import threading
from notmuch2 import Database, NotmuchError, XapianError
@@ -28,47 +23,6 @@ def _is_subdir_of(subpath, superpath):
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
return os.path.commonprefix([subpath, superpath]) == superpath
-class FillPipeProcess(multiprocessing.Process):
-
- def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
- self.keep_going = True
- self.stdout = stdout
- self.stderr = stderr
-
- def handle_sigterm(self, signo, frame):
- # this is used to suppress any EINTR errors at interpreter
- # shutdown
- self.keep_going = False
-
- # raises SystemExit to shut down the interpreter from the
- # signal handler
- sys.exit()
-
- def run(self):
- # replace filedescriptors 1 and 2 (stdout and stderr) with
- # pipes to the parent process
- os.dup2(self.stdout, 1)
- os.dup2(self.stderr, 2)
-
- # register a signal handler for SIGTERM
- signal.signal(signal.SIGTERM, self.handle_sigterm)
-
- for a in self.it:
- try:
- self.pipe.send(self.fun(a))
- except IOError as e:
- # suppress spurious EINTR errors at interpreter
- # shutdown
- if e.errno != errno.EINTR or self.keep_going:
- raise
-
- self.pipe.close()
-
-
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -188,15 +142,6 @@ class DBManager:
raise e
logging.debug('flush finished')
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this managers :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -287,73 +232,6 @@ class DBManager:
queries = filter(lambda k: k.startswith(q_prefix), db.config)
return { q[len(q_prefix):] : db.config[q] for q in queries }
- def async_(self, seq, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `seq`.
-
- :param fun: an unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- # create two unix pipes to redirect the workers stdout and
- # stderr
- stdout = os.pipe()
- stderr = os.pipe()
-
- # create a multiprocessing pipe for the results
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
-
- process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun)
- process.start()
- self.processes.append(process)
- logging.debug('Worker process %s spawned', process.pid)
-
- def threaded_wait():
- # wait(2) for the process to die
- process.join()
-
- if process.exitcode < 0:
- msg = 'received signal {0}'.format(-process.exitcode)
- elif process.exitcode > 0:
- msg = 'returned error code {0}'.format(process.exitcode)
- else:
- msg = 'exited successfully'
-
- logging.debug('Worker process %s %s', process.pid, msg)
- self.processes.remove(process)
-
- # XXX: it would be much nicer to run this as a coroutine than a thread,
- # except that this code is executed before the eventloop is started.
- #
- # spawn a thread to collect the worker process once it dies
- # preventing it from hanging around as zombie
- threading.Thread(target=threaded_wait).start()
-
- # TODO: avoid this if logging level > debug
- def threaded_reader(prefix, fd):
- with os.fdopen(fd) as handle:
- for line in handle:
- logging.debug('Worker process %s said on %s: %s',
- process.pid, prefix, line.rstrip())
-
- # spawn two threads that read from the stdout and stderr pipes
- # and write anything that appears there to the log
- threading.Thread(target=threaded_reader,
- args=('stdout', stdout[0])).start()
- os.close(stdout[1])
- threading.Thread(target=threaded_reader,
- args=('stderr', stderr[0])).start()
- os.close(stderr[1])
-
- # closing the sending end in this (receiving) process guarantees
- # that here the appropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
def get_threads(self, querystring, sort='newest_first', exclude_tags = frozenset()):
"""
asynchronously look up thread ids matching `querystring`.
@@ -366,10 +244,7 @@ class DBManager:
:param exclude_tags: Tags to exclude by default unless included in the
search
:type exclude_tags: set of str
- :returns: a pipe together with the process that asynchronously
- writes to it.
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
+ :returns: iterator over thread ids
"""
# TODO: use a symbolic constant for this
assert sort in self._sort_orders
@@ -378,8 +253,7 @@ class DBManager:
sort = self._sort_orders[sort]
exclude_tags = self._exclude_tags | exclude_tags
- return self.async_(db.threads(querystring, sort = sort, exclude_tags = exclude_tags),
- lambda t: t.threadid)
+ return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags))
def add_message(self, path, tags):
"""
diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py
index 0f3a64ac..dbef6c6c 100644
--- a/tests/db/test_manager.py
+++ b/tests/db/test_manager.py
@@ -71,7 +71,6 @@ class TestDBManager(utilities.TestCaseClassCleanup):
cls.manager = manager.DBManager(cls.dbpath)
# clean up temporary database
- cls.addClassCleanup(cls.manager.kill_search_processes)
cls.addClassCleanup(shutil.rmtree, cls.dbpath)
# let global settings manager read our temporary notmuch config