diff options
author | Patrick Totzke <patricktotzke@gmail.com> | 2013-01-24 13:42:18 +0000 |
---|---|---|
committer | Patrick Totzke <patricktotzke@gmail.com> | 2013-01-24 13:42:18 +0000 |
commit | 22b0170fd6176af78fc8d89172cf1ad837bd09bf (patch) | |
tree | ea883c10d1460de2bdf41de3cf9ca8c9e0ee3bb8 | |
parent | d99c7151ac1c32bbac4976096689be32ff79cb02 (diff) | |
parent | 83a963291f15a2787499a4218e37a27587308a93 (diff) |
Merge branch '0.3.3-workers-love-555'
-rw-r--r-- | alot/db/manager.py | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/alot/db/manager.py b/alot/db/manager.py index bf1581ff..b3d31694 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -6,6 +6,7 @@ import notmuch import multiprocessing import logging import sys +import os import errno import signal from twisted.internet import reactor @@ -23,24 +24,39 @@ from alot.db import DB_ENC class FillPipeProcess(multiprocessing.Process): - def __init__(self, it, pipe, fun=(lambda x: x)): + def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): multiprocessing.Process.__init__(self) self.it = it self.pipe = pipe[1] self.fun = fun self.keep_going = True + self.stdout = stdout + self.stderr = stderr def handle_sigterm(self, signo, frame): + # this is used to suppress any EINTR errors at interpreter + # shutdown self.keep_going = False + + # raises SystemExit to shut down the interpreter from the + # signal handler sys.exit() def run(self): + # replace filedescriptors 1 and 2 (stdout and stderr) with + # pipes to the parent process + os.dup2(self.stdout, 1) + os.dup2(self.stderr, 2) + + # register a signal handler for SIGTERM signal.signal(signal.SIGTERM, self.handle_sigterm) for a in self.it: try: self.pipe.send(self.fun(a)) except IOError as e: + # suppress spurious EINTR errors at interpreter + # shutdown if e.errno != errno.EINTR or self.keep_going: raise @@ -308,14 +324,22 @@ class DBManager(object): :rtype: (:class:`multiprocessing.Pipe`, :class:`multiprocessing.Process`) """ + # create two unix pipes to redirect the workers stdout and + # stderr + stdout = os.pipe() + stderr = os.pipe() + + # create a multiprocessing pipe for the results pipe = multiprocessing.Pipe(False) receiver, sender = pipe - process = FillPipeProcess(cbl(), pipe, fun) + + process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) process.start() self.processes.append(process) logging.debug('Worker process {0} spawned'.format(process.pid)) def threaded_wait(): + # wait(2) for the process to die process.join() if process.exitcode < 0: @@ -328,8 +352,23 @@ class DBManager(object): logging.debug('Worker process {0} {1}'.format(process.pid, msg)) self.processes.remove(process) + # spawn a thread to collect the worker process once it dies + # preventing it from hanging around as zombie reactor.callInThread(threaded_wait) + def threaded_reader(prefix, fd): + with os.fdopen(fd) as handle: + for line in handle: + logging.debug('Worker process {0} said on {1}: {2}'.format( + process.pid, prefix, line.rstrip())) + + # spawn two threads that read from the stdout and stderr pipes + # and write anything that appears there to the log + reactor.callInThread(threaded_reader, 'stdout', stdout[0]) + os.close(stdout[1]) + reactor.callInThread(threaded_reader, 'stderr', stderr[0]) + os.close(stderr[1]) + # closing the sending end in this (receiving) process guarantees # that here the apropriate EOFError is raised upon .recv in the walker sender.close() |