From d1b5cd81902b8f206637ae9525357378fd62b506 Mon Sep 17 00:00:00 2001 From: Justus Winter <4winter@informatik.uni-hamburg.de> Date: Wed, 23 Jan 2013 17:20:13 +0100 Subject: Replace workers std{out,err} by pipes and log anything written to them This will help us identify problems in libnotmuch and reduces visual artifacts (unfortunately libnotmuch writes messages to stderr that used to clobber the curses interface). Signed-off-by: Justus Winter <4winter@informatik.uni-hamburg.de> --- alot/db/manager.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/alot/db/manager.py b/alot/db/manager.py index bf1581ff..99ae607c 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -6,6 +6,7 @@ import notmuch import multiprocessing import logging import sys +import os import errno import signal from twisted.internet import reactor @@ -23,18 +24,22 @@ from alot.db import DB_ENC class FillPipeProcess(multiprocessing.Process): - def __init__(self, it, pipe, fun=(lambda x: x)): + def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)): multiprocessing.Process.__init__(self) self.it = it self.pipe = pipe[1] self.fun = fun self.keep_going = True + self.stdout = stdout + self.stderr = stderr def handle_sigterm(self, signo, frame): self.keep_going = False sys.exit() def run(self): + os.dup2(self.stdout, 1) + os.dup2(self.stderr, 2) signal.signal(signal.SIGTERM, self.handle_sigterm) for a in self.it: @@ -308,9 +313,11 @@ class DBManager(object): :rtype: (:class:`multiprocessing.Pipe`, :class:`multiprocessing.Process`) """ + stdout = os.pipe() + stderr = os.pipe() pipe = multiprocessing.Pipe(False) receiver, sender = pipe - process = FillPipeProcess(cbl(), pipe, fun) + process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) process.start() self.processes.append(process) logging.debug('Worker process {0} spawned'.format(process.pid)) @@ -330,6 +337,17 @@ class DBManager(object): reactor.callInThread(threaded_wait) + def threaded_reader(prefix, fd): + with os.fdopen(fd) as handle: + for line in handle: + logging.debug('Worker process {0} said on {1}: {2}'.format( + process.pid, prefix, line.rstrip())) + + reactor.callInThread(threaded_reader, 'stdout', stdout[0]) + os.close(stdout[1]) + reactor.callInThread(threaded_reader, 'stderr', stderr[0]) + os.close(stderr[1]) + # closing the sending end in this (receiving) process guarantees # that here the apropriate EOFError is raised upon .recv in the walker sender.close() -- cgit v1.2.3 From 83a963291f15a2787499a4218e37a27587308a93 Mon Sep 17 00:00:00 2001 From: Justus Winter <4winter@informatik.uni-hamburg.de> Date: Thu, 24 Jan 2013 13:46:16 +0100 Subject: Add some comments to the db manager async code Signed-off-by: Justus Winter <4winter@informatik.uni-hamburg.de> --- alot/db/manager.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/alot/db/manager.py b/alot/db/manager.py index 99ae607c..b3d31694 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -34,18 +34,29 @@ class FillPipeProcess(multiprocessing.Process): self.stderr = stderr def handle_sigterm(self, signo, frame): + # this is used to suppress any EINTR errors at interpreter + # shutdown self.keep_going = False + + # raises SystemExit to shut down the interpreter from the + # signal handler sys.exit() def run(self): + # replace filedescriptors 1 and 2 (stdout and stderr) with + # pipes to the parent process os.dup2(self.stdout, 1) os.dup2(self.stderr, 2) + + # register a signal handler for SIGTERM signal.signal(signal.SIGTERM, self.handle_sigterm) for a in self.it: try: self.pipe.send(self.fun(a)) except IOError as e: + # suppress spurious EINTR errors at interpreter + # shutdown if e.errno != errno.EINTR or self.keep_going: raise @@ -313,16 +324,22 @@ class DBManager(object): :rtype: (:class:`multiprocessing.Pipe`, :class:`multiprocessing.Process`) """ + # create two unix pipes to redirect the workers stdout and + # stderr stdout = os.pipe() stderr = os.pipe() + + # create a multiprocessing pipe for the results pipe = multiprocessing.Pipe(False) receiver, sender = pipe + process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun) process.start() self.processes.append(process) logging.debug('Worker process {0} spawned'.format(process.pid)) def threaded_wait(): + # wait(2) for the process to die process.join() if process.exitcode < 0: @@ -335,6 +352,8 @@ class DBManager(object): logging.debug('Worker process {0} {1}'.format(process.pid, msg)) self.processes.remove(process) + # spawn a thread to collect the worker process once it dies + # preventing it from hanging around as zombie reactor.callInThread(threaded_wait) def threaded_reader(prefix, fd): @@ -343,6 +362,8 @@ class DBManager(object): logging.debug('Worker process {0} said on {1}: {2}'.format( process.pid, prefix, line.rstrip())) + # spawn two threads that read from the stdout and stderr pipes + # and write anything that appears there to the log reactor.callInThread(threaded_reader, 'stdout', stdout[0]) os.close(stdout[1]) reactor.callInThread(threaded_reader, 'stderr', stderr[0]) -- cgit v1.2.3