diff options
author | Anton Khirnov <anton@khirnov.net> | 2021-01-20 11:38:34 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2021-01-20 11:39:27 +0100 |
commit | d47a85bca83352300399f04e76abecea4d53a934 (patch) | |
tree | 741f5ce3857102f19e4d44c33dc93981b906268d /alot/db | |
parent | 3b78137e97565f90a48aad92dc471c47e63747eb (diff) |
db: make write operations async
Diffstat (limited to 'alot/db')
-rw-r--r-- | alot/db/errors.py | 6 | ||||
-rw-r--r-- | alot/db/manager.py | 332 | ||||
-rw-r--r-- | alot/db/message.py | 61 | ||||
-rw-r--r-- | alot/db/thread.py | 54 |
4 files changed, 225 insertions, 228 deletions
diff --git a/alot/db/errors.py b/alot/db/errors.py index 668ab34e..bd2564f9 100644 --- a/alot/db/errors.py +++ b/alot/db/errors.py @@ -13,12 +13,6 @@ class DatabaseROError(DatabaseError): pass -class DatabaseLockedError(DatabaseError): - - """cannot write to locked index""" - pass - - class NonexistantObjectError(DatabaseError): """requested thread or message does not exist in the index""" diff --git a/alot/db/manager.py b/alot/db/manager.py index f2828d65..ba9271fc 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -1,14 +1,15 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file -from collections import deque + +import abc +import asyncio import logging import os -from notmuch2 import Database, NotmuchError, XapianError +from notmuch2 import Database, NotmuchError from .errors import DatabaseError -from .errors import DatabaseLockedError from .errors import DatabaseROError from .errors import NonexistantObjectError from .thread import Thread @@ -23,6 +24,90 @@ def _is_subdir_of(subpath, superpath): # e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b return os.path.commonprefix([subpath, superpath]) == superpath +# DB write operations +class _DBOperation(abc.ABC): + _dbman = None + _tags = None + + future = None + + def __init__(self, dbman, tags): + self._dbman = dbman + self._tags = tags + + self.future = dbman._loop.create_future() + + def apply(self): + logging.debug('Performing DB write: %s', self) + + try: + with Database(self._dbman.path, Database.MODE.READ_WRITE) as db: + logging.debug('got writeable DB') + + with db.atomic(): + self._apply(db) + except Exception as e: + logging.exception(e) + self.future.set_exception(e) + else: + logging.debug('DB write completed: %s', self) + self.future.set_result(True) + + @abc.abstractmethod + def _apply(self, db): + pass + + def __str__(self): + return '%s:%s' % (self.__class__.__name__, self._tags) + +class _DBOperationTagAdd(_DBOperation): + _query = None + + def __init__(self, dbman, tags, query): + self._query = query + + super().__init__(dbman, tags) + + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + msg_tags |= self._tags + + def __str__(self): + return '%s:%s' % (super().__str__(), self._query) + +class _DBOperationTagRemove(_DBOperationTagAdd): + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + msg_tags -= self._tags + +class _DBOperationTagSet(_DBOperationTagAdd): + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + property_tags = msg_tags & self._dbman._property_tags + + msg_tags.clear() + msg_tags |= self._tags | property_tags + +class _DBOperationMsgAdd(_DBOperation): + _path = None + + def __init__(self, dbman, tags, path): + self._path = path + + super().__init__(dbman, tags) + + def _apply(self, db): + msg, _ = db.add(self._path, self._dbman._sync_flags) + + msg_tags = msg.tags + msg_tags |= self._tags + + def __str__(self): + return '%s:%s' % (super().__str__(), self._path) + class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -37,10 +122,17 @@ class DBManager: } """constants representing sort orders""" + _loop = None + + _sync_flags = None + _exclude_tags = None _property_tags = None - def __init__(self, path=None, ro=False): + _write_task = None + _write_queue = None + + def __init__(self, loop, path=None, ro=False): """ :param path: absolute path to the notmuch index :type path: str @@ -49,7 +141,11 @@ class DBManager: """ self.ro = ro self.path = path - self.writequeue = deque([]) + + self._loop = loop + + # read notmuch's config regarding imap flag synchronization + self._sync_flags = settings.get_notmuch_setting('maildir', 'synchronize_flags') self._exclude_tags = frozenset(settings.get('exclude_tags')) self._property_tags = frozenset(settings.get('property_tags')) @@ -57,140 +153,7 @@ class DBManager: def _db_ro(self): return Database(path = self.path, mode = Database.MODE.READ_ONLY) - def flush(self): - """ - write out all queued write-commands in order, each one in a separate - :meth:`atomic <notmuch.Database.begin_atomic>` transaction. - - If this fails the current action is rolled back, stays in the write - queue and an exception is raised. - You are responsible to retry flushing at a later time if you want to - ensure that the cached changes are applied to the database. - - :exception: :exc:`~errors.DatabaseROError` if db is opened read-only - :exception: :exc:`~errors.DatabaseLockedError` if db is locked - """ - if self.ro: - raise DatabaseROError() - if not self.writequeue: - return - - # read notmuch's config regarding imap flag synchronization - sync = settings.get_notmuch_setting('maildir', 'synchronize_flags') - - # go through writequeue entries - while self.writequeue: - current_item = self.writequeue.popleft() - logging.debug('write-out item: %s', str(current_item)) - - # watch out for notmuch errors to re-insert current_item - # to the queue on errors - try: - # the first two coordinants are cnmdname and post-callback - cmd, afterwards = current_item[:2] - logging.debug('cmd created') - - # acquire a writeable db handler - try: - mode = Database.MODE.READ_WRITE - db = Database(path=self.path, mode=mode) - except NotmuchError: - raise DatabaseLockedError() - logging.debug('got write lock') - - with db.atomic(): - if cmd == 'add': - path, op_tags = current_item[2:] - - msg, _ = db.add(path, sync_flags = sync) - msg_tags = msg.tags - - msg_tags |= op_tags - else: # tag/set/untag - querystring, op_tags = current_item[2:] - for msg in db.messages(querystring): - with msg.frozen(): - msg_tags = msg.tags - if cmd == 'tag': - msg_tags |= op_tags - if cmd == 'set': - property_tags = msg_tags & self._property_tags - msg_tags.clear() - msg_tags |= op_tags | property_tags - elif cmd == 'untag': - msg_tags -= op_tags - - # close db - db.close() - logging.debug('closed db') - - # call post-callback - if callable(afterwards): - logging.debug(str(afterwards)) - afterwards() - logging.debug('called callback') - - # re-insert item to the queue upon Xapian/NotmuchErrors - except (XapianError, NotmuchError) as e: - logging.exception(e) - self.writequeue.appendleft(current_item) - raise DatabaseError(str(e)) - except DatabaseLockedError as e: - logging.debug('index temporarily locked') - self.writequeue.appendleft(current_item) - raise e - logging.debug('flush finished') - - def tag(self, querystring, tags, afterwards=None, remove_rest=False): - """ - add tags to messages matching `querystring`. - This appends a tag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - - :param querystring: notmuch search string - :type querystring: str - :param tags: a set of tags to be added - :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove tags from matching messages before tagging - :type remove_rest: bool - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - if remove_rest: - self.writequeue.append(('set', afterwards, querystring, tags)) - else: - self.writequeue.append(('tag', afterwards, querystring, tags)) - - def untag(self, querystring, tags, afterwards=None): - """ - removes tags from messages that match `querystring`. - This appends an untag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - :param querystring: notmuch search string - :type querystring: str - :param tags: a list of tags to be added - :type tags: list of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - self.writequeue.append(('untag', afterwards, querystring, tags)) def count_messages(self, querystring): """returns number of messages that match `querystring`""" @@ -254,17 +217,96 @@ class DBManager: return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags)) - def add_message(self, path, tags): + async def startup(self): + self._write_queue = asyncio.Queue() + self._write_task = asyncio.create_task(self._db_write_task()) + + async def shutdown(self): + if self._write_task: + await self._write_queue.put(None) + await self._write_task + + async def _db_write_task(self): + # this task serialises write operations on the database and + # sends them off to a thread so they do not block the event loop + while True: + cur_item = await self._write_queue.get() + if cur_item is None: + self._write_queue.task_done() + break + + logging.debug('submitting write task: %s', cur_item) + + await self._loop.run_in_executor(None, cur_item.apply) + self._write_queue.task_done() + + def tags_add(self, query, tags): + """ + Asynchronously add tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + if not tags: + ret = self._loop.create_future() + ret.set_result(True) + return ret + + op = _DBOperationTagAdd(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def tags_remove(self, query, tags): """ - Adds a file to the notmuch index. + Asynchronously remove tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + if not tags: + ret = self._loop.create_future() + ret.set_result(True) + return ret + + op = _DBOperationTagRemove(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def tags_set(self, query, tags): + """ + Asynchronously set tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + op = _DBOperationTagSet(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def msg_add(self, path, tags): + """ + Asynchronously add a file to the notmuch index. :param path: path to the file :type path: str :param tags: tagstrings to add :type tags: list of str """ - tags = tags or [] - if self.ro: raise DatabaseROError() if not _is_subdir_of(path, self.path): @@ -272,5 +314,7 @@ class DBManager: msg += ' is not below notmuchs ' msg += 'root path (%s)' % self.path raise DatabaseError(msg) - else: - self.writequeue.append(('add', None, path, tags)) + + op = _DBOperationMsgAdd(self, tags, path) + self._write_queue.put_nowait(op) + return op.future diff --git a/alot/db/message.py b/alot/db/message.py index 21373e5e..836a9963 100644 --- a/alot/db/message.py +++ b/alot/db/message.py @@ -466,57 +466,46 @@ class Message: """ return email.utils.parseaddr(self._from) - def add_tags(self, tags, afterwards=None, remove_rest=False): + def tags_add(self, tags): """ - adds tags to message - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`~alot.db.DBManager.flush` to write out. + Asynchronously add tags to message :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove all other tags - :type remove_rest: bool """ - def myafterwards(): - if remove_rest: - self._tags = set(tags) - else: - self._tags = self._tags.union(tags) - if callable(afterwards): - afterwards() + def myafterwards(fut): + self._tags = self._tags.union(tags) - self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards, - remove_rest=remove_rest) - self._tags = self._tags.union(tags) + fut = self._dbman.tags_add('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut - def remove_tags(self, tags, afterwards=None): - """remove tags from message + def tags_set(self, tags): + """ + Asynchronously set tags for a message + + :param tags: a set of new tags to replace the existing set + :type tags: set of str + """ + def myafterwards(fut): + self._tags = set(tags) - .. note:: + fut = self._dbman.tags_set('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`~alot.db.DBManager.flush` to actually out. + def tags_remove(self, tags): + """Asynchronously remove tags from message :param tags: a set of tags to be removed :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable """ - def myafterwards(): + def myafterwards(fut): self._tags = self._tags.difference(tags) - if callable(afterwards): - afterwards() - self._dbman.untag('id:' + self.id, tags, myafterwards) + fut = self._dbman.tags_remove('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut def iter_attachments(self): """ diff --git a/alot/db/thread.py b/alot/db/thread.py index 2b580dcf..1f9ae7f0 100644 --- a/alot/db/thread.py +++ b/alot/db/thread.py @@ -63,7 +63,7 @@ class Thread: self.refresh(thread) - def refresh(self, thread=None): + def refresh(self, thread = None): """refresh thread metadata from the index""" if not thread: thread = self._dbman._get_notmuch_thread(self.id) @@ -140,58 +140,28 @@ class Thread: tags = tags.intersection(set(m.get_tags())) return tags - def add_tags(self, tags, afterwards=None, remove_rest=False): + def tags_add(self, tags): """ - add `tags` to all messages in this thread - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>` - to actually write out. + Asynchronously add `tags` to all messages in this thread :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove all other tags - :type remove_rest: bool """ - def myafterwards(): - self.refresh() - if callable(afterwards): - afterwards() - self._dbman.tag('thread:' + self.id, tags, afterwards=myafterwards, - remove_rest=remove_rest) + fut = self._dbman.tags_add('thread:' + self.id, tags) + fut.add_done_callback(lambda fut: self.refresh()) + return fut - def remove_tags(self, tags, afterwards=None): + def tags_remove(self, tags): """ - remove `tags` (list of str) from all messages in this thread - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>` - to actually write out. - + Asynchronously remove `tags` from all messages in this thread :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable """ - rmtags = set(tags).intersection(self._tags) - if rmtags: - - def myafterwards(): - self.refresh() - if callable(afterwards): - afterwards() - self._dbman.untag('thread:' + self.id, tags, myafterwards) + fut = self._dbman.tags_remove('thread:' + self.id, tags) + fut.add_done_callback(lambda fut: self.refresh()) + return fut + def get_authors(self): """ |