diff options
author | Patrick Totzke <patricktotzke@gmail.com> | 2011-11-10 21:52:18 +0000 |
---|---|---|
committer | Patrick Totzke <patricktotzke@gmail.com> | 2011-11-10 21:52:18 +0000 |
commit | 14cd032638cdc5adf5a68d04280a642ee8662f94 (patch) | |
tree | 2f3e3dcf5b9dcecc7a0f210797426a2c1e42de78 /alot/db.py | |
parent | 836bb9a6b87062bcdaa0833f53d40ecdf9c18fbf (diff) |
clean up piping architecture
Diffstat (limited to 'alot/db.py')
-rw-r--r-- | alot/db.py | 33 |
1 files changed, 17 insertions, 16 deletions
@@ -42,23 +42,22 @@ class DatabaseLockedError(DatabaseError): class FillPipeProcess(multiprocessing.Process): - def __init__(self, it, pipe_i, pipe_o, fun=(lambda x: x)): + def __init__(self, it, pipe, fun=(lambda x: x)): multiprocessing.Process.__init__(self) self.it = it - self.i = pipe_i - self.o = pipe_o + self.pipe = pipe[1] + #pipe[0].close() self.fun = fun def run(self): - try: - for a in self.it: - self.i.send(self.fun(a)) - self.o.close() - self.i.close() + #try: + for a in self.it: + self.pipe.send(self.fun(a)) + self.pipe.close() #self.terminate() - except IOError: + #except IOError: # looks like the main process exited, so we stop - pass + # pass class DBManager(object): @@ -196,12 +195,14 @@ class DBManager(object): def async(self, cbl, fun): """return a `Pipe` object to which `fun(a)` is written for each a in cbl""" - (i, o) = multiprocessing.Pipe(False) - t = FillPipeProcess(cbl(), o, i, fun) - t.start() - o.close() # guarantees that both ends are closed and i.close in child - # raises EOFError in parent - return (i, t) + pipe = multiprocessing.Pipe(False) + receiver, sender = pipe + process = FillPipeProcess(cbl(), pipe, fun) + process.start() + # closing the sending end in tis (receiving) process guarantees + # that here the apropriate EOFError is raisedupon .recv + sender.close() + return receiver, process def get_threads(self, querystring): """ |