summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--alot/buffers/search.py46
-rw-r--r--alot/db/manager.py130
-rw-r--r--tests/db/test_manager.py1
3 files changed, 17 insertions, 160 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py
index c5b6bbcd..8cd76a87 100644
--- a/alot/buffers/search.py
+++ b/alot/buffers/search.py
@@ -8,16 +8,16 @@ from .buffer import Buffer
from ..settings.const import settings
from ..widgets.search import ThreadlineWidget
-class PipeWalker(urwid.ListWalker):
+class IterWalker(urwid.ListWalker):
"""
- urwid.ListWalker that reads next items from a pipe and wraps them in
+ urwid.ListWalker that reads next items from a generator and wraps them in
ThreadlineWidget widgets for displaying
"""
_pipe = None
_dbman = None
- _pipe_eof = False
+ _iter_done = False
# list of the thread IDs
_tids = None
@@ -26,9 +26,9 @@ class PipeWalker(urwid.ListWalker):
_focus = None
- def __init__(self, pipe, dbman):
- self._pipe = pipe
- self._dbman = dbman
+ def __init__(self, threads, dbman):
+ self._threads = threads
+ self._dbman = dbman
self._tids = []
self._wgts = {}
@@ -38,7 +38,7 @@ class PipeWalker(urwid.ListWalker):
super().__init__()
def __len__(self):
- while not self._pipe_eof:
+ while not self._iter_done:
self._get_next_item()
return len(self._tids)
@@ -58,7 +58,7 @@ class PipeWalker(urwid.ListWalker):
if pos < 0:
raise IndexError
- while not self._pipe_eof and pos >= len(self._tids):
+ while not self._iter_done and pos >= len(self._tids):
self._get_next_item()
if pos >= len(self._tids):
@@ -80,13 +80,13 @@ class PipeWalker(urwid.ListWalker):
self._modified()
def _get_next_item(self):
- if self._pipe_eof:
+ if self._iter_done:
return None
try:
- self._tids.append(self._pipe.recv())
- except EOFError:
- self._pipe_eof = True
+ self._tids.append(next(self._threads))
+ except StopIteration:
+ self._iter_done = True
class SearchBuffer(Buffer):
"""shows a result list of threads for a query"""
@@ -102,7 +102,6 @@ class SearchBuffer(Buffer):
self.querystring = initialquery
default_order = settings.get('search_threads_sort_order')
self.sort_order = sort_order or default_order
- self.proc = None # process that fills our pipe
self.rebuild()
super().__init__()
@@ -132,21 +131,7 @@ class SearchBuffer(Buffer):
info['result_count_positive'] = 's' if info['result_count'] > 1 else ''
return info
- def cleanup(self):
- self.kill_filler_process()
-
- def kill_filler_process(self):
- """
- terminates the process that fills this buffers
- :class:`~alot.walker.PipeWalker`.
- """
- if self.proc:
- if self.proc.is_alive():
- self.proc.terminate()
-
def rebuild(self):
- self.kill_filler_process()
-
exclude_tags = settings.get_notmuch_setting('search', 'exclude_tags')
if exclude_tags:
exclude_tags = frozenset([t for t in exclude_tags.split(';') if t])
@@ -157,9 +142,8 @@ class SearchBuffer(Buffer):
self._thread_count_val = None
try:
- self.pipe, self.proc = self.dbman.get_threads(self.querystring,
- self.sort_order,
- exclude_tags)
+ threads = self.dbman.get_threads(self.querystring, self.sort_order,
+ exclude_tags)
except NotmuchError:
self.ui.notify('malformed query string: %s' % self.querystring,
'error')
@@ -167,7 +151,7 @@ class SearchBuffer(Buffer):
self.body = self.listbox
return
- self.threadlist = PipeWalker(self.pipe, self.dbman)
+ self.threadlist = IterWalker(threads, self.dbman)
self.listbox = urwid.ListBox(self.threadlist)
self.body = self.listbox
diff --git a/alot/db/manager.py b/alot/db/manager.py
index cf452c42..c902eb54 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -2,13 +2,8 @@
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
-import errno
import logging
-import multiprocessing
import os
-import signal
-import sys
-import threading
from notmuch2 import Database, NotmuchError, XapianError
@@ -28,47 +23,6 @@ def _is_subdir_of(subpath, superpath):
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
return os.path.commonprefix([subpath, superpath]) == superpath
-class FillPipeProcess(multiprocessing.Process):
-
- def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
- self.keep_going = True
- self.stdout = stdout
- self.stderr = stderr
-
- def handle_sigterm(self, signo, frame):
- # this is used to suppress any EINTR errors at interpreter
- # shutdown
- self.keep_going = False
-
- # raises SystemExit to shut down the interpreter from the
- # signal handler
- sys.exit()
-
- def run(self):
- # replace filedescriptors 1 and 2 (stdout and stderr) with
- # pipes to the parent process
- os.dup2(self.stdout, 1)
- os.dup2(self.stderr, 2)
-
- # register a signal handler for SIGTERM
- signal.signal(signal.SIGTERM, self.handle_sigterm)
-
- for a in self.it:
- try:
- self.pipe.send(self.fun(a))
- except IOError as e:
- # suppress spurious EINTR errors at interpreter
- # shutdown
- if e.errno != errno.EINTR or self.keep_going:
- raise
-
- self.pipe.close()
-
-
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -188,15 +142,6 @@ class DBManager:
raise e
logging.debug('flush finished')
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this managers :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -287,73 +232,6 @@ class DBManager:
queries = filter(lambda k: k.startswith(q_prefix), db.config)
return { q[len(q_prefix):] : db.config[q] for q in queries }
- def async_(self, seq, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `seq`.
-
- :param fun: an unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- # create two unix pipes to redirect the workers stdout and
- # stderr
- stdout = os.pipe()
- stderr = os.pipe()
-
- # create a multiprocessing pipe for the results
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
-
- process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun)
- process.start()
- self.processes.append(process)
- logging.debug('Worker process %s spawned', process.pid)
-
- def threaded_wait():
- # wait(2) for the process to die
- process.join()
-
- if process.exitcode < 0:
- msg = 'received signal {0}'.format(-process.exitcode)
- elif process.exitcode > 0:
- msg = 'returned error code {0}'.format(process.exitcode)
- else:
- msg = 'exited successfully'
-
- logging.debug('Worker process %s %s', process.pid, msg)
- self.processes.remove(process)
-
- # XXX: it would be much nicer to run this as a coroutine than a thread,
- # except that this code is executed before the eventloop is started.
- #
- # spawn a thread to collect the worker process once it dies
- # preventing it from hanging around as zombie
- threading.Thread(target=threaded_wait).start()
-
- # TODO: avoid this if logging level > debug
- def threaded_reader(prefix, fd):
- with os.fdopen(fd) as handle:
- for line in handle:
- logging.debug('Worker process %s said on %s: %s',
- process.pid, prefix, line.rstrip())
-
- # spawn two threads that read from the stdout and stderr pipes
- # and write anything that appears there to the log
- threading.Thread(target=threaded_reader,
- args=('stdout', stdout[0])).start()
- os.close(stdout[1])
- threading.Thread(target=threaded_reader,
- args=('stderr', stderr[0])).start()
- os.close(stderr[1])
-
- # closing the sending end in this (receiving) process guarantees
- # that here the appropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
def get_threads(self, querystring, sort='newest_first', exclude_tags = frozenset()):
"""
asynchronously look up thread ids matching `querystring`.
@@ -366,10 +244,7 @@ class DBManager:
:param exclude_tags: Tags to exclude by default unless included in the
search
:type exclude_tags: set of str
- :returns: a pipe together with the process that asynchronously
- writes to it.
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
+ :returns: iterator over thread ids
"""
# TODO: use a symbolic constant for this
assert sort in self._sort_orders
@@ -378,8 +253,7 @@ class DBManager:
sort = self._sort_orders[sort]
exclude_tags = self._exclude_tags | exclude_tags
- return self.async_(db.threads(querystring, sort = sort, exclude_tags = exclude_tags),
- lambda t: t.threadid)
+ return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags))
def add_message(self, path, tags):
"""
diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py
index 0f3a64ac..dbef6c6c 100644
--- a/tests/db/test_manager.py
+++ b/tests/db/test_manager.py
@@ -71,7 +71,6 @@ class TestDBManager(utilities.TestCaseClassCleanup):
cls.manager = manager.DBManager(cls.dbpath)
# clean up temporary database
- cls.addClassCleanup(cls.manager.kill_search_processes)
cls.addClassCleanup(shutil.rmtree, cls.dbpath)
# let global settings manager read our temporary notmuch config