summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--alot/buffers/search.py27
-rw-r--r--alot/db/manager.py130
-rw-r--r--alot/walker.py30
-rw-r--r--tests/db/test_manager.py1
4 files changed, 30 insertions, 158 deletions
diff --git a/alot/buffers/search.py b/alot/buffers/search.py
index 203ef483..d64e857f 100644
--- a/alot/buffers/search.py
+++ b/alot/buffers/search.py
@@ -6,7 +6,7 @@ from notmuch import NotmuchError
from .buffer import Buffer
from ..settings.const import settings
-from ..walker import PipeWalker
+from ..walker import IterableWalker
from ..widgets.search import ThreadlineWidget
@@ -26,7 +26,6 @@ class SearchBuffer(Buffer):
self.sort_order = sort_order or default_order
self.result_count = 0
self.isinitialized = False
- self.proc = None # process that fills our pipe
self.rebuild()
Buffer.__init__(self, ui, self.body)
@@ -42,22 +41,9 @@ class SearchBuffer(Buffer):
info['result_count_positive'] = 's' if self.result_count > 1 else ''
return info
- def cleanup(self):
- self.kill_filler_process()
-
- def kill_filler_process(self):
- """
- terminates the process that fills this buffers
- :class:`~alot.walker.PipeWalker`.
- """
- if self.proc:
- if self.proc.is_alive():
- self.proc.terminate()
-
def rebuild(self, reverse=False):
self.isinitialized = True
self.reversed = reverse
- self.kill_filler_process()
if reverse:
order = self._REVERSE[self.sort_order]
@@ -70,9 +56,8 @@ class SearchBuffer(Buffer):
try:
self.result_count = self.dbman.count_messages(self.querystring)
- self.pipe, self.proc = self.dbman.get_threads(self.querystring,
- order,
- exclude_tags)
+ threads = self.dbman.get_threads(
+ self.querystring, order, exclude_tags)
except NotmuchError:
self.ui.notify('malformed query string: %s' % self.querystring,
'error')
@@ -80,9 +65,9 @@ class SearchBuffer(Buffer):
self.body = self.listbox
return
- self.threadlist = PipeWalker(self.pipe, ThreadlineWidget,
- dbman=self.dbman,
- reverse=reverse)
+ self.threadlist = IterableWalker(threads, ThreadlineWidget,
+ dbman=self.dbman,
+ reverse=reverse)
self.listbox = urwid.ListBox(self.threadlist)
self.body = self.listbox
diff --git a/alot/db/manager.py b/alot/db/manager.py
index 956ddcbc..af42a1f5 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,14 +1,10 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# Copyright © Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
-import errno
+import asyncio
import logging
-import multiprocessing
-import os
-import signal
-import sys
-import threading
from notmuch import Database, NotmuchError, XapianError
import notmuch
@@ -23,47 +19,6 @@ from .utils import is_subdir_of
from ..settings.const import settings
-class FillPipeProcess(multiprocessing.Process):
-
- def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
- self.keep_going = True
- self.stdout = stdout
- self.stderr = stderr
-
- def handle_sigterm(self, signo, frame):
- # this is used to suppress any EINTR errors at interpreter
- # shutdown
- self.keep_going = False
-
- # raises SystemExit to shut down the interpreter from the
- # signal handler
- sys.exit()
-
- def run(self):
- # replace filedescriptors 1 and 2 (stdout and stderr) with
- # pipes to the parent process
- os.dup2(self.stdout, 1)
- os.dup2(self.stderr, 2)
-
- # register a signal handler for SIGTERM
- signal.signal(signal.SIGTERM, self.handle_sigterm)
-
- for a in self.it:
- try:
- self.pipe.send(self.fun(a))
- except IOError as e:
- # suppress spurious EINTR errors at interpreter
- # shutdown
- if e.errno != errno.EINTR or self.keep_going:
- raise
-
- self.pipe.close()
-
-
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -198,15 +153,6 @@ class DBManager:
raise e
logging.debug('flush finished')
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this managers :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -309,75 +255,6 @@ class DBManager:
db = Database(path=self.path)
return {k[6:]: v for k, v in db.get_configs('query.')}
- def async_(self, cbl, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `cbl`.
-
- :param cbl: a function returning something iterable
- :type cbl: callable
- :param fun: an unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- # create two unix pipes to redirect the workers stdout and
- # stderr
- stdout = os.pipe()
- stderr = os.pipe()
-
- # create a multiprocessing pipe for the results
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
-
- process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
- process.start()
- self.processes.append(process)
- logging.debug('Worker process %s spawned', process.pid)
-
- def threaded_wait():
- # wait(2) for the process to die
- process.join()
-
- if process.exitcode < 0:
- msg = 'received signal {0}'.format(-process.exitcode)
- elif process.exitcode > 0:
- msg = 'returned error code {0}'.format(process.exitcode)
- else:
- msg = 'exited successfully'
-
- logging.debug('Worker process %s %s', process.pid, msg)
- self.processes.remove(process)
-
- # XXX: it would be much nicer to run this as a coroutine than a thread,
- # except that this code is executed before the eventloop is started.
- #
- # spawn a thread to collect the worker process once it dies
- # preventing it from hanging around as zombie
- threading.Thread(target=threaded_wait).start()
-
- # TODO: avoid this if logging level > debug
- def threaded_reader(prefix, fd):
- with os.fdopen(fd) as handle:
- for line in handle:
- logging.debug('Worker process %s said on %s: %s',
- process.pid, prefix, line.rstrip())
-
- # spawn two threads that read from the stdout and stderr pipes
- # and write anything that appears there to the log
- threading.Thread(target=threaded_reader,
- args=('stdout', stdout[0])).start()
- os.close(stdout[1])
- threading.Thread(target=threaded_reader,
- args=('stderr', stderr[0])).start()
- os.close(stderr[1])
-
- # closing the sending end in this (receiving) process guarantees
- # that here the appropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
def get_threads(self, querystring, sort='newest_first', exclude_tags=None):
"""
asynchronously look up thread ids matching `querystring`.
@@ -401,7 +278,8 @@ class DBManager:
if exclude_tags:
for tag in exclude_tags:
q.exclude_tag(tag)
- return self.async_(q.search_threads, (lambda a: a.get_thread_id()))
+ for t in q.search_threads():
+ yield t.get_thread_id()
def query(self, querystring):
"""
diff --git a/alot/walker.py b/alot/walker.py
index e4264103..cfefc9e6 100644
--- a/alot/walker.py
+++ b/alot/walker.py
@@ -1,20 +1,30 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# Copyright © 2018 Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import logging
import urwid
-class PipeWalker(urwid.ListWalker):
- """urwid.ListWalker that reads next items from a pipe and wraps them in
- `containerclass` widgets for displaying
+class IterableWalker(urwid.ListWalker):
- Attributes that should be considered publicly readable:
- :attr lines: the lines obtained from the pipe
- :type lines: list(`containerclass`)
+ """An urwid walker for iterables.
+
+ Works like ListWalker, except it takes an iterable object instead of a
+ concrete type. This allows for lazy operations of very large sequences of
+ data, such as a sequences of threads with certain notmuch tags.
+
+ :param iterable: An iterator of objects to walk over
+ :type iterable: Iterable[T]
+ :param containerclass: An urwid widget to wrap each object in
+ :type containerclass: urwid.Widget
+ :param reverse: Reverse the order of the iterable
+ :type reverse: bool
+ :param **kwargs: Forwarded to container class.
"""
- def __init__(self, pipe, containerclass, reverse=False, **kwargs):
- self.pipe = pipe
+
+ def __init__(self, iterable, containerclass, reverse=False, **kwargs):
+ self.iterable = iterable
self.kwargs = kwargs
self.containerclass = containerclass
self.lines = []
@@ -71,10 +81,10 @@ class PipeWalker(urwid.ListWalker):
try:
# the next line blocks until it can read from the pipe or
# EOFError is raised. No races here.
- next_obj = self.pipe.recv()
+ next_obj = next(self.iterable)
next_widget = self.containerclass(next_obj, **self.kwargs)
self.lines.append(next_widget)
- except EOFError:
+ except StopIteration:
logging.debug('EMPTY PIPE')
next_widget = None
self.empty = True
diff --git a/tests/db/test_manager.py b/tests/db/test_manager.py
index e675aed4..f7a47a58 100644
--- a/tests/db/test_manager.py
+++ b/tests/db/test_manager.py
@@ -37,7 +37,6 @@ class TestDBManager(utilities.TestCaseClassCleanup):
cls.manager = DBManager(cls.dbpath)
# clean up temporary database
- cls.addClassCleanup(cls.manager.kill_search_processes)
cls.addClassCleanup(shutil.rmtree, cls.dbpath)
# let global settings manager read our temporary notmuch config