diff options
author | Anton Khirnov <anton@khirnov.net> | 2019-12-21 09:29:49 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2020-01-02 17:18:56 +0100 |
commit | 09d7db6dc591fbe44058b0e01acae331c52f0b28 (patch) | |
tree | c1ec33174778ae0e593453d510a030fe2b469f5d /alot/db/manager.py | |
parent | f1ceccaa58bb36cac73e6886a0b14230e5518fda (diff) |
Revert "db/manager: Drop async method"
This reverts commit e7e0c52db9093a9ecd9dcaa0766e66515a546a75.
Diffstat (limited to 'alot/db/manager.py')
-rw-r--r-- | alot/db/manager.py | 129 |
1 files changed, 126 insertions, 3 deletions
diff --git a/alot/db/manager.py b/alot/db/manager.py index d95ea332..956ddcbc 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -1,9 +1,14 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> -# Copyright © Dylan Baker # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file from collections import deque +import errno import logging +import multiprocessing +import os +import signal +import sys +import threading from notmuch import Database, NotmuchError, XapianError import notmuch @@ -18,6 +23,47 @@ from .utils import is_subdir_of from ..settings.const import settings +class FillPipeProcess(multiprocessing.Process): + + def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): + multiprocessing.Process.__init__(self) + self.it = it + self.pipe = pipe[1] + self.fun = fun + self.keep_going = True + self.stdout = stdout + self.stderr = stderr + + def handle_sigterm(self, signo, frame): + # this is used to suppress any EINTR errors at interpreter + # shutdown + self.keep_going = False + + # raises SystemExit to shut down the interpreter from the + # signal handler + sys.exit() + + def run(self): + # replace filedescriptors 1 and 2 (stdout and stderr) with + # pipes to the parent process + os.dup2(self.stdout, 1) + os.dup2(self.stderr, 2) + + # register a signal handler for SIGTERM + signal.signal(signal.SIGTERM, self.handle_sigterm) + + for a in self.it: + try: + self.pipe.send(self.fun(a)) + except IOError as e: + # suppress spurious EINTR errors at interpreter + # shutdown + if e.errno != errno.EINTR or self.keep_going: + raise + + self.pipe.close() + + class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -152,6 +198,15 @@ class DBManager: raise e logging.debug('flush finished') + def kill_search_processes(self): + """ + terminate all search processes that originate from + this managers :meth:`get_threads`. + """ + for p in self.processes: + p.terminate() + self.processes = [] + def tag(self, querystring, tags, afterwards=None, remove_rest=False): """ add tags to messages matching `querystring`. @@ -254,6 +309,75 @@ class DBManager: db = Database(path=self.path) return {k[6:]: v for k, v in db.get_configs('query.')} + def async_(self, cbl, fun): + """ + return a pair (pipe, process) so that the process writes + `fun(a)` to the pipe for each element `a` in the iterable returned + by the callable `cbl`. + + :param cbl: a function returning something iterable + :type cbl: callable + :param fun: an unary translation function + :type fun: callable + :rtype: (:class:`multiprocessing.Pipe`, + :class:`multiprocessing.Process`) + """ + # create two unix pipes to redirect the workers stdout and + # stderr + stdout = os.pipe() + stderr = os.pipe() + + # create a multiprocessing pipe for the results + pipe = multiprocessing.Pipe(False) + receiver, sender = pipe + + process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) + process.start() + self.processes.append(process) + logging.debug('Worker process %s spawned', process.pid) + + def threaded_wait(): + # wait(2) for the process to die + process.join() + + if process.exitcode < 0: + msg = 'received signal {0}'.format(-process.exitcode) + elif process.exitcode > 0: + msg = 'returned error code {0}'.format(process.exitcode) + else: + msg = 'exited successfully' + + logging.debug('Worker process %s %s', process.pid, msg) + self.processes.remove(process) + + # XXX: it would be much nicer to run this as a coroutine than a thread, + # except that this code is executed before the eventloop is started. + # + # spawn a thread to collect the worker process once it dies + # preventing it from hanging around as zombie + threading.Thread(target=threaded_wait).start() + + # TODO: avoid this if logging level > debug + def threaded_reader(prefix, fd): + with os.fdopen(fd) as handle: + for line in handle: + logging.debug('Worker process %s said on %s: %s', + process.pid, prefix, line.rstrip()) + + # spawn two threads that read from the stdout and stderr pipes + # and write anything that appears there to the log + threading.Thread(target=threaded_reader, + args=('stdout', stdout[0])).start() + os.close(stdout[1]) + threading.Thread(target=threaded_reader, + args=('stderr', stderr[0])).start() + os.close(stderr[1]) + + # closing the sending end in this (receiving) process guarantees + # that here the appropriate EOFError is raised upon .recv in the walker + sender.close() + return receiver, process + def get_threads(self, querystring, sort='newest_first', exclude_tags=None): """ asynchronously look up thread ids matching `querystring`. @@ -277,8 +401,7 @@ class DBManager: if exclude_tags: for tag in exclude_tags: q.exclude_tag(tag) - for t in q.search_threads(): - yield t.get_thread_id() + return self.async_(q.search_threads, (lambda a: a.get_thread_id())) def query(self, querystring): """ |