summaryrefslogtreecommitdiff
path: root/alot/db/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'alot/db/__init__.py')
-rw-r--r--alot/db/__init__.py361
1 files changed, 2 insertions, 359 deletions
diff --git a/alot/db/__init__.py b/alot/db/__init__.py
index beb630d6..f61142ea 100644
--- a/alot/db/__init__.py
+++ b/alot/db/__init__.py
@@ -1,364 +1,7 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
-from notmuch import Database, NotmuchError, XapianError
-import notmuch
-import multiprocessing
-import logging
-from collections import deque
-
-from message import Message
-from alot.settings import settings
from thread import Thread
-from errors import DatabaseError
-from errors import DatabaseLockedError
-from errors import DatabaseROError
-from errors import NonexistantObjectError
-
-DB_ENC = 'utf-8'
-
-
-class FillPipeProcess(multiprocessing.Process):
- def __init__(self, it, pipe, fun=(lambda x: x)):
- multiprocessing.Process.__init__(self)
- self.it = it
- self.pipe = pipe[1]
- self.fun = fun
-
- def run(self):
- for a in self.it:
- self.pipe.send(self.fun(a))
- self.pipe.close()
-
-
-class DBManager(object):
- """
- Keeps track of your index parameters, maintains a write-queue and
- lets you look up threads and messages directly to the persistent wrapper
- classes.
- """
- _sort_orders = {
- 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST,
- 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST,
- 'unsorted': notmuch.database.Query.SORT.UNSORTED,
- 'message_id': notmuch.database.Query.SORT.MESSAGE_ID,
- }
- """constants representing sort orders"""
-
- def __init__(self, path=None, ro=False):
- """
- :param path: absolute path to the notmuch index
- :type path: str
- :param ro: open the index in read-only mode
- :type ro: bool
- """
- self.ro = ro
- self.path = path
- self.writequeue = deque([])
- self.processes = []
-
- def flush(self):
- """
- write out all queued write-commands in order, each one in a separate
- :meth:`atomic <notmuch.Database.begin_atomic>` transaction.
-
- If this fails the current action is rolled back, stays in the write
- queue and an exception is raised.
- You are responsible to retry flushing at a later time if you want to
- ensure that the cached changes are applied to the database.
-
- :exception: :exc:`~errors.DatabaseROError` if db is opened read-only
- :exception: :exc:`~errors.DatabaseLockedError` if db is locked
- """
- if self.ro:
- raise DatabaseROError()
- if self.writequeue:
- # read notmuch's config regarding imap flag synchronization
- sync = settings.get_notmuch_setting('maildir', 'synchronize_flags')
-
- # go through writequeue entries
- while self.writequeue:
- current_item = self.writequeue.popleft()
- logging.debug('write-out item: %s' % str(current_item))
-
- # watch out for notmuch errors to re-insert current_item
- # to the queue on errors
- try:
- # the first two coordinants are cnmdname and post-callback
- cmd, afterwards = current_item[:2]
- logging.debug('cmd created')
-
- # aquire a writeable db handler
- try:
- mode = Database.MODE.READ_WRITE
- db = Database(path=self.path, mode=mode)
- except NotmuchError:
- raise DatabaseLockedError()
- logging.debug('got write lock')
-
- # make this a transaction
- db.begin_atomic()
- logging.debug('got atomic')
-
- if cmd == 'add':
- logging.debug('add')
- path, tags = current_item[2:]
- msg, status = db.add_message(path,
- sync_maildir_flags=sync)
- logging.debug('added msg')
- msg.freeze()
- logging.debug('freeze')
- for tag in tags:
- msg.add_tag(tag.encode(DB_ENC),
- sync_maildir_flags=sync)
- logging.debug('added tags ')
- msg.thaw()
- logging.debug('thaw')
-
- elif cmd == 'remove':
- path = current_item[2]
- db.remove_message(path)
-
- else: # tag/set/untag
- querystring, tags = current_item[2:]
- query = db.create_query(querystring)
- for msg in query.search_messages():
- msg.freeze()
- if cmd == 'tag':
- for tag in tags:
- msg.add_tag(tag.encode(DB_ENC),
- sync_maildir_flags=sync)
- if cmd == 'set':
- msg.remove_all_tags()
- for tag in tags:
- msg.add_tag(tag.encode(DB_ENC),
- sync_maildir_flags=sync)
- elif cmd == 'untag':
- for tag in tags:
- msg.remove_tag(tag.encode(DB_ENC),
- sync_maildir_flags=sync)
- msg.thaw()
-
- logging.debug('ended atomic')
- # end transaction and reinsert queue item on error
- if db.end_atomic() != notmuch.STATUS.SUCCESS:
- raise DatabaseError('end_atomic failed')
- logging.debug('ended atomic')
-
- # close db
- db.close()
- logging.debug('closed db')
-
- # call post-callback
- if callable(afterwards):
- logging.debug(str(afterwards))
- afterwards()
- logging.debug('called callback')
-
- # re-insert item to the queue upon Xapian/NotmuchErrors
- except (XapianError, NotmuchError) as e:
- logging.exception(e)
- self.writequeue.appendleft(current_item)
- raise DatabaseError(unicode(e))
- except DatabaseLockedError as e:
- logging.debug('index temporarily locked')
- self.writequeue.appendleft(current_item)
- raise e
- logging.debug('flush finished')
-
- def kill_search_processes(self):
- """
- terminate all search processes that originate from
- this managers :meth:`get_threads`.
- """
- for p in self.processes:
- p.terminate()
- self.processes = []
-
- def tag(self, querystring, tags, afterwards=None, remove_rest=False):
- """
- add tags to messages matching `querystring`.
- This appends a tag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
-
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a list of tags to be added
- :type tags: list of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove tags from matching messages before tagging
- :type remove_rest: bool
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- if remove_rest:
- self.writequeue.append(('set', afterwards, querystring, tags))
- else:
- self.writequeue.append(('tag', afterwards, querystring, tags))
-
- def untag(self, querystring, tags, afterwards=None):
- """
- removes tags from messages that match `querystring`.
- This appends an untag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
-
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a list of tags to be added
- :type tags: list of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- self.writequeue.append(('untag', afterwards, querystring, tags))
-
- def count_messages(self, querystring):
- """returns number of messages that match `querystring`"""
- return self.query(querystring).count_messages()
-
- def count_threads(self, querystring):
- """returns number of threads that match `querystring`"""
- return self.query(querystring).count_threads()
-
- def search_thread_ids(self, querystring):
- """
- returns the ids of all threads that match the `querystring`
- This copies! all integer thread ids into an new list.
-
- :returns: list of str
- """
-
- return self.query_threaded(querystring)
-
- def _get_notmuch_thread(self, tid):
- """returns :class:`notmuch.database.Thread` with given id"""
- query = self.query('thread:' + tid)
- try:
- return query.search_threads().next()
- except StopIteration:
- errmsg = 'no thread with id %s exists!' % tid
- raise NonexistantObjectError(errmsg)
-
- def get_thread(self, tid):
- """returns :class:`Thread` with given thread id (str)"""
- return Thread(self, self._get_notmuch_thread(tid))
-
- def _get_notmuch_message(self, mid):
- """returns :class:`notmuch.database.Message` with given id"""
- mode = Database.MODE.READ_ONLY
- db = Database(path=self.path, mode=mode)
- try:
- return db.find_message(mid)
- except:
- errmsg = 'no message with id %s exists!' % mid
- raise NonexistantObjectError(errmsg)
-
- def get_message(self, mid):
- """returns :class:`Message` with given message id (str)"""
- return Message(self, self._get_notmuch_message(mid))
-
- def get_all_tags(self):
- """
- returns all tagsstrings used in the database
- :rtype: list of str
- """
- db = Database(path=self.path)
- return [t for t in db.get_all_tags()]
-
- def async(self, cbl, fun):
- """
- return a pair (pipe, process) so that the process writes
- `fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `cbl`.
-
- :param cbl: a function returning something iterable
- :type cbl: callable
- :param fun: an unary translation function
- :type fun: callable
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- pipe = multiprocessing.Pipe(False)
- receiver, sender = pipe
- process = FillPipeProcess(cbl(), pipe, fun)
- process.start()
- self.processes.append(process)
- # closing the sending end in this (receiving) process guarantees
- # that here the apropriate EOFError is raised upon .recv in the walker
- sender.close()
- return receiver, process
-
- def get_threads(self, querystring, sort='newest_first'):
- """
- asynchronously look up thread ids matching `querystring`.
-
- :param querystring: The query string to use for the lookup
- :type querystring: str.
- :param sort: Sort order. one of ['oldest_first', 'newest_first',
- 'message_id', 'unsorted']
- :type query: str
- :returns: a pipe together with the process that asynchronously
- writes to it.
- :rtype: (:class:`multiprocessing.Pipe`,
- :class:`multiprocessing.Process`)
- """
- assert sort in self._sort_orders.keys()
- q = self.query(querystring)
- q.set_sort(self._sort_orders[sort])
- return self.async(q.search_threads, (lambda a: a.get_thread_id()))
-
- def query(self, querystring):
- """
- creates :class:`notmuch.Query` objects on demand
-
- :param querystring: The query string to use for the lookup
- :type query: str.
- :returns: :class:`notmuch.Query` -- the query object.
- """
- mode = Database.MODE.READ_ONLY
- db = Database(path=self.path, mode=mode)
- return db.create_query(querystring)
-
- def add_message(self, path, tags=[], afterwards=None):
- """
- Adds a file to the notmuch index.
-
- :param path: path to the file
- :type path: str
- :param tags: tagstrings to add
- :type tags: list of str
- :param afterwards: callback to trigger after adding
- :type afterwards: callable or None
- """
- if self.ro:
- raise DatabaseROError()
- self.writequeue.append(('add', afterwards, path, tags))
-
- def remove_message(self, message, afterwards=None):
- """
- Remove a message from the notmuch index
-
- :param message: message to remove
- :type message: :class:`Message`
- :param afterwards: callback to trigger after removing
- :type afterwards: callable or None
- """
- if self.ro:
- raise DatabaseROError()
- path = message.get_filename()
- self.writequeue.append(('remove', afterwards, path))
+from message import Message
+DB_ENC = 'UTF-8'