# Copyright (C) 2011-2012 Patrick Totzke
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from collections import deque
import errno
import logging
import multiprocessing
import os
import signal
import sys
import threading

from notmuch2 import Database, NotmuchError, XapianError

from .errors import DatabaseError
from .errors import DatabaseLockedError
from .errors import DatabaseROError
from .errors import NonexistantObjectError
from .thread import Thread
from ..settings.const import settings


def _is_subdir_of(subpath, superpath):
    """
    check whether `subpath` is `superpath` itself or located below it.

    Both paths are resolved with :func:`os.path.realpath` first.

    :param subpath: path that is tested
    :type subpath: str
    :param superpath: directory that is supposed to contain `subpath`
    :type superpath: str
    :rtype: bool
    """
    # make both absolute
    superpath = os.path.realpath(superpath)
    subpath = os.path.realpath(subpath)

    # Compare whole path components: a character-wise prefix test would
    # wrongly accept e.g. /a/bc as being below /a/b.
    try:
        return os.path.commonpath([subpath, superpath]) == superpath
    except ValueError:
        # commonpath refuses paths that cannot share a prefix
        # (e.g. different drives); those are never nested.
        return False


class FillPipeProcess(multiprocessing.Process):
    """
    worker process that sends `fun(a)` through a pipe for every element `a`
    of an iterable, with its stdout and stderr redirected to the given file
    descriptors.
    """

    def __init__(self, it, stdout, stderr, pipe, fun=(lambda x: x)):
        """
        :param it: iterable whose elements are processed
        :param stdout: file descriptor that replaces fd 1 in the worker
        :type stdout: int
        :param stderr: file descriptor that replaces fd 2 in the worker
        :type stderr: int
        :param pipe: pair of connections; only the second one is used,
                     to send the results
        :param fun: translation applied to every element before sending
        :type fun: callable
        """
        multiprocessing.Process.__init__(self)
        self.it = it
        self.pipe = pipe[1]
        self.fun = fun
        self.keep_going = True
        self.stdout = stdout
        self.stderr = stderr

    def handle_sigterm(self, signo, frame):
        """signal handler for SIGTERM: stop working and exit"""
        # this is used to suppress any EINTR errors at interpreter
        # shutdown
        self.keep_going = False

        # raises SystemExit to shut down the interpreter from the
        # signal handler
        sys.exit()

    def run(self):
        """send `fun(a)` for each element `a`, then close the pipe"""
        # replace filedescriptors 1 and 2 (stdout and stderr) with
        # pipes to the parent process
        os.dup2(self.stdout, 1)
        os.dup2(self.stderr, 2)

        # register a signal handler for SIGTERM
        signal.signal(signal.SIGTERM, self.handle_sigterm)

        for a in self.it:
            try:
                self.pipe.send(self.fun(a))
            except IOError as e:
                # suppress spurious EINTR errors at interpreter
                # shutdown
                if e.errno != errno.EINTR or self.keep_going:
                    raise

        self.pipe.close()


