summaryrefslogtreecommitdiff
path: root/alot/db/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'alot/db/manager.py')
-rw-r--r--alot/db/manager.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/alot/db/manager.py b/alot/db/manager.py
new file mode 100644
index 00000000..c52da7b2
--- /dev/null
+++ b/alot/db/manager.py
@@ -0,0 +1,363 @@
+# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# This file is released under the GNU GPL, version 3 or a later revision.
+# For further details see the COPYING file
+from notmuch import Database, NotmuchError, XapianError
+import notmuch
+import multiprocessing
+import logging
+
+from collections import deque
+
+from message import Message
+from alot.settings import settings
+from thread import Thread
+from errors import DatabaseError
+from errors import DatabaseLockedError
+from errors import DatabaseROError
+from errors import NonexistantObjectError
+from alot.db import DB_ENC
+
+
+class FillPipeProcess(multiprocessing.Process):
+ def __init__(self, it, pipe, fun=(lambda x: x)):
+ multiprocessing.Process.__init__(self)
+ self.it = it
+ self.pipe = pipe[1]
+ self.fun = fun
+
+ def run(self):
+ for a in self.it:
+ self.pipe.send(self.fun(a))
+ self.pipe.close()
+
+
+class DBManager(object):
+ """
+ Keeps track of your index parameters, maintains a write-queue and
+ lets you look up threads and messages directly to the persistent wrapper
+ classes.
+ """
+ _sort_orders = {
+ 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST,
+ 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST,
+ 'unsorted': notmuch.database.Query.SORT.UNSORTED,
+ 'message_id': notmuch.database.Query.SORT.MESSAGE_ID,
+ }
+ """constants representing sort orders"""
+
+ def __init__(self, path=None, ro=False):
+ """
+ :param path: absolute path to the notmuch index
+ :type path: str
+ :param ro: open the index in read-only mode
+ :type ro: bool
+ """
+ self.ro = ro
+ self.path = path
+ self.writequeue = deque([])
+ self.processes = []
+
+ def flush(self):
+ """
+ write out all queued write-commands in order, each one in a separate
+ :meth:`atomic <notmuch.Database.begin_atomic>` transaction.
+
+ If this fails the current action is rolled back, stays in the write
+ queue and an exception is raised.
+ You are responsible to retry flushing at a later time if you want to
+ ensure that the cached changes are applied to the database.
+
+ :exception: :exc:`~errors.DatabaseROError` if db is opened read-only
+ :exception: :exc:`~errors.DatabaseLockedError` if db is locked
+ """
+ if self.ro:
+ raise DatabaseROError()
+ if self.writequeue:
+ # read notmuch's config regarding imap flag synchronization
+ sync = settings.get_notmuch_setting('maildir', 'synchronize_flags')
+
+ # go through writequeue entries
+ while self.writequeue:
+ current_item = self.writequeue.popleft()
+ logging.debug('write-out item: %s' % str(current_item))
+
+ # watch out for notmuch errors to re-insert current_item
+ # to the queue on errors
+ try:
+ # the first two coordinants are cnmdname and post-callback
+ cmd, afterwards = current_item[:2]
+ logging.debug('cmd created')
+
+ # aquire a writeable db handler
+ try:
+ mode = Database.MODE.READ_WRITE
+ db = Database(path=self.path, mode=mode)
+ except NotmuchError:
+ raise DatabaseLockedError()
+ logging.debug('got write lock')
+
+ # make this a transaction
+ db.begin_atomic()
+ logging.debug('got atomic')
+
+ if cmd == 'add':
+ logging.debug('add')
+ path, tags = current_item[2:]
+ msg, status = db.add_message(path,
+ sync_maildir_flags=sync)
+ logging.debug('added msg')
+ msg.freeze()
+ logging.debug('freeze')
+ for tag in tags:
+ msg.add_tag(tag.encode(DB_ENC),
+ sync_maildir_flags=sync)
+ logging.debug('added tags ')
+ msg.thaw()
+ logging.debug('thaw')
+
+ elif cmd == 'remove':
+ path = current_item[2]
+ db.remove_message(path)
+
+ else: # tag/set/untag
+ querystring, tags = current_item[2:]
+ query = db.create_query(querystring)
+ for msg in query.search_messages():
+ msg.freeze()
+ if cmd == 'tag':
+ for tag in tags:
+ msg.add_tag(tag.encode(DB_ENC),
+ sync_maildir_flags=sync)
+ if cmd == 'set':
+ msg.remove_all_tags()
+ for tag in tags:
+ msg.add_tag(tag.encode(DB_ENC),
+ sync_maildir_flags=sync)
+ elif cmd == 'untag':
+ for tag in tags:
+ msg.remove_tag(tag.encode(DB_ENC),
+ sync_maildir_flags=sync)
+ msg.thaw()
+
+ logging.debug('ended atomic')
+ # end transaction and reinsert queue item on error
+ if db.end_atomic() != notmuch.STATUS.SUCCESS:
+ raise DatabaseError('end_atomic failed')
+ logging.debug('ended atomic')
+
+ # close db
+ db.close()
+ logging.debug('closed db')
+
+ # call post-callback
+ if callable(afterwards):
+ logging.debug(str(afterwards))
+ afterwards()
+ logging.debug('called callback')
+
+ # re-insert item to the queue upon Xapian/NotmuchErrors
+ except (XapianError, NotmuchError) as e:
+ logging.exception(e)
+ self.writequeue.appendleft(current_item)
+ raise DatabaseError(unicode(e))
+ except DatabaseLockedError as e:
+ logging.debug('index temporarily locked')
+ self.writequeue.appendleft(current_item)
+ raise e
+ logging.debug('flush finished')
+
+ def kill_search_processes(self):
+ """
+ terminate all search processes that originate from
+ this managers :meth:`get_threads`.
+ """
+ for p in self.processes:
+ p.terminate()
+ self.processes = []
+
+ def tag(self, querystring, tags, afterwards=None, remove_rest=False):
+ """
+ add tags to messages matching `querystring`.
+ This appends a tag operation to the write queue and raises
+ :exc:`~errors.DatabaseROError` if in read only mode.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a list of tags to be added
+ :type tags: list of str
+ :param afterwards: callback that gets called after successful
+ application of this tagging operation
+ :type afterwards: callable
+ :param remove_rest: remove tags from matching messages before tagging
+ :type remove_rest: bool
+ :exception: :exc:`~errors.DatabaseROError`
+
+ .. note::
+ This only adds the requested operation to the write queue.
+ You need to call :meth:`DBManager.flush` to actually write out.
+ """
+ if self.ro:
+ raise DatabaseROError()
+ if remove_rest:
+ self.writequeue.append(('set', afterwards, querystring, tags))
+ else:
+ self.writequeue.append(('tag', afterwards, querystring, tags))
+
+ def untag(self, querystring, tags, afterwards=None):
+ """
+ removes tags from messages that match `querystring`.
+ This appends an untag operation to the write queue and raises
+ :exc:`~errors.DatabaseROError` if in read only mode.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a list of tags to be added
+ :type tags: list of str
+ :param afterwards: callback that gets called after successful
+ application of this tagging operation
+ :type afterwards: callable
+ :exception: :exc:`~errors.DatabaseROError`
+
+ .. note::
+ This only adds the requested operation to the write queue.
+ You need to call :meth:`DBManager.flush` to actually write out.
+ """
+ if self.ro:
+ raise DatabaseROError()
+ self.writequeue.append(('untag', afterwards, querystring, tags))
+
+ def count_messages(self, querystring):
+ """returns number of messages that match `querystring`"""
+ return self.query(querystring).count_messages()
+
+ def count_threads(self, querystring):
+ """returns number of threads that match `querystring`"""
+ return self.query(querystring).count_threads()
+
+ def search_thread_ids(self, querystring):
+ """
+ returns the ids of all threads that match the `querystring`
+ This copies! all integer thread ids into an new list.
+
+ :returns: list of str
+ """
+
+ return self.query_threaded(querystring)
+
+ def _get_notmuch_thread(self, tid):
+ """returns :class:`notmuch.database.Thread` with given id"""
+ query = self.query('thread:' + tid)
+ try:
+ return query.search_threads().next()
+ except StopIteration:
+ errmsg = 'no thread with id %s exists!' % tid
+ raise NonexistantObjectError(errmsg)
+
+ def get_thread(self, tid):
+ """returns :class:`Thread` with given thread id (str)"""
+ return Thread(self, self._get_notmuch_thread(tid))
+
+ def _get_notmuch_message(self, mid):
+ """returns :class:`notmuch.database.Message` with given id"""
+ mode = Database.MODE.READ_ONLY
+ db = Database(path=self.path, mode=mode)
+ try:
+ return db.find_message(mid)
+ except:
+ errmsg = 'no message with id %s exists!' % mid
+ raise NonexistantObjectError(errmsg)
+
+ def get_message(self, mid):
+ """returns :class:`Message` with given message id (str)"""
+ return Message(self, self._get_notmuch_message(mid))
+
+ def get_all_tags(self):
+ """
+ returns all tagsstrings used in the database
+ :rtype: list of str
+ """
+ db = Database(path=self.path)
+ return [t for t in db.get_all_tags()]
+
+ def async(self, cbl, fun):
+ """
+ return a pair (pipe, process) so that the process writes
+ `fun(a)` to the pipe for each element `a` in the iterable returned
+ by the callable `cbl`.
+
+ :param cbl: a function returning something iterable
+ :type cbl: callable
+ :param fun: an unary translation function
+ :type fun: callable
+ :rtype: (:class:`multiprocessing.Pipe`,
+ :class:`multiprocessing.Process`)
+ """
+ pipe = multiprocessing.Pipe(False)
+ receiver, sender = pipe
+ process = FillPipeProcess(cbl(), pipe, fun)
+ process.start()
+ self.processes.append(process)
+ # closing the sending end in this (receiving) process guarantees
+ # that here the apropriate EOFError is raised upon .recv in the walker
+ sender.close()
+ return receiver, process
+
+ def get_threads(self, querystring, sort='newest_first'):
+ """
+ asynchronously look up thread ids matching `querystring`.
+
+ :param querystring: The query string to use for the lookup
+ :type querystring: str.
+ :param sort: Sort order. one of ['oldest_first', 'newest_first',
+ 'message_id', 'unsorted']
+ :type query: str
+ :returns: a pipe together with the process that asynchronously
+ writes to it.
+ :rtype: (:class:`multiprocessing.Pipe`,
+ :class:`multiprocessing.Process`)
+ """
+ assert sort in self._sort_orders.keys()
+ q = self.query(querystring)
+ q.set_sort(self._sort_orders[sort])
+ return self.async(q.search_threads, (lambda a: a.get_thread_id()))
+
+ def query(self, querystring):
+ """
+ creates :class:`notmuch.Query` objects on demand
+
+ :param querystring: The query string to use for the lookup
+ :type query: str.
+ :returns: :class:`notmuch.Query` -- the query object.
+ """
+ mode = Database.MODE.READ_ONLY
+ db = Database(path=self.path, mode=mode)
+ return db.create_query(querystring)
+
+ def add_message(self, path, tags=[], afterwards=None):
+ """
+ Adds a file to the notmuch index.
+
+ :param path: path to the file
+ :type path: str
+ :param tags: tagstrings to add
+ :type tags: list of str
+ :param afterwards: callback to trigger after adding
+ :type afterwards: callable or None
+ """
+ if self.ro:
+ raise DatabaseROError()
+ self.writequeue.append(('add', afterwards, path, tags))
+
+ def remove_message(self, message, afterwards=None):
+ """
+ Remove a message from the notmuch index
+
+ :param message: message to remove
+ :type message: :class:`Message`
+ :param afterwards: callback to trigger after removing
+ :type afterwards: callable or None
+ """
+ if self.ro:
+ raise DatabaseROError()
+ path = message.get_filename()
+ self.writequeue.append(('remove', afterwards, path))