summaryrefslogtreecommitdiff
path: root/alot/db
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2021-01-20 11:38:34 +0100
committerAnton Khirnov <anton@khirnov.net>2021-01-20 11:39:27 +0100
commitd47a85bca83352300399f04e76abecea4d53a934 (patch)
tree741f5ce3857102f19e4d44c33dc93981b906268d /alot/db
parent3b78137e97565f90a48aad92dc471c47e63747eb (diff)
db: make write operations async
Diffstat (limited to 'alot/db')
-rw-r--r--alot/db/errors.py6
-rw-r--r--alot/db/manager.py332
-rw-r--r--alot/db/message.py61
-rw-r--r--alot/db/thread.py54
4 files changed, 225 insertions, 228 deletions
diff --git a/alot/db/errors.py b/alot/db/errors.py
index 668ab34e..bd2564f9 100644
--- a/alot/db/errors.py
+++ b/alot/db/errors.py
@@ -13,12 +13,6 @@ class DatabaseROError(DatabaseError):
pass
-class DatabaseLockedError(DatabaseError):
-
- """cannot write to locked index"""
- pass
-
-
class NonexistantObjectError(DatabaseError):
"""requested thread or message does not exist in the index"""
diff --git a/alot/db/manager.py b/alot/db/manager.py
index f2828d65..ba9271fc 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,14 +1,15 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
-from collections import deque
+
+import abc
+import asyncio
import logging
import os
-from notmuch2 import Database, NotmuchError, XapianError
+from notmuch2 import Database, NotmuchError
from .errors import DatabaseError
-from .errors import DatabaseLockedError
from .errors import DatabaseROError
from .errors import NonexistantObjectError
from .thread import Thread
@@ -23,6 +24,90 @@ def _is_subdir_of(subpath, superpath):
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
return os.path.commonprefix([subpath, superpath]) == superpath
+# DB write operations
+class _DBOperation(abc.ABC):
+ _dbman = None
+ _tags = None
+
+ future = None
+
+ def __init__(self, dbman, tags):
+ self._dbman = dbman
+ self._tags = tags
+
+ self.future = dbman._loop.create_future()
+
+ def apply(self):
+ logging.debug('Performing DB write: %s', self)
+
+ try:
+ with Database(self._dbman.path, Database.MODE.READ_WRITE) as db:
+ logging.debug('got writeable DB')
+
+ with db.atomic():
+ self._apply(db)
+ except Exception as e:
+ logging.exception(e)
+ self.future.set_exception(e)
+ else:
+ logging.debug('DB write completed: %s', self)
+ self.future.set_result(True)
+
+ @abc.abstractmethod
+ def _apply(self, db):
+ pass
+
+ def __str__(self):
+ return '%s:%s' % (self.__class__.__name__, self._tags)
+
+class _DBOperationTagAdd(_DBOperation):
+ _query = None
+
+ def __init__(self, dbman, tags, query):
+ self._query = query
+
+ super().__init__(dbman, tags)
+
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ msg_tags |= self._tags
+
+ def __str__(self):
+ return '%s:%s' % (super().__str__(), self._query)
+
+class _DBOperationTagRemove(_DBOperationTagAdd):
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ msg_tags -= self._tags
+
+class _DBOperationTagSet(_DBOperationTagAdd):
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ property_tags = msg_tags & self._dbman._property_tags
+
+ msg_tags.clear()
+ msg_tags |= self._tags | property_tags
+
+class _DBOperationMsgAdd(_DBOperation):
+ _path = None
+
+ def __init__(self, dbman, tags, path):
+ self._path = path
+
+ super().__init__(dbman, tags)
+
+ def _apply(self, db):
+ msg, _ = db.add(self._path, self._dbman._sync_flags)
+
+ msg_tags = msg.tags
+ msg_tags |= self._tags
+
+ def __str__(self):
+ return '%s:%s' % (super().__str__(), self._path)
+
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -37,10 +122,17 @@ class DBManager:
}
"""constants representing sort orders"""
+ _loop = None
+
+ _sync_flags = None
+
_exclude_tags = None
_property_tags = None
- def __init__(self, path=None, ro=False):
+ _write_task = None
+ _write_queue = None
+
+ def __init__(self, loop, path=None, ro=False):
"""
:param path: absolute path to the notmuch index
:type path: str
@@ -49,7 +141,11 @@ class DBManager:
"""
self.ro = ro
self.path = path
- self.writequeue = deque([])
+
+ self._loop = loop
+
+ # read notmuch's config regarding imap flag synchronization
+ self._sync_flags = settings.get_notmuch_setting('maildir', 'synchronize_flags')
self._exclude_tags = frozenset(settings.get('exclude_tags'))
self._property_tags = frozenset(settings.get('property_tags'))
@@ -57,140 +153,7 @@ class DBManager:
def _db_ro(self):
return Database(path = self.path, mode = Database.MODE.READ_ONLY)
- def flush(self):
- """
- write out all queued write-commands in order, each one in a separate
- :meth:`atomic <notmuch.Database.begin_atomic>` transaction.
-
- If this fails the current action is rolled back, stays in the write
- queue and an exception is raised.
- You are responsible to retry flushing at a later time if you want to
- ensure that the cached changes are applied to the database.
-
- :exception: :exc:`~errors.DatabaseROError` if db is opened read-only
- :exception: :exc:`~errors.DatabaseLockedError` if db is locked
- """
- if self.ro:
- raise DatabaseROError()
- if not self.writequeue:
- return
-
- # read notmuch's config regarding imap flag synchronization
- sync = settings.get_notmuch_setting('maildir', 'synchronize_flags')
-
- # go through writequeue entries
- while self.writequeue:
- current_item = self.writequeue.popleft()
- logging.debug('write-out item: %s', str(current_item))
-
- # watch out for notmuch errors to re-insert current_item
- # to the queue on errors
- try:
- # the first two coordinants are cnmdname and post-callback
- cmd, afterwards = current_item[:2]
- logging.debug('cmd created')
-
- # acquire a writeable db handler
- try:
- mode = Database.MODE.READ_WRITE
- db = Database(path=self.path, mode=mode)
- except NotmuchError:
- raise DatabaseLockedError()
- logging.debug('got write lock')
-
- with db.atomic():
- if cmd == 'add':
- path, op_tags = current_item[2:]
-
- msg, _ = db.add(path, sync_flags = sync)
- msg_tags = msg.tags
-
- msg_tags |= op_tags
- else: # tag/set/untag
- querystring, op_tags = current_item[2:]
- for msg in db.messages(querystring):
- with msg.frozen():
- msg_tags = msg.tags
- if cmd == 'tag':
- msg_tags |= op_tags
- if cmd == 'set':
- property_tags = msg_tags & self._property_tags
- msg_tags.clear()
- msg_tags |= op_tags | property_tags
- elif cmd == 'untag':
- msg_tags -= op_tags
-
- # close db
- db.close()
- logging.debug('closed db')
-
- # call post-callback
- if callable(afterwards):
- logging.debug(str(afterwards))
- afterwards()
- logging.debug('called callback')
-
- # re-insert item to the queue upon Xapian/NotmuchErrors
- except (XapianError, NotmuchError) as e:
- logging.exception(e)
- self.writequeue.appendleft(current_item)
- raise DatabaseError(str(e))
- except DatabaseLockedError as e:
- logging.debug('index temporarily locked')
- self.writequeue.appendleft(current_item)
- raise e
- logging.debug('flush finished')
-
- def tag(self, querystring, tags, afterwards=None, remove_rest=False):
- """
- add tags to messages matching `querystring`.
- This appends a tag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
-
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a set of tags to be added
- :type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove tags from matching messages before tagging
- :type remove_rest: bool
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- if remove_rest:
- self.writequeue.append(('set', afterwards, querystring, tags))
- else:
- self.writequeue.append(('tag', afterwards, querystring, tags))
-
- def untag(self, querystring, tags, afterwards=None):
- """
- removes tags from messages that match `querystring`.
- This appends an untag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a list of tags to be added
- :type tags: list of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- self.writequeue.append(('untag', afterwards, querystring, tags))
def count_messages(self, querystring):
"""returns number of messages that match `querystring`"""
@@ -254,17 +217,96 @@ class DBManager:
return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags))
- def add_message(self, path, tags):
+ async def startup(self):
+ self._write_queue = asyncio.Queue()
+ self._write_task = asyncio.create_task(self._db_write_task())
+
+ async def shutdown(self):
+ if self._write_task:
+ await self._write_queue.put(None)
+ await self._write_task
+
+ async def _db_write_task(self):
+ # this task serialises write operations on the database and
+ # sends them off to a thread so they do not block the event loop
+ while True:
+ cur_item = await self._write_queue.get()
+ if cur_item is None:
+ self._write_queue.task_done()
+ break
+
+ logging.debug('submitting write task: %s', cur_item)
+
+ await self._loop.run_in_executor(None, cur_item.apply)
+ self._write_queue.task_done()
+
+ def tags_add(self, query, tags):
+ """
+ Asynchronously add tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ if not tags:
+ ret = self._loop.create_future()
+ ret.set_result(True)
+ return ret
+
+ op = _DBOperationTagAdd(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def tags_remove(self, query, tags):
"""
- Adds a file to the notmuch index.
+ Asynchronously remove tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ if not tags:
+ ret = self._loop.create_future()
+ ret.set_result(True)
+ return ret
+
+ op = _DBOperationTagRemove(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def tags_set(self, query, tags):
+ """
+ Asynchronously set tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ op = _DBOperationTagSet(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def msg_add(self, path, tags):
+ """
+ Asynchronously add a file to the notmuch index.
:param path: path to the file
:type path: str
:param tags: tagstrings to add
:type tags: list of str
"""
- tags = tags or []
-
if self.ro:
raise DatabaseROError()
if not _is_subdir_of(path, self.path):
@@ -272,5 +314,7 @@ class DBManager:
msg += ' is not below notmuchs '
msg += 'root path (%s)' % self.path
raise DatabaseError(msg)
- else:
- self.writequeue.append(('add', None, path, tags))
+
+ op = _DBOperationMsgAdd(self, tags, path)
+ self._write_queue.put_nowait(op)
+ return op.future
diff --git a/alot/db/message.py b/alot/db/message.py
index 21373e5e..836a9963 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -466,57 +466,46 @@ class Message:
"""
return email.utils.parseaddr(self._from)
- def add_tags(self, tags, afterwards=None, remove_rest=False):
+ def tags_add(self, tags):
"""
- adds tags to message
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`~alot.db.DBManager.flush` to write out.
+ Asynchronously add tags to message
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove all other tags
- :type remove_rest: bool
"""
- def myafterwards():
- if remove_rest:
- self._tags = set(tags)
- else:
- self._tags = self._tags.union(tags)
- if callable(afterwards):
- afterwards()
+ def myafterwards(fut):
+ self._tags = self._tags.union(tags)
- self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards,
- remove_rest=remove_rest)
- self._tags = self._tags.union(tags)
+ fut = self._dbman.tags_add('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
- def remove_tags(self, tags, afterwards=None):
- """remove tags from message
+ def tags_set(self, tags):
+ """
+ Asynchronously set tags for a message
+
+ :param tags: a set of new tags to replace the existing set
+ :type tags: set of str
+ """
+ def myafterwards(fut):
+ self._tags = set(tags)
- .. note::
+ fut = self._dbman.tags_set('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`~alot.db.DBManager.flush` to actually out.
+ def tags_remove(self, tags):
+ """Asynchronously remove tags from message
:param tags: a set of tags to be removed
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
"""
- def myafterwards():
+ def myafterwards(fut):
self._tags = self._tags.difference(tags)
- if callable(afterwards):
- afterwards()
- self._dbman.untag('id:' + self.id, tags, myafterwards)
+ fut = self._dbman.tags_remove('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
def iter_attachments(self):
"""
diff --git a/alot/db/thread.py b/alot/db/thread.py
index 2b580dcf..1f9ae7f0 100644
--- a/alot/db/thread.py
+++ b/alot/db/thread.py
@@ -63,7 +63,7 @@ class Thread:
self.refresh(thread)
- def refresh(self, thread=None):
+ def refresh(self, thread = None):
"""refresh thread metadata from the index"""
if not thread:
thread = self._dbman._get_notmuch_thread(self.id)
@@ -140,58 +140,28 @@ class Thread:
tags = tags.intersection(set(m.get_tags()))
return tags
- def add_tags(self, tags, afterwards=None, remove_rest=False):
+ def tags_add(self, tags):
"""
- add `tags` to all messages in this thread
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>`
- to actually write out.
+ Asynchronously add `tags` to all messages in this thread
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove all other tags
- :type remove_rest: bool
"""
- def myafterwards():
- self.refresh()
- if callable(afterwards):
- afterwards()
- self._dbman.tag('thread:' + self.id, tags, afterwards=myafterwards,
- remove_rest=remove_rest)
+ fut = self._dbman.tags_add('thread:' + self.id, tags)
+ fut.add_done_callback(lambda fut: self.refresh())
+ return fut
- def remove_tags(self, tags, afterwards=None):
+ def tags_remove(self, tags):
"""
- remove `tags` (list of str) from all messages in this thread
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>`
- to actually write out.
-
+ Asynchronously remove `tags` from all messages in this thread
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
"""
- rmtags = set(tags).intersection(self._tags)
- if rmtags:
-
- def myafterwards():
- self.refresh()
- if callable(afterwards):
- afterwards()
- self._dbman.untag('thread:' + self.id, tags, myafterwards)
+ fut = self._dbman.tags_remove('thread:' + self.id, tags)
+ fut.add_done_callback(lambda fut: self.refresh())
+ return fut
+
def get_authors(self):
"""