path: root/alot/db/manager.py
author     Anton Khirnov <anton@khirnov.net>  2019-12-21 09:29:49 +0100
committer  Anton Khirnov <anton@khirnov.net>  2020-01-02 17:18:56 +0100
commit     09d7db6dc591fbe44058b0e01acae331c52f0b28 (patch)
tree       c1ec33174778ae0e593453d510a030fe2b469f5d /alot/db/manager.py
parent     f1ceccaa58bb36cac73e6886a0b14230e5518fda (diff)
Revert "db/manager: Drop async method"
This reverts commit e7e0c52db9093a9ecd9dcaa0766e66515a546a75.
Diffstat (limited to 'alot/db/manager.py')
-rw-r--r--  alot/db/manager.py  129
1 file changed, 126 insertions(+), 3 deletions(-)
diff --git a/alot/db/manager.py b/alot/db/manager.py
index d95ea332..956ddcbc 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,9 +1,14 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
-# Copyright © Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
+import errno
import logging
+import multiprocessing
+import os
+import signal
+import sys
+import threading
from notmuch import Database, NotmuchError, XapianError
import notmuch
@@ -18,6 +23,47 @@ from .utils import is_subdir_of
from ..settings.const import settings
+class FillPipeProcess(multiprocessing.Process):
+
+    def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
+        multiprocessing.Process.__init__(self)
+        self.it = it
+        self.pipe = pipe[1]
+        self.fun = fun
+        self.keep_going = True
+        self.stdout = stdout
+        self.stderr = stderr
+
+    def handle_sigterm(self, signo, frame):
+        # this is used to suppress any EINTR errors at interpreter
+        # shutdown
+        self.keep_going = False
+
+        # raises SystemExit to shut down the interpreter from the
+        # signal handler
+        sys.exit()
+
+    def run(self):
+        # replace file descriptors 1 and 2 (stdout and stderr) with
+        # pipes to the parent process
+        os.dup2(self.stdout, 1)
+        os.dup2(self.stderr, 2)
+
+        # register a signal handler for SIGTERM
+        signal.signal(signal.SIGTERM, self.handle_sigterm)
+
+        for a in self.it:
+            try:
+                self.pipe.send(self.fun(a))
+            except IOError as e:
+                # suppress spurious EINTR errors at interpreter
+                # shutdown
+                if e.errno != errno.EINTR or self.keep_going:
+                    raise
+
+        self.pipe.close()
+
+
class DBManager:
    """
    Keeps track of your index parameters, maintains a write-queue and
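For illustration only, outside the patch itself: a minimal, self-contained sketch of the producer/consumer pattern FillPipeProcess implements. Nothing below is taken from alot; worker and square are made-up names. The child streams one transformed element per send() and closes its end of the pipe, so the parent's recv() raises EOFError exactly when the stream is exhausted:

import multiprocessing

def square(x):
    return x * x

def worker(sender, it, fun):
    # child process: stream fun(a) for every element, then close
    # the write end to signal EOF to the reader
    for a in it:
        sender.send(fun(a))
    sender.close()

if __name__ == '__main__':
    receiver, sender = multiprocessing.Pipe(False)  # unidirectional
    child = multiprocessing.Process(target=worker,
                                    args=(sender, range(5), square))
    child.start()
    # drop the parent's copy of the write end; otherwise recv()
    # would block forever instead of raising EOFError
    sender.close()
    try:
        while True:
            print(receiver.recv())
    except EOFError:
        pass
    child.join()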
@@ -152,6 +198,15 @@ class DBManager:
                    raise e
        logging.debug('flush finished')
+    def kill_search_processes(self):
+        """
+        terminate all search processes that originate from
+        this manager's :meth:`get_threads`.
+        """
+        for p in self.processes:
+            p.terminate()
+        self.processes = []
+
    def tag(self, querystring, tags, afterwards=None, remove_rest=False):
        """
        add tags to messages matching `querystring`.
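Since Process.terminate() delivers SIGTERM, workers killed by kill_search_processes exit through FillPipeProcess.handle_sigterm above instead of dying mid-write. A hedged sketch of a teardown hook; the cleanup function and the dbman variable are assumptions for illustration, not part of this commit:

def cleanup(dbman):
    # stop any in-flight search workers, then write out queued changes
    dbman.kill_search_processes()
    dbman.flush()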
@@ -254,6 +309,75 @@ class DBManager:
        db = Database(path=self.path)
        return {k[6:]: v for k, v in db.get_configs('query.')}
+    def async_(self, cbl, fun):
+        """
+        return a pair (receiver, process) so that the process writes
+        `fun(a)` to the pipe for each element `a` in the iterable returned
+        by the callable `cbl`.
+
+        :param cbl: a function returning something iterable
+        :type cbl: callable
+        :param fun: a unary translation function
+        :type fun: callable
+        :rtype: (:class:`multiprocessing.connection.Connection`,
+                 :class:`multiprocessing.Process`)
+        """
+        # create two unix pipes to redirect the worker's stdout and
+        # stderr
+        stdout = os.pipe()
+        stderr = os.pipe()
+
+        # create a multiprocessing pipe for the results
+        pipe = multiprocessing.Pipe(False)
+        receiver, sender = pipe
+
+        process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
+        process.start()
+        self.processes.append(process)
+        logging.debug('Worker process %s spawned', process.pid)
+
+        def threaded_wait():
+            # wait(2) for the process to die
+            process.join()
+
+            if process.exitcode < 0:
+                msg = 'received signal {0}'.format(-process.exitcode)
+            elif process.exitcode > 0:
+                msg = 'returned error code {0}'.format(process.exitcode)
+            else:
+                msg = 'exited successfully'
+
+            logging.debug('Worker process %s %s', process.pid, msg)
+            self.processes.remove(process)
+
+        # XXX: it would be much nicer to run this as a coroutine than a
+        # thread, except that this code is executed before the eventloop
+        # is started.
+        #
+        # spawn a thread to collect the worker process once it dies,
+        # preventing it from hanging around as a zombie
+        threading.Thread(target=threaded_wait).start()
+
+        # TODO: avoid this if logging level > debug
+        def threaded_reader(prefix, fd):
+            with os.fdopen(fd) as handle:
+                for line in handle:
+                    logging.debug('Worker process %s said on %s: %s',
+                                  process.pid, prefix, line.rstrip())
+
+        # spawn two threads that read from the stdout and stderr pipes
+        # and write anything that appears there to the log
+        threading.Thread(target=threaded_reader,
+                         args=('stdout', stdout[0])).start()
+        os.close(stdout[1])
+        threading.Thread(target=threaded_reader,
+                         args=('stderr', stderr[0])).start()
+        os.close(stderr[1])
+
+        # closing the sending end in this (receiving) process guarantees
+        # that .recv in the walker raises the appropriate EOFError once
+        # the worker has closed its end of the pipe
+        sender.close()
+        return receiver, process
+
    def get_threads(self, querystring, sort='newest_first', exclude_tags=None):
        """
        asynchronously look up thread ids matching `querystring`.
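A sketch of how a caller might drain the pair returned by async_; dbman, query and handle_result are hypothetical names for illustration, and alot's real consumer is the walker mentioned in the comment above:

receiver, process = dbman.async_(query.search_threads,
                                 lambda t: t.get_thread_id())
try:
    while True:
        handle_result(receiver.recv())  # one thread id per recv()
except EOFError:
    # the worker closed its end of the pipe: no more results
    receiver.close()
process.join()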
@@ -277,8 +401,7 @@ class DBManager:
        if exclude_tags:
            for tag in exclude_tags:
                q.exclude_tag(tag)
-        for t in q.search_threads():
-            yield t.get_thread_id()
+        return self.async_(q.search_threads, (lambda a: a.get_thread_id()))

    def query(self, querystring):
        """