summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Totzke <patricktotzke@gmail.com>2013-01-24 13:42:18 +0000
committerPatrick Totzke <patricktotzke@gmail.com>2013-01-24 13:42:18 +0000
commit22b0170fd6176af78fc8d89172cf1ad837bd09bf (patch)
treeea883c10d1460de2bdf41de3cf9ca8c9e0ee3bb8
parentd99c7151ac1c32bbac4976096689be32ff79cb02 (diff)
parent83a963291f15a2787499a4218e37a27587308a93 (diff)
Merge branch '0.3.3-workers-love-555'
-rw-r--r--alot/db/manager.py43
1 files changed, 41 insertions, 2 deletions
diff --git a/alot/db/manager.py b/alot/db/manager.py
index bf1581ff..b3d31694 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -6,6 +6,7 @@ import notmuch
import multiprocessing
import logging
import sys
+import os
import errno
import signal
from twisted.internet import reactor
@@ -23,24 +24,39 @@ from alot.db import DB_ENC
class FillPipeProcess(multiprocessing.Process):
- def __init__(self, it, pipe, fun=(lambda x: x)):
+ def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
multiprocessing.Process.__init__(self)
self.it = it
self.pipe = pipe[1]
self.fun = fun
self.keep_going = True
+ self.stdout = stdout
+ self.stderr = stderr
def handle_sigterm(self, signo, frame):
+ # this is used to suppress any EINTR errors at interpreter
+ # shutdown
self.keep_going = False
+
+ # raises SystemExit to shut down the interpreter from the
+ # signal handler
sys.exit()
def run(self):
+ # replace filedescriptors 1 and 2 (stdout and stderr) with
+ # pipes to the parent process
+ os.dup2(self.stdout, 1)
+ os.dup2(self.stderr, 2)
+
+ # register a signal handler for SIGTERM
signal.signal(signal.SIGTERM, self.handle_sigterm)
for a in self.it:
try:
self.pipe.send(self.fun(a))
except IOError as e:
+ # suppress spurious EINTR errors at interpreter
+ # shutdown
if e.errno != errno.EINTR or self.keep_going:
raise
@@ -308,14 +324,22 @@ class DBManager(object):
:rtype: (:class:`multiprocessing.Pipe`,
:class:`multiprocessing.Process`)
"""
+ # create two unix pipes to redirect the workers stdout and
+ # stderr
+ stdout = os.pipe()
+ stderr = os.pipe()
+
+ # create a multiprocessing pipe for the results
pipe = multiprocessing.Pipe(False)
receiver, sender = pipe
- process = FillPipeProcess(cbl(), pipe, fun)
+
+ process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
process.start()
self.processes.append(process)
logging.debug('Worker process {0} spawned'.format(process.pid))
def threaded_wait():
+ # wait(2) for the process to die
process.join()
if process.exitcode < 0:
@@ -328,8 +352,23 @@ class DBManager(object):
logging.debug('Worker process {0} {1}'.format(process.pid, msg))
self.processes.remove(process)
+ # spawn a thread to collect the worker process once it dies
+ # preventing it from hanging around as zombie
reactor.callInThread(threaded_wait)
+ def threaded_reader(prefix, fd):
+ with os.fdopen(fd) as handle:
+ for line in handle:
+ logging.debug('Worker process {0} said on {1}: {2}'.format(
+ process.pid, prefix, line.rstrip()))
+
+ # spawn two threads that read from the stdout and stderr pipes
+ # and write anything that appears there to the log
+ reactor.callInThread(threaded_reader, 'stdout', stdout[0])
+ os.close(stdout[1])
+ reactor.callInThread(threaded_reader, 'stderr', stderr[0])
+ os.close(stderr[1])
+
# closing the sending end in this (receiving) process guarantees
# that here the apropriate EOFError is raised upon .recv in the walker
sender.close()