class DBManager:
    """
    Keeps track of your index parameters, maintains a write-queue and
    lets you look up threads and messages directly to the persistent wrapper
    classes.
    """
    _sort_orders = {
        'oldest_first': Database.SORT.OLDEST_FIRST,
        'newest_first': Database.SORT.NEWEST_FIRST,
        'unsorted': Database.SORT.UNSORTED,
        'message_id': Database.SORT.MESSAGE_ID,
    }
    """constants representing sort orders"""

    _exclude_tags = None

    def __init__(self, path=None, ro=False):
        """
        :param path: absolute path to the notmuch index
        :type path: str
        :param ro: open the index in read-only mode
        :type ro: bool
        """
        self.ro = ro
        self.path = path
        self.writequeue = deque([])
        self.processes = []
        self._exclude_tags = frozenset(settings.get('exclude_tags'))

    def _db_ro(self):
        """returns a fresh read-only :class:`notmuch2.Database` handle"""
        return Database(path=self.path, mode=Database.MODE.READ_ONLY)

    def flush(self):
        """
        write out all queued write-commands in order, each one in a separate
        :meth:`atomic <notmuch2.Database.atomic>` transaction.

        If this fails the current action is rolled back, stays in the write
        queue and an exception is raised.
        You are responsible to retry flushing at a later time if you want to
        ensure that the cached changes are applied to the database.

        :exception: :exc:`~errors.DatabaseROError` if db is opened read-only
        :exception: :exc:`~errors.DatabaseLockedError` if db is locked
        :exception: :exc:`~errors.DatabaseError` on other notmuch/Xapian
                    errors
        """
        if self.ro:
            raise DatabaseROError()
        if not self.writequeue:
            return

        # read notmuch's config regarding imap flag synchronization
        sync = settings.get_notmuch_setting('maildir', 'synchronize_flags')

        # go through writequeue entries
        while self.writequeue:
            current_item = self.writequeue.popleft()
            logging.debug('write-out item: %s', str(current_item))

            # watch out for notmuch errors to re-insert current_item
            # to the queue on errors
            try:
                # the first two coordinates are command name and
                # post-callback
                cmd, afterwards = current_item[:2]
                logging.debug('cmd created')

                # acquire a writeable db handler
                try:
                    mode = Database.MODE.READ_WRITE
                    db = Database(path=self.path, mode=mode)
                except NotmuchError as e:
                    raise DatabaseLockedError() from e
                logging.debug('got write lock')

                # always release the handle again, even if the transaction
                # fails, so a failed item does not keep the index opened
                # for writing
                try:
                    with db.atomic():
                        if cmd == 'add':
                            path, op_tags = current_item[2:]
                            msg, _ = db.add(path, sync_flags=sync)
                            msg_tags = msg.tags
                            msg_tags |= op_tags
                        else:  # tag/set/untag
                            querystring, op_tags = current_item[2:]
                            for msg in db.messages(querystring):
                                with msg.frozen():
                                    msg_tags = msg.tags
                                    if cmd == 'tag':
                                        msg_tags |= op_tags
                                    elif cmd == 'set':
                                        msg_tags.clear()
                                        msg_tags |= op_tags
                                    elif cmd == 'untag':
                                        msg_tags -= op_tags
                finally:
                    # close db
                    db.close()
                logging.debug('closed db')

                # call post-callback
                if callable(afterwards):
                    logging.debug(str(afterwards))
                    afterwards()
                    logging.debug('called callback')

            # re-insert item to the queue upon Xapian/NotmuchErrors
            except (XapianError, NotmuchError) as e:
                logging.exception(e)
                self.writequeue.appendleft(current_item)
                raise DatabaseError(str(e)) from e
            except DatabaseLockedError:
                logging.debug('index temporarily locked')
                self.writequeue.appendleft(current_item)
                raise
        logging.debug('flush finished')

    def kill_search_processes(self):
        """
        terminate all search processes that originate from
        this managers :meth:`get_threads`.
        """
        for p in self.processes:
            p.terminate()
        self.processes = []

    def tag(self, querystring, tags, afterwards=None, remove_rest=False):
        """
        add tags to messages matching `querystring`.
        This appends a tag operation to the write queue and raises
        :exc:`~errors.DatabaseROError` if in read only mode.

        :param querystring: notmuch search string
        :type querystring: str
        :param tags: a set of tags to be added
        :type tags: set of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        :param remove_rest: remove tags from matching messages before tagging
        :type remove_rest: bool
        :exception: :exc:`~errors.DatabaseROError`

        .. note::
            This only adds the requested operation to the write queue.
            You need to call :meth:`DBManager.flush` to actually write out.
        """
        if self.ro:
            raise DatabaseROError()
        if remove_rest:
            self.writequeue.append(('set', afterwards, querystring, tags))
        else:
            self.writequeue.append(('tag', afterwards, querystring, tags))

    def untag(self, querystring, tags, afterwards=None):
        """
        removes tags from messages that match `querystring`.
        This appends an untag operation to the write queue and raises
        :exc:`~errors.DatabaseROError` if in read only mode.

        :param querystring: notmuch search string
        :type querystring: str
        :param tags: a list of tags to be removed
        :type tags: list of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        :exception: :exc:`~errors.DatabaseROError`

        .. note::
            This only adds the requested operation to the write queue.
            You need to call :meth:`DBManager.flush` to actually write out.
        """
        if self.ro:
            raise DatabaseROError()
        self.writequeue.append(('untag', afterwards, querystring, tags))

    def count_messages(self, querystring):
        """returns number of messages that match `querystring`"""
        return self._db_ro().count_messages(
            querystring, exclude_tags=self._exclude_tags)

    def count_threads(self, querystring):
        """returns number of threads that match `querystring`"""
        return self._db_ro().count_threads(
            querystring, exclude_tags=self._exclude_tags)

    def _get_notmuch_thread(self, tid):
        """
        returns the notmuch thread object with the given id

        :param tid: thread id
        :type tid: str
        :exception: :exc:`~errors.NonexistantObjectError` if no thread
                    matches
        """
        querystr = 'thread:' + tid
        try:
            return next(self._db_ro().threads(
                querystr, exclude_tags=self._exclude_tags))
        except StopIteration:
            errmsg = 'no thread with id %s exists!' % tid
            raise NonexistantObjectError(errmsg)

    def get_thread(self, tid):
        """returns :class:`Thread` with given thread id (str)"""
        return Thread(self, self._get_notmuch_thread(tid))

    def get_all_tags(self):
        """
        returns all tagsstrings used in the database
        :rtype: set of str
        """
        return self._db_ro().tags

    def get_named_queries(self):
        """
        returns the named queries stored in the database.
        :rtype: dict (str -> str) mapping alias to full query string
        """
        q_prefix = 'query.'
        db = self._db_ro()
        config = db.config
        # strip the prefix from every matching config key to get the alias
        return {key[len(q_prefix):]: config[key]
                for key in config if key.startswith(q_prefix)}

    def async_(self, seq, fun):
        """
        return a pair (pipe, process) so that the process writes
        `fun(a)` to the pipe for each element `a` of the iterable `seq`.

        :param seq: the elements to process
        :type seq: iterable
        :param fun: an unary translation function
        :type fun: callable
        :rtype: (:class:`multiprocessing.Pipe`,
                :class:`multiprocessing.Process`)
        """
        # create two unix pipes to redirect the workers stdout and
        # stderr
        stdout = os.pipe()
        stderr = os.pipe()

        # create a multiprocessing pipe for the results
        pipe = multiprocessing.Pipe(False)
        receiver, sender = pipe

        process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun)
        process.start()
        self.processes.append(process)
        logging.debug('Worker process %s spawned', process.pid)

        def threaded_wait():
            # wait(2) for the process to die
            process.join()

            if process.exitcode < 0:
                msg = 'received signal {0}'.format(-process.exitcode)
            elif process.exitcode > 0:
                msg = 'returned error code {0}'.format(process.exitcode)
            else:
                msg = 'exited successfully'

            logging.debug('Worker process %s %s', process.pid, msg)
            try:
                self.processes.remove(process)
            except ValueError:
                # kill_search_processes() replaced the list in the
                # meantime, so there is nothing left to remove
                pass

        # XXX: it would be much nicer to run this as a coroutine than a thread,
        # except that this code is executed before the eventloop is started.
        #
        # spawn a thread to collect the worker process once it dies
        # preventing it from hanging around as zombie
        threading.Thread(target=threaded_wait).start()

        # TODO: avoid this if logging level > debug
        def threaded_reader(prefix, fd):
            with os.fdopen(fd) as handle:
                for line in handle:
                    logging.debug('Worker process %s said on %s: %s',
                                  process.pid, prefix, line.rstrip())

        # spawn two threads that read from the stdout and stderr pipes
        # and write anything that appears there to the log
        threading.Thread(target=threaded_reader,
                         args=('stdout', stdout[0])).start()
        os.close(stdout[1])
        threading.Thread(target=threaded_reader,
                         args=('stderr', stderr[0])).start()
        os.close(stderr[1])

        # closing the sending end in this (receiving) process guarantees
        # that here the appropriate EOFError is raised upon .recv in the walker
        sender.close()
        return receiver, process

    def get_threads(self, querystring, sort='newest_first',
                    exclude_tags=frozenset()):
        """
        asynchronously look up thread ids matching `querystring`.

        :param querystring: The query string to use for the lookup
        :type querystring: str.
        :param sort: Sort order. one of ['oldest_first', 'newest_first',
                     'message_id', 'unsorted']
        :type sort: str
        :param exclude_tags: Tags to exclude by default unless included in the
                             search
        :type exclude_tags: set of str
        :returns: a pipe together with the process that asynchronously
                  writes to it.
        :rtype: (:class:`multiprocessing.Pipe`,
                :class:`multiprocessing.Process`)
        """
        # TODO: use a symbolic constant for this
        assert sort in self._sort_orders

        db = self._db_ro()
        sort = self._sort_orders[sort]
        # the configured exclude tags always apply in addition to the
        # ones requested by the caller
        exclude_tags = self._exclude_tags | exclude_tags

        return self.async_(
            db.threads(querystring, sort=sort, exclude_tags=exclude_tags),
            lambda t: t.threadid)

    def add_message(self, path, tags):
        """
        Adds a file to the notmuch index.

        This only appends an add operation to the write queue; call
        :meth:`DBManager.flush` to actually write out.

        :param path: path to the file
        :type path: str
        :param tags: tagstrings to add
        :type tags: list of str
        :exception: :exc:`~errors.DatabaseROError` if in read only mode
        :exception: :exc:`~errors.DatabaseError` if `path` is not below
                    the root path of the index
        """
        tags = tags or []

        if self.ro:
            raise DatabaseROError()
        if not _is_subdir_of(path, self.path):
            msg = 'message path %s ' % path
            msg += ' is not below notmuchs '
            msg += 'root path (%s)' % self.path
            raise DatabaseError(msg)
        else:
            self.writequeue.append(('add', None, path, tags))