from notmuch import Database, NotmuchError, XapianError import notmuch import multiprocessing import logging from collections import deque from message import Message from alot.settings import settings from thread import Thread DB_ENC = 'utf-8' class DatabaseError(Exception): pass class DatabaseROError(DatabaseError): """cannot write to read-only database""" pass class DatabaseLockedError(DatabaseError): """cannot write to locked index""" pass class NonexistantObjectError(DatabaseError): """requested thread or message does not exist in the index""" pass class FillPipeProcess(multiprocessing.Process): def __init__(self, it, pipe, fun=(lambda x: x)): multiprocessing.Process.__init__(self) self.it = it self.pipe = pipe[1] self.fun = fun def run(self): for a in self.it: self.pipe.send(self.fun(a)) self.pipe.close() class DBManager(object): """ Keeps track of your index parameters, maintains a write-queue and lets you look up threads and messages directly to the persistent wrapper classes. """ _sort_orders = { 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST, 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST, 'unsorted': notmuch.database.Query.SORT.UNSORTED, 'message_id': notmuch.database.Query.SORT.MESSAGE_ID, } """constants representing sort orders""" def __init__(self, path=None, ro=False): """ :param path: absolute path to the notmuch index :type path: str :param ro: open the index in read-only mode :type ro: bool """ self.ro = ro self.path = path self.writequeue = deque([]) self.processes = [] def flush(self): """ write out all queued write-commands in order, each one in a separate :meth:`atomic ` transaction. If this fails the current action is rolled back, stays in the write queue and an exception is raised. You are responsible to retry flushing at a later time if you want to ensure that the cached changes are applied to the database. :exception: :exc:`DatabaseROError` if db is opened in read-only mode :exception: :exc:`DatabaseLockedError` if db is locked """ if self.ro: raise DatabaseROError() if self.writequeue: # aquire a writeable db handler try: mode = Database.MODE.READ_WRITE db = Database(path=self.path, mode=mode) except NotmuchError: raise DatabaseLockedError() # read notmuch's config regarding imap flag synchronization sync = settings.get_notmuch_setting('maildir', 'synchronize_flags') # go through writequeue entries while self.writequeue: current_item = self.writequeue.popleft() logging.debug('write-out item: %s' % str(current_item)) # watch out for notmuch errors to re-insert current_item # to the queue on errors try: # the first two coordinants are cnmdname and post-callback cmd, afterwards = current_item[:2] # make this a transaction db.begin_atomic() if cmd == 'add': path, tags = current_item[2:] msg, status = db.add_message(path, sync_maildir_flags=sync) msg.freeze() for tag in tags: msg.add_tag(tag.encode(DB_ENC), sync_maildir_flags=sync) msg.thaw() elif cmd == 'remove': path = current_item[2] db.remove_message(path) else: # tag/set/untag querystring, tags = current_item[2:] query = db.create_query(querystring) for msg in query.search_messages(): msg.freeze() if cmd == 'tag': for tag in tags: msg.add_tag(tag.encode(DB_ENC), sync_maildir_flags=sync) if cmd == 'set': msg.remove_all_tags() for tag in tags: msg.add_tag(tag.encode(DB_ENC), sync_maildir_flags=sync) elif cmd == 'untag': for tag in tags: msg.remove_tag(tag.encode(DB_ENC), sync_maildir_flags=sync) msg.thaw() # end transaction and reinsert queue item on error if db.end_atomic() != notmuch.STATUS.SUCCESS: raise DatabaseError('fail-status from end_atomic') # call post-callback if callable(afterwards): afterwards() # re-insert item to the queue upon Xapian/NotmuchErrors except (XapianError, NotmuchError) as e: self.writequeue.appendleft(current_item) raise DatabaseError(unicode(e)) def kill_search_processes(self): """ terminate all search processes that originate from this managers :meth:`get_threads`. """ for p in self.processes: p.terminate() self.processes = [] def tag(self, querystring, tags, afterwards=None, remove_rest=False): """ add tags to messages matching `querystring`. This appends a tag operation to the write queue and raises :exc:`DatabaseROError` if in read only mode. :param querystring: notmuch search string :type querystring: str :param tags: a list of tags to be added :type tags: list of str :param afterwards: callback that gets called after successful application of this tagging operation :type afterwards: callable :param remove_rest: remove tags from matching messages before tagging :type remove_rest: bool :exception: :exc:`DatabaseROError` .. note:: You need to call :meth:`DBManager.flush` to actually write out. """ if self.ro: raise DatabaseROError() if remove_rest: self.writequeue.append(('set', afterwards, querystring, tags)) else: self.writequeue.append(('tag', afterwards, querystring, tags)) def untag(self, querystring, tags, afterwards=None): """ removes tags from messages that match `querystring`. This appends an untag operation to the write queue and raises :exc:`DatabaseROError` if in read only mode. :param querystring: notmuch search string :type querystring: str :param tags: a list of tags to be added :type tags: list of str :param afterwards: callback that gets called after successful application of this tagging operation :type afterwards: callable :exception: :exc:`DatabaseROError` .. note:: You need to call :meth:`DBManager.flush` to actually write out. """ if self.ro: raise DatabaseROError() self.writequeue.append(('untag', afterwards, querystring, tags)) def count_messages(self, querystring): """returns number of messages that match `querystring`""" return self.query(querystring).count_messages() def search_thread_ids(self, querystring): """ returns the ids of all threads that match the `querystring` This copies! all integer thread ids into an new list. :returns: list of str """ return self.query_threaded(querystring) def _get_notmuch_thread(self, tid): """returns :class:`notmuch.database.Thread` with given id""" query = self.query('thread:' + tid) try: return query.search_threads().next() except StopIteration: errmsg = 'no thread with id %s exists!' % tid raise NonexistantObjectError(errmsg) def get_thread(self, tid): """returns :class:`Thread` with given thread id (str)""" return Thread(self, self._get_notmuch_thread(tid)) def _get_notmuch_message(self, mid): """returns :class:`notmuch.database.Message` with given id""" mode = Database.MODE.READ_ONLY db = Database(path=self.path, mode=mode) try: return db.find_message(mid) except: errmsg = 'no message with id %s exists!' % mid raise NonexistantObjectError(errmsg) def get_message(self, mid): """returns :class:`Message` with given message id (str)""" return Message(self, self._get_notmuch_message(mid)) def get_all_tags(self): """ returns all tagsstrings used in the database :rtype: list of str """ db = Database(path=self.path) return [t for t in db.get_all_tags()] def async(self, cbl, fun): """ return a pair (pipe, process) so that the process writes `fun(a)` to the pipe for each element `a` in the iterable returned by the callable `cbl`. :param cbl: a function returning something iterable :type cbl: callable :param fun: an unary translation function :type fun: callable :rtype: (:class:`multiprocessing.Pipe`, :class:`multiprocessing.Process`) """ pipe = multiprocessing.Pipe(False) receiver, sender = pipe process = FillPipeProcess(cbl(), pipe, fun) process.start() self.processes.append(process) # closing the sending end in this (receiving) process guarantees # that here the apropriate EOFError is raised upon .recv in the walker sender.close() return receiver, process def get_threads(self, querystring, sort='newest_first'): """ asynchronously look up thread ids matching `querystring`. :param querystring: The query string to use for the lookup :type querystring: str. :param sort: Sort order. one of ['oldest_first', 'newest_first', 'message_id', 'unsorted'] :type query: str :returns: a pipe together with the process that asynchronously writes to it. :rtype: (:class:`multiprocessing.Pipe`, :class:`multiprocessing.Process`) """ assert sort in self._sort_orders.keys() q = self.query(querystring) q.set_sort(self._sort_orders[sort]) return self.async(q.search_threads, (lambda a: a.get_thread_id())) def query(self, querystring): """ creates :class:`notmuch.Query` objects on demand :param querystring: The query string to use for the lookup :type query: str. :returns: :class:`notmuch.Query` -- the query object. """ mode = Database.MODE.READ_ONLY db = Database(path=self.path, mode=mode) return db.create_query(querystring) def add_message(self, path, tags=[], afterwards=None): """ Adds a file to the notmuch index. :param path: path to the file :type path: str :param tags: tagstrings to add :type tags: list of str :param afterwards: callback to trigger after adding :type afterwards: callable or None """ if self.ro: raise DatabaseROError() self.writequeue.append(('add', afterwards, path, tags)) def remove_message(self, message, afterwards=None): """ Remove a message from the notmuch index :param message: message to remove :type message: :class:`Message` :param afterwards: callback to trigger after removing :type afterwards: callable or None """ if self.ro: raise DatabaseROError() path = message.get_filename() self.writequeue.append(('remove', afterwards, path))