Diffstat (limited to 'alot/db')
-rw-r--r--  alot/db/manager.py | 130
1 file changed, 2 insertions(+), 128 deletions(-)
diff --git a/alot/db/manager.py b/alot/db/manager.py
index cf452c42..c902eb54 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -2,13 +2,8 @@
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
-import errno
import logging
-import multiprocessing
import os
-import signal
-import sys
-import threading
from notmuch2 import Database, NotmuchError, XapianError
@@ -28,47 +23,6 @@ def _is_subdir_of(subpath, superpath):
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
return os.path.commonprefix([subpath, superpath]) == superpath
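A side note on the helper above: os.path.commonprefix compares strings character by character rather than path component by path component, so the check only behaves as intended on normalised paths. A standalone illustration (not part of this commit):

    import os

    # the common prefix of a file and its ancestor directory is the directory
    os.path.commonprefix(['/a/b/c/d.rst', '/a/b'])            # '/a/b'
    os.path.commonprefix(['/a/b/c/d.rst', '/a/b']) == '/a/b'  # True

    # caveat: the comparison is purely textual, so a sibling directory
    # sharing a name prefix also passes the test
    os.path.commonprefix(['/a/bc/x.rst', '/a/b']) == '/a/b'   # also True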
-class FillPipeProcess(multiprocessing.Process):
-
- def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
- self.keep_going = True
- self.stdout = stdout
- self.stderr = stderr
-
- def handle_sigterm(self, signo, frame):
- # this is used to suppress any EINTR errors at interpreter
- # shutdown
- self.keep_going = False
-
- # raises SystemExit to shut down the interpreter from the
- # signal handler
- sys.exit()
-
- def run(self):
- # replace file descriptors 1 and 2 (stdout and stderr) with
- # pipes to the parent process
- os.dup2(self.stdout, 1)
- os.dup2(self.stderr, 2)
-
- # register a signal handler for SIGTERM
- signal.signal(signal.SIGTERM, self.handle_sigterm)
-
- for a in self.it:
- try:
- self.pipe.send(self.fun(a))
- except IOError as e:
- # suppress spurious EINTR errors at interpreter
- # shutdown
- if e.errno != errno.EINTR or self.keep_going:
- raise
-
- self.pipe.close()
-
-
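Stripped of the stdout/stderr redirection and the SIGTERM handling, the class removed above implements a generic producer pattern: a child process sends fun(a) for every element over one end of a multiprocessing.Pipe, and closing the connection is what the reader eventually sees as EOFError. A minimal standalone sketch of that pattern, with illustrative names that are not part of alot's API:

    import multiprocessing

    def square(x):
        return x * x

    def fill_pipe(it, conn, fun):
        # child: send fun(a) for each element, then close the
        # connection, which surfaces as EOFError on the parent's recv()
        for a in it:
            conn.send(fun(a))
        conn.close()

    if __name__ == '__main__':
        receiver, sender = multiprocessing.Pipe(False)
        worker = multiprocessing.Process(target=fill_pipe,
                                         args=(range(3), sender, square))
        worker.start()
        sender.close()  # drop the parent's copy so EOFError marks the true end
        try:
            while True:
                print(receiver.recv())  # 0, then 1, then 4
        except EOFError:
            pass
        worker.join()   # collect the child so it does not become a zombie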
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -188,15 +142,6 @@ class DBManager:
raise e
logging.debug('flush finished')
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this manager's :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
def tag(self, querystring, tags, afterwards=None, remove_rest=False):
"""
add tags to messages matching `querystring`.
@@ -287,73 +232,6 @@ class DBManager:
queries = filter(lambda k: k.startswith(q_prefix), db.config)
return { q[len(q_prefix):] : db.config[q] for q in queries }
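The two context lines above collect notmuch's named queries by stripping a shared prefix from the matching config keys. Roughly, with hypothetical values (assuming q_prefix is 'query.', the prefix notmuch uses for saved searches):

    config = {'query.inbox': 'tag:inbox', 'user.name': 'me'}
    q_prefix = 'query.'

    queries = filter(lambda k: k.startswith(q_prefix), config)
    print({q[len(q_prefix):]: config[q] for q in queries})
    # {'inbox': 'tag:inbox'}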
- def async_(self, seq, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable `seq`.
-
- :param fun: a unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- # create two Unix pipes to redirect the worker's stdout and
- # stderr
- stdout = os.pipe()
- stderr = os.pipe()
-
- # create a multiprocessing pipe for the results
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
-
- process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun)
- process.start()
- self.processes.append(process)
- logging.debug('Worker process %s spawned', process.pid)
-
- def threaded_wait():
- # wait(2) for the process to die
- process.join()
-
- if process.exitcode < 0:
- msg = 'received signal {0}'.format(-process.exitcode)
- elif process.exitcode > 0:
- msg = 'returned error code {0}'.format(process.exitcode)
- else:
- msg = 'exited successfully'
-
- logging.debug('Worker process %s %s', process.pid, msg)
- self.processes.remove(process)
-
- # XXX: it would be much nicer to run this as a coroutine than a thread,
- # except that this code is executed before the eventloop is started.
- #
- # spawn a thread to collect the worker process once it dies
- # preventing it from hanging around as zombie
- threading.Thread(target=threaded_wait).start()
-
- # TODO: avoid this if logging level > debug
- def threaded_reader(prefix, fd):
- with os.fdopen(fd) as handle:
- for line in handle:
- logging.debug('Worker process %s said on %s: %s',
- process.pid, prefix, line.rstrip())
-
- # spawn two threads that read from the stdout and stderr pipes
- # and write anything that appears there to the log
- threading.Thread(target=threaded_reader,
- args=('stdout', stdout[0])).start()
- os.close(stdout[1])
- threading.Thread(target=threaded_reader,
- args=('stderr', stderr[0])).start()
- os.close(stderr[1])
-
- # closing the sending end in this (receiving) process guarantees
- # that the appropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
def get_threads(self, querystring, sort='newest_first', exclude_tags = frozenset()):
"""
asynchronously look up thread ids matching `querystring`.
@@ -366,10 +244,7 @@ class DBManager:
:param exclude_tags: Tags to exclude by default unless included in the
search
:type exclude_tags: set of str
- :returns: a pipe together with the process that asynchronously
- writes to it.
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
+ :returns: iterator over thread ids
"""
# TODO: use a symbolic constant for this
assert sort in self._sort_orders
@@ -378,8 +253,7 @@ class DBManager:
sort = self._sort_orders[sort]
exclude_tags = self._exclude_tags | exclude_tags
- return self.async_(db.threads(querystring, sort = sort, exclude_tags = exclude_tags),
- lambda t: t.threadid)
+ return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags))
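After this change, callers no longer receive a (pipe, process) pair and loop over pipe.recv() until EOFError; get_threads hands back a plain generator that pulls thread ids lazily from notmuch2. A hedged usage sketch (the DBManager constructor arguments are assumptions, not taken from this diff):

    from alot.db.manager import DBManager

    # hypothetical caller; assumes an initialised notmuch index at this path
    manager = DBManager(path='~/.mail')

    # iteration is lazy: thread ids are produced on demand by notmuch2,
    # with no worker process, reader threads or zombie reaping involved
    for tid in manager.get_threads('tag:inbox', sort='newest_first'):
        print(tid)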
def add_message(self, path, tags):
"""