From 50293143ea58eca364d32bae14c422d4ba782d9f Mon Sep 17 00:00:00 2001 From: Patrick Totzke Date: Sun, 5 Aug 2012 12:58:07 +0100 Subject: cleanup: move DBManager to its own file --- alot/db/__init__.py | 361 +-------------------------------------------------- alot/db/manager.py | 363 ++++++++++++++++++++++++++++++++++++++++++++++++++++ alot/init.py | 2 +- 3 files changed, 365 insertions(+), 361 deletions(-) create mode 100644 alot/db/manager.py diff --git a/alot/db/__init__.py b/alot/db/__init__.py index beb630d6..ae067ce3 100644 --- a/alot/db/__init__.py +++ b/alot/db/__init__.py @@ -1,364 +1,5 @@ # Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file -from notmuch import Database, NotmuchError, XapianError -import notmuch -import multiprocessing -import logging -from collections import deque - -from message import Message -from alot.settings import settings -from thread import Thread -from errors import DatabaseError -from errors import DatabaseLockedError -from errors import DatabaseROError -from errors import NonexistantObjectError - -DB_ENC = 'utf-8' - - -class FillPipeProcess(multiprocessing.Process): - def __init__(self, it, pipe, fun=(lambda x: x)): - multiprocessing.Process.__init__(self) - self.it = it - self.pipe = pipe[1] - self.fun = fun - - def run(self): - for a in self.it: - self.pipe.send(self.fun(a)) - self.pipe.close() - - -class DBManager(object): - """ - Keeps track of your index parameters, maintains a write-queue and - lets you look up threads and messages directly to the persistent wrapper - classes. - """ - _sort_orders = { - 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST, - 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST, - 'unsorted': notmuch.database.Query.SORT.UNSORTED, - 'message_id': notmuch.database.Query.SORT.MESSAGE_ID, - } - """constants representing sort orders""" - - def __init__(self, path=None, ro=False): - """ - :param path: absolute path to the notmuch index - :type path: str - :param ro: open the index in read-only mode - :type ro: bool - """ - self.ro = ro - self.path = path - self.writequeue = deque([]) - self.processes = [] - - def flush(self): - """ - write out all queued write-commands in order, each one in a separate - :meth:`atomic ` transaction. - - If this fails the current action is rolled back, stays in the write - queue and an exception is raised. - You are responsible to retry flushing at a later time if you want to - ensure that the cached changes are applied to the database. - - :exception: :exc:`~errors.DatabaseROError` if db is opened read-only - :exception: :exc:`~errors.DatabaseLockedError` if db is locked - """ - if self.ro: - raise DatabaseROError() - if self.writequeue: - # read notmuch's config regarding imap flag synchronization - sync = settings.get_notmuch_setting('maildir', 'synchronize_flags') - - # go through writequeue entries - while self.writequeue: - current_item = self.writequeue.popleft() - logging.debug('write-out item: %s' % str(current_item)) - - # watch out for notmuch errors to re-insert current_item - # to the queue on errors - try: - # the first two coordinants are cnmdname and post-callback - cmd, afterwards = current_item[:2] - logging.debug('cmd created') - - # aquire a writeable db handler - try: - mode = Database.MODE.READ_WRITE - db = Database(path=self.path, mode=mode) - except NotmuchError: - raise DatabaseLockedError() - logging.debug('got write lock') - - # make this a transaction - db.begin_atomic() - logging.debug('got atomic') - - if cmd == 'add': - logging.debug('add') - path, tags = current_item[2:] - msg, status = db.add_message(path, - sync_maildir_flags=sync) - logging.debug('added msg') - msg.freeze() - logging.debug('freeze') - for tag in tags: - msg.add_tag(tag.encode(DB_ENC), - sync_maildir_flags=sync) - logging.debug('added tags ') - msg.thaw() - logging.debug('thaw') - - elif cmd == 'remove': - path = current_item[2] - db.remove_message(path) - - else: # tag/set/untag - querystring, tags = current_item[2:] - query = db.create_query(querystring) - for msg in query.search_messages(): - msg.freeze() - if cmd == 'tag': - for tag in tags: - msg.add_tag(tag.encode(DB_ENC), - sync_maildir_flags=sync) - if cmd == 'set': - msg.remove_all_tags() - for tag in tags: - msg.add_tag(tag.encode(DB_ENC), - sync_maildir_flags=sync) - elif cmd == 'untag': - for tag in tags: - msg.remove_tag(tag.encode(DB_ENC), - sync_maildir_flags=sync) - msg.thaw() - - logging.debug('ended atomic') - # end transaction and reinsert queue item on error - if db.end_atomic() != notmuch.STATUS.SUCCESS: - raise DatabaseError('end_atomic failed') - logging.debug('ended atomic') - - # close db - db.close() - logging.debug('closed db') - - # call post-callback - if callable(afterwards): - logging.debug(str(afterwards)) - afterwards() - logging.debug('called callback') - - # re-insert item to the queue upon Xapian/NotmuchErrors - except (XapianError, NotmuchError) as e: - logging.exception(e) - self.writequeue.appendleft(current_item) - raise DatabaseError(unicode(e)) - except DatabaseLockedError as e: - logging.debug('index temporarily locked') - self.writequeue.appendleft(current_item) - raise e - logging.debug('flush finished') - - def kill_search_processes(self): - """ - terminate all search processes that originate from - this managers :meth:`get_threads`. - """ - for p in self.processes: - p.terminate() - self.processes = [] - - def tag(self, querystring, tags, afterwards=None, remove_rest=False): - """ - add tags to messages matching `querystring`. - This appends a tag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - - :param querystring: notmuch search string - :type querystring: str - :param tags: a list of tags to be added - :type tags: list of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove tags from matching messages before tagging - :type remove_rest: bool - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - if remove_rest: - self.writequeue.append(('set', afterwards, querystring, tags)) - else: - self.writequeue.append(('tag', afterwards, querystring, tags)) - - def untag(self, querystring, tags, afterwards=None): - """ - removes tags from messages that match `querystring`. - This appends an untag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - - :param querystring: notmuch search string - :type querystring: str - :param tags: a list of tags to be added - :type tags: list of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - self.writequeue.append(('untag', afterwards, querystring, tags)) - - def count_messages(self, querystring): - """returns number of messages that match `querystring`""" - return self.query(querystring).count_messages() - - def count_threads(self, querystring): - """returns number of threads that match `querystring`""" - return self.query(querystring).count_threads() - - def search_thread_ids(self, querystring): - """ - returns the ids of all threads that match the `querystring` - This copies! all integer thread ids into an new list. - - :returns: list of str - """ - - return self.query_threaded(querystring) - - def _get_notmuch_thread(self, tid): - """returns :class:`notmuch.database.Thread` with given id""" - query = self.query('thread:' + tid) - try: - return query.search_threads().next() - except StopIteration: - errmsg = 'no thread with id %s exists!' % tid - raise NonexistantObjectError(errmsg) - - def get_thread(self, tid): - """returns :class:`Thread` with given thread id (str)""" - return Thread(self, self._get_notmuch_thread(tid)) - - def _get_notmuch_message(self, mid): - """returns :class:`notmuch.database.Message` with given id""" - mode = Database.MODE.READ_ONLY - db = Database(path=self.path, mode=mode) - try: - return db.find_message(mid) - except: - errmsg = 'no message with id %s exists!' % mid - raise NonexistantObjectError(errmsg) - - def get_message(self, mid): - """returns :class:`Message` with given message id (str)""" - return Message(self, self._get_notmuch_message(mid)) - - def get_all_tags(self): - """ - returns all tagsstrings used in the database - :rtype: list of str - """ - db = Database(path=self.path) - return [t for t in db.get_all_tags()] - - def async(self, cbl, fun): - """ - return a pair (pipe, process) so that the process writes - `fun(a)` to the pipe for each element `a` in the iterable returned - by the callable `cbl`. - - :param cbl: a function returning something iterable - :type cbl: callable - :param fun: an unary translation function - :type fun: callable - :rtype: (:class:`multiprocessing.Pipe`, - :class:`multiprocessing.Process`) - """ - pipe = multiprocessing.Pipe(False) - receiver, sender = pipe - process = FillPipeProcess(cbl(), pipe, fun) - process.start() - self.processes.append(process) - # closing the sending end in this (receiving) process guarantees - # that here the apropriate EOFError is raised upon .recv in the walker - sender.close() - return receiver, process - - def get_threads(self, querystring, sort='newest_first'): - """ - asynchronously look up thread ids matching `querystring`. - - :param querystring: The query string to use for the lookup - :type querystring: str. - :param sort: Sort order. one of ['oldest_first', 'newest_first', - 'message_id', 'unsorted'] - :type query: str - :returns: a pipe together with the process that asynchronously - writes to it. - :rtype: (:class:`multiprocessing.Pipe`, - :class:`multiprocessing.Process`) - """ - assert sort in self._sort_orders.keys() - q = self.query(querystring) - q.set_sort(self._sort_orders[sort]) - return self.async(q.search_threads, (lambda a: a.get_thread_id())) - - def query(self, querystring): - """ - creates :class:`notmuch.Query` objects on demand - - :param querystring: The query string to use for the lookup - :type query: str. - :returns: :class:`notmuch.Query` -- the query object. - """ - mode = Database.MODE.READ_ONLY - db = Database(path=self.path, mode=mode) - return db.create_query(querystring) - - def add_message(self, path, tags=[], afterwards=None): - """ - Adds a file to the notmuch index. - - :param path: path to the file - :type path: str - :param tags: tagstrings to add - :type tags: list of str - :param afterwards: callback to trigger after adding - :type afterwards: callable or None - """ - if self.ro: - raise DatabaseROError() - self.writequeue.append(('add', afterwards, path, tags)) - - def remove_message(self, message, afterwards=None): - """ - Remove a message from the notmuch index - - :param message: message to remove - :type message: :class:`Message` - :param afterwards: callback to trigger after removing - :type afterwards: callable or None - """ - if self.ro: - raise DatabaseROError() - path = message.get_filename() - self.writequeue.append(('remove', afterwards, path)) +DB_ENC = 'UTF-8' diff --git a/alot/db/manager.py b/alot/db/manager.py new file mode 100644 index 00000000..c52da7b2 --- /dev/null +++ b/alot/db/manager.py @@ -0,0 +1,363 @@ +# Copyright (C) 2011-2012 Patrick Totzke +# This file is released under the GNU GPL, version 3 or a later revision. +# For further details see the COPYING file +from notmuch import Database, NotmuchError, XapianError +import notmuch +import multiprocessing +import logging + +from collections import deque + +from message import Message +from alot.settings import settings +from thread import Thread +from errors import DatabaseError +from errors import DatabaseLockedError +from errors import DatabaseROError +from errors import NonexistantObjectError +from alot.db import DB_ENC + + +class FillPipeProcess(multiprocessing.Process): + def __init__(self, it, pipe, fun=(lambda x: x)): + multiprocessing.Process.__init__(self) + self.it = it + self.pipe = pipe[1] + self.fun = fun + + def run(self): + for a in self.it: + self.pipe.send(self.fun(a)) + self.pipe.close() + + +class DBManager(object): + """ + Keeps track of your index parameters, maintains a write-queue and + lets you look up threads and messages directly to the persistent wrapper + classes. + """ + _sort_orders = { + 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST, + 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST, + 'unsorted': notmuch.database.Query.SORT.UNSORTED, + 'message_id': notmuch.database.Query.SORT.MESSAGE_ID, + } + """constants representing sort orders""" + + def __init__(self, path=None, ro=False): + """ + :param path: absolute path to the notmuch index + :type path: str + :param ro: open the index in read-only mode + :type ro: bool + """ + self.ro = ro + self.path = path + self.writequeue = deque([]) + self.processes = [] + + def flush(self): + """ + write out all queued write-commands in order, each one in a separate + :meth:`atomic ` transaction. + + If this fails the current action is rolled back, stays in the write + queue and an exception is raised. + You are responsible to retry flushing at a later time if you want to + ensure that the cached changes are applied to the database. + + :exception: :exc:`~errors.DatabaseROError` if db is opened read-only + :exception: :exc:`~errors.DatabaseLockedError` if db is locked + """ + if self.ro: + raise DatabaseROError() + if self.writequeue: + # read notmuch's config regarding imap flag synchronization + sync = settings.get_notmuch_setting('maildir', 'synchronize_flags') + + # go through writequeue entries + while self.writequeue: + current_item = self.writequeue.popleft() + logging.debug('write-out item: %s' % str(current_item)) + + # watch out for notmuch errors to re-insert current_item + # to the queue on errors + try: + # the first two coordinants are cnmdname and post-callback + cmd, afterwards = current_item[:2] + logging.debug('cmd created') + + # aquire a writeable db handler + try: + mode = Database.MODE.READ_WRITE + db = Database(path=self.path, mode=mode) + except NotmuchError: + raise DatabaseLockedError() + logging.debug('got write lock') + + # make this a transaction + db.begin_atomic() + logging.debug('got atomic') + + if cmd == 'add': + logging.debug('add') + path, tags = current_item[2:] + msg, status = db.add_message(path, + sync_maildir_flags=sync) + logging.debug('added msg') + msg.freeze() + logging.debug('freeze') + for tag in tags: + msg.add_tag(tag.encode(DB_ENC), + sync_maildir_flags=sync) + logging.debug('added tags ') + msg.thaw() + logging.debug('thaw') + + elif cmd == 'remove': + path = current_item[2] + db.remove_message(path) + + else: # tag/set/untag + querystring, tags = current_item[2:] + query = db.create_query(querystring) + for msg in query.search_messages(): + msg.freeze() + if cmd == 'tag': + for tag in tags: + msg.add_tag(tag.encode(DB_ENC), + sync_maildir_flags=sync) + if cmd == 'set': + msg.remove_all_tags() + for tag in tags: + msg.add_tag(tag.encode(DB_ENC), + sync_maildir_flags=sync) + elif cmd == 'untag': + for tag in tags: + msg.remove_tag(tag.encode(DB_ENC), + sync_maildir_flags=sync) + msg.thaw() + + logging.debug('ended atomic') + # end transaction and reinsert queue item on error + if db.end_atomic() != notmuch.STATUS.SUCCESS: + raise DatabaseError('end_atomic failed') + logging.debug('ended atomic') + + # close db + db.close() + logging.debug('closed db') + + # call post-callback + if callable(afterwards): + logging.debug(str(afterwards)) + afterwards() + logging.debug('called callback') + + # re-insert item to the queue upon Xapian/NotmuchErrors + except (XapianError, NotmuchError) as e: + logging.exception(e) + self.writequeue.appendleft(current_item) + raise DatabaseError(unicode(e)) + except DatabaseLockedError as e: + logging.debug('index temporarily locked') + self.writequeue.appendleft(current_item) + raise e + logging.debug('flush finished') + + def kill_search_processes(self): + """ + terminate all search processes that originate from + this managers :meth:`get_threads`. + """ + for p in self.processes: + p.terminate() + self.processes = [] + + def tag(self, querystring, tags, afterwards=None, remove_rest=False): + """ + add tags to messages matching `querystring`. + This appends a tag operation to the write queue and raises + :exc:`~errors.DatabaseROError` if in read only mode. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a list of tags to be added + :type tags: list of str + :param afterwards: callback that gets called after successful + application of this tagging operation + :type afterwards: callable + :param remove_rest: remove tags from matching messages before tagging + :type remove_rest: bool + :exception: :exc:`~errors.DatabaseROError` + + .. note:: + This only adds the requested operation to the write queue. + You need to call :meth:`DBManager.flush` to actually write out. + """ + if self.ro: + raise DatabaseROError() + if remove_rest: + self.writequeue.append(('set', afterwards, querystring, tags)) + else: + self.writequeue.append(('tag', afterwards, querystring, tags)) + + def untag(self, querystring, tags, afterwards=None): + """ + removes tags from messages that match `querystring`. + This appends an untag operation to the write queue and raises + :exc:`~errors.DatabaseROError` if in read only mode. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a list of tags to be added + :type tags: list of str + :param afterwards: callback that gets called after successful + application of this tagging operation + :type afterwards: callable + :exception: :exc:`~errors.DatabaseROError` + + .. note:: + This only adds the requested operation to the write queue. + You need to call :meth:`DBManager.flush` to actually write out. + """ + if self.ro: + raise DatabaseROError() + self.writequeue.append(('untag', afterwards, querystring, tags)) + + def count_messages(self, querystring): + """returns number of messages that match `querystring`""" + return self.query(querystring).count_messages() + + def count_threads(self, querystring): + """returns number of threads that match `querystring`""" + return self.query(querystring).count_threads() + + def search_thread_ids(self, querystring): + """ + returns the ids of all threads that match the `querystring` + This copies! all integer thread ids into an new list. + + :returns: list of str + """ + + return self.query_threaded(querystring) + + def _get_notmuch_thread(self, tid): + """returns :class:`notmuch.database.Thread` with given id""" + query = self.query('thread:' + tid) + try: + return query.search_threads().next() + except StopIteration: + errmsg = 'no thread with id %s exists!' % tid + raise NonexistantObjectError(errmsg) + + def get_thread(self, tid): + """returns :class:`Thread` with given thread id (str)""" + return Thread(self, self._get_notmuch_thread(tid)) + + def _get_notmuch_message(self, mid): + """returns :class:`notmuch.database.Message` with given id""" + mode = Database.MODE.READ_ONLY + db = Database(path=self.path, mode=mode) + try: + return db.find_message(mid) + except: + errmsg = 'no message with id %s exists!' % mid + raise NonexistantObjectError(errmsg) + + def get_message(self, mid): + """returns :class:`Message` with given message id (str)""" + return Message(self, self._get_notmuch_message(mid)) + + def get_all_tags(self): + """ + returns all tagsstrings used in the database + :rtype: list of str + """ + db = Database(path=self.path) + return [t for t in db.get_all_tags()] + + def async(self, cbl, fun): + """ + return a pair (pipe, process) so that the process writes + `fun(a)` to the pipe for each element `a` in the iterable returned + by the callable `cbl`. + + :param cbl: a function returning something iterable + :type cbl: callable + :param fun: an unary translation function + :type fun: callable + :rtype: (:class:`multiprocessing.Pipe`, + :class:`multiprocessing.Process`) + """ + pipe = multiprocessing.Pipe(False) + receiver, sender = pipe + process = FillPipeProcess(cbl(), pipe, fun) + process.start() + self.processes.append(process) + # closing the sending end in this (receiving) process guarantees + # that here the apropriate EOFError is raised upon .recv in the walker + sender.close() + return receiver, process + + def get_threads(self, querystring, sort='newest_first'): + """ + asynchronously look up thread ids matching `querystring`. + + :param querystring: The query string to use for the lookup + :type querystring: str. + :param sort: Sort order. one of ['oldest_first', 'newest_first', + 'message_id', 'unsorted'] + :type query: str + :returns: a pipe together with the process that asynchronously + writes to it. + :rtype: (:class:`multiprocessing.Pipe`, + :class:`multiprocessing.Process`) + """ + assert sort in self._sort_orders.keys() + q = self.query(querystring) + q.set_sort(self._sort_orders[sort]) + return self.async(q.search_threads, (lambda a: a.get_thread_id())) + + def query(self, querystring): + """ + creates :class:`notmuch.Query` objects on demand + + :param querystring: The query string to use for the lookup + :type query: str. + :returns: :class:`notmuch.Query` -- the query object. + """ + mode = Database.MODE.READ_ONLY + db = Database(path=self.path, mode=mode) + return db.create_query(querystring) + + def add_message(self, path, tags=[], afterwards=None): + """ + Adds a file to the notmuch index. + + :param path: path to the file + :type path: str + :param tags: tagstrings to add + :type tags: list of str + :param afterwards: callback to trigger after adding + :type afterwards: callable or None + """ + if self.ro: + raise DatabaseROError() + self.writequeue.append(('add', afterwards, path, tags)) + + def remove_message(self, message, afterwards=None): + """ + Remove a message from the notmuch index + + :param message: message to remove + :type message: :class:`Message` + :param afterwards: callback to trigger after removing + :type afterwards: callable or None + """ + if self.ro: + raise DatabaseROError() + path = message.get_filename() + self.writequeue.append(('remove', afterwards, path)) diff --git a/alot/init.py b/alot/init.py index 732a674d..449eea30 100755 --- a/alot/init.py +++ b/alot/init.py @@ -7,7 +7,7 @@ import os from alot.settings import settings from alot.settings.errors import ConfigError -from alot.db import DBManager +from alot.db.manager import DBManager from alot.ui import UI import alot.commands as commands from alot.commands import * -- cgit v1.2.3