summaryrefslogtreecommitdiff
path: root/alot/db.py
diff options
context:
space:
mode:
authorPatrick Totzke <patricktotzke@gmail.com>2011-11-10 21:52:18 +0000
committerPatrick Totzke <patricktotzke@gmail.com>2011-11-10 21:52:18 +0000
commit14cd032638cdc5adf5a68d04280a642ee8662f94 (patch)
tree2f3e3dcf5b9dcecc7a0f210797426a2c1e42de78 /alot/db.py
parent836bb9a6b87062bcdaa0833f53d40ecdf9c18fbf (diff)
clean up piping architecture
Diffstat (limited to 'alot/db.py')
-rw-r--r--alot/db.py33
1 files changed, 17 insertions, 16 deletions
diff --git a/alot/db.py b/alot/db.py
index 244b5f9d..55d02149 100644
--- a/alot/db.py
+++ b/alot/db.py
@@ -42,23 +42,22 @@ class DatabaseLockedError(DatabaseError):
class FillPipeProcess(multiprocessing.Process):
- def __init__(self, it, pipe_i, pipe_o, fun=(lambda x: x)):
+ def __init__(self, it, pipe, fun=(lambda x: x)):
multiprocessing.Process.__init__(self)
self.it = it
- self.i = pipe_i
- self.o = pipe_o
+ self.pipe = pipe[1]
+ #pipe[0].close()
self.fun = fun
def run(self):
- try:
- for a in self.it:
- self.i.send(self.fun(a))
- self.o.close()
- self.i.close()
+ #try:
+ for a in self.it:
+ self.pipe.send(self.fun(a))
+ self.pipe.close()
#self.terminate()
- except IOError:
+ #except IOError:
# looks like the main process exited, so we stop
- pass
+ # pass
class DBManager(object):
@@ -196,12 +195,14 @@ class DBManager(object):
def async(self, cbl, fun):
"""return a `Pipe` object to which `fun(a)` is written for each a in
cbl"""
- (i, o) = multiprocessing.Pipe(False)
- t = FillPipeProcess(cbl(), o, i, fun)
- t.start()
- o.close() # guarantees that both ends are closed and i.close in child
- # raises EOFError in parent
- return (i, t)
+ pipe = multiprocessing.Pipe(False)
+ receiver, sender = pipe
+ process = FillPipeProcess(cbl(), pipe, fun)
+ process.start()
+ # closing the sending end in tis (receiving) process guarantees
+ # that here the apropriate EOFError is raisedupon .recv
+ sender.close()
+ return receiver, process
def get_threads(self, querystring):
"""