From 14cd032638cdc5adf5a68d04280a642ee8662f94 Mon Sep 17 00:00:00 2001 From: Patrick Totzke Date: Thu, 10 Nov 2011 21:52:18 +0000 Subject: clean up piping architecture --- alot/buffers.py | 2 +- alot/db.py | 33 +++++++++++++++++---------------- alot/walker.py | 2 ++ 3 files changed, 20 insertions(+), 17 deletions(-) (limited to 'alot') diff --git a/alot/buffers.py b/alot/buffers.py index c3352289..351d84f6 100644 --- a/alot/buffers.py +++ b/alot/buffers.py @@ -172,7 +172,7 @@ class SearchBuffer(Buffer): self.result_count = self.dbman.count_messages(self.querystring) try: self.pipe, self.proc = self.dbman.get_threads(self.querystring) - except NotmuchError: #TODO: this never happens for malformed queries + except NotmuchError: self.ui.notify('malformed query string: %s' % self.querystring, 'error') self.listbox = urwid.ListBox(self.threadlist) diff --git a/alot/db.py b/alot/db.py index 244b5f9d..55d02149 100644 --- a/alot/db.py +++ b/alot/db.py @@ -42,23 +42,22 @@ class DatabaseLockedError(DatabaseError): class FillPipeProcess(multiprocessing.Process): - def __init__(self, it, pipe_i, pipe_o, fun=(lambda x: x)): + def __init__(self, it, pipe, fun=(lambda x: x)): multiprocessing.Process.__init__(self) self.it = it - self.i = pipe_i - self.o = pipe_o + self.pipe = pipe[1] + #pipe[0].close() self.fun = fun def run(self): - try: - for a in self.it: - self.i.send(self.fun(a)) - self.o.close() - self.i.close() + #try: + for a in self.it: + self.pipe.send(self.fun(a)) + self.pipe.close() #self.terminate() - except IOError: + #except IOError: # looks like the main process exited, so we stop - pass + # pass class DBManager(object): @@ -196,12 +195,14 @@ class DBManager(object): def async(self, cbl, fun): """return a `Pipe` object to which `fun(a)` is written for each a in cbl""" - (i, o) = multiprocessing.Pipe(False) - t = FillPipeProcess(cbl(), o, i, fun) - t.start() - o.close() # guarantees that both ends are closed and i.close in child - # raises EOFError in parent - return (i, t) + pipe = multiprocessing.Pipe(False) + receiver, sender = pipe + process = FillPipeProcess(cbl(), pipe, fun) + process.start() + # closing the sending end in tis (receiving) process guarantees + # that here the apropriate EOFError is raisedupon .recv + sender.close() + return receiver, process def get_threads(self, querystring): """ diff --git a/alot/walker.py b/alot/walker.py index 768485fe..ba2b2fe4 100644 --- a/alot/walker.py +++ b/alot/walker.py @@ -17,6 +17,7 @@ along with notmuch. If not, see . Copyright (C) 2011 Patrick Totzke """ import urwid +import logging class PipeWalker(urwid.ListWalker): @@ -75,6 +76,7 @@ class PipeWalker(urwid.ListWalker): next_widget = self.containerclass(next_obj, **self.kwargs) self.lines.append(next_widget) except EOFError: + logging.debug('EMPTY PIPE') next_widget = None self.empty = True return next_widget -- cgit v1.2.3