summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2021-01-20 11:38:34 +0100
committerAnton Khirnov <anton@khirnov.net>2021-01-20 11:39:27 +0100
commitd47a85bca83352300399f04e76abecea4d53a934 (patch)
tree741f5ce3857102f19e4d44c33dc93981b906268d
parent3b78137e97565f90a48aad92dc471c47e63747eb (diff)
db: make write operations async
-rw-r--r--alot/__main__.py7
-rw-r--r--alot/commands/envelope.py10
-rw-r--r--alot/commands/globals.py57
-rw-r--r--alot/commands/search.py34
-rw-r--r--alot/commands/thread.py31
-rw-r--r--alot/db/errors.py6
-rw-r--r--alot/db/manager.py332
-rw-r--r--alot/db/message.py61
-rw-r--r--alot/db/thread.py54
-rw-r--r--alot/defaults/alot.rc.spec4
-rw-r--r--alot/defaults/default.bindings1
-rw-r--r--alot/ui.py13
12 files changed, 257 insertions, 353 deletions
diff --git a/alot/__main__.py b/alot/__main__.py
index d09a84f6..37d74f3b 100644
--- a/alot/__main__.py
+++ b/alot/__main__.py
@@ -2,6 +2,7 @@
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import argparse
+import asyncio
import logging
import os
import sys
@@ -119,10 +120,12 @@ def main():
if options.colour_mode:
settings.set('colourmode', options.colour_mode)
+ loop = asyncio.get_event_loop()
+
# get ourselves a database manager
indexpath = settings.get_notmuch_setting('database', 'path')
indexpath = options.mailindex_path or indexpath
- dbman = DBManager(path=indexpath, ro=options.read_only)
+ dbman = DBManager(loop, path=indexpath, ro=options.read_only)
# determine what to do
if command is None:
@@ -134,7 +137,7 @@ def main():
cmdstring = ' '.join(options.command)
# set up and start interface
- UI(dbman, cmdstring)
+ UI(loop, dbman, cmdstring)
# run the exit hook
exit_hook = settings.get_hook('exit')
diff --git a/alot/commands/envelope.py b/alot/commands/envelope.py
index 1f806581..457fe48a 100644
--- a/alot/commands/envelope.py
+++ b/alot/commands/envelope.py
@@ -140,8 +140,7 @@ class SaveCommand(Command):
ui.notify(msg + ' to %s' % path)
logging.debug('adding new mail to index')
try:
- ui.dbman.add_message(path, account.draft_tags + envelope.tags)
- await ui.apply_command(globals.FlushCommand())
+ await ui.dbman.msg_add(path, account.draft_tags | envelope.tags)
await ui.apply_command(commands.globals.BufferCloseCommand())
except DatabaseError as e:
logging.error(str(e))
@@ -295,9 +294,9 @@ class SendCommand(Command):
ui.notify('mail sent successfully')
if self.envelope is not None:
if self.envelope.replied:
- self.envelope.replied.add_tags(account.replied_tags)
+ await self.envelope.replied.tags_add(account.replied_tags)
if self.envelope.passed:
- self.envelope.passed.add_tags(account.passed_tags)
+ await self.envelope.passed.tags_add(account.passed_tags)
# store mail locally
# This can raise StoreMailError
@@ -306,8 +305,7 @@ class SendCommand(Command):
# add mail to index if maildir path available
if path is not None:
logging.debug('adding new mail to index')
- ui.dbman.add_message(path, account.sent_tags | initial_tags)
- await ui.apply_command(globals.FlushCommand())
+ await ui.dbman.msg_add(path, account.sent_tags | initial_tags)
@registerCommand(MODE, 'edit', arguments=[
diff --git a/alot/commands/globals.py b/alot/commands/globals.py
index 97f82c99..ee49667d 100644
--- a/alot/commands/globals.py
+++ b/alot/commands/globals.py
@@ -29,7 +29,6 @@ from ..completion.contacts import ContactsCompleter
from ..completion.accounts import AccountCompleter
from ..completion.tags import TagsCompleter
from ..widgets.utils import DialogBox
-from ..db.errors import DatabaseLockedError
from ..db.envelope import Envelope
from ..settings.const import settings
from ..settings.errors import ConfigError, NoMatchingAccount
@@ -73,17 +72,9 @@ class ExitCommand(Command):
b.cleanup()
if ui.dbman.ro:
ui.exit()
- else:
- await ui.apply_command(FlushCommand(callback=ui.exit))
- ui.cleanup()
-
- if ui.db_was_locked:
- msg = 'Database locked. Exit without saving?'
- response = await ui.choice(msg, msg_position='left', cancel='no')
- if response == 'no':
- return
- ui.exit()
+ await ui.cleanup()
+ ui.exit()
@registerCommand(MODE, 'search', usage='search query', arguments=[
(['--sort'], {'help': 'sort order', 'choices': [
@@ -586,50 +577,6 @@ class NamedQueriesCommand(Command):
def apply(self, ui):
ui.buffer_open(buffers.NamedQueriesBuffer(ui.dbman, self.filtfun))
-
-@registerCommand(MODE, 'flush')
-class FlushCommand(Command):
-
- """flush write operations or retry until committed"""
- repeatable = True
-
- def __init__(self, callback=None, silent=False, **kwargs):
- """
- :param callback: function to call after successful writeout
- :type callback: callable
- """
- Command.__init__(self, **kwargs)
- self.callback = callback
- self.silent = silent
-
- def apply(self, ui):
- try:
- ui.dbman.flush()
- if callable(self.callback):
- self.callback()
- logging.debug('flush complete')
- if ui.db_was_locked:
- if not self.silent:
- ui.notify('changes flushed')
- ui.db_was_locked = False
- ui.update()
-
- except DatabaseLockedError:
- timeout = settings.get('flush_retry_timeout')
-
- if timeout > 0:
- def f(*_):
- self.apply(ui)
- ui.mainloop.set_alarm_in(timeout, f)
- if not ui.db_was_locked:
- if not self.silent:
- ui.notify('index locked, will try again in %d secs'
- % timeout)
- ui.db_was_locked = True
- ui.update()
- return
-
-
# TODO: choices
@registerCommand(MODE, 'help', arguments=[
(['commandname'], {'help': 'command or \'bindings\''})])
diff --git a/alot/commands/search.py b/alot/commands/search.py
index 9fe265cb..d4c1a258 100644
--- a/alot/commands/search.py
+++ b/alot/commands/search.py
@@ -99,9 +99,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
@registerCommand(
MODE, 'tag', forced={'action': 'add'},
arguments=[
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'default': 'True',
- 'help': 'postpone a writeout to the index'}),
(['--all'], {'action': 'store_true', 'dest': 'allmessages',
'default': False,
'help': 'tag all messages that match the current search query'}),
@@ -111,9 +108,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
@registerCommand(
MODE, 'retag', forced={'action': 'set'},
arguments=[
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'default': 'True',
- 'help': 'postpone a writeout to the index'}),
(['--all'], {'action': 'store_true', 'dest': 'allmessages',
'default': False,
'help': 'retag all messages that match the current query'}),
@@ -123,9 +117,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
@registerCommand(
MODE, 'untag', forced={'action': 'remove'},
arguments=[
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'default': 'True',
- 'help': 'postpone a writeout to the index'}),
(['--all'], {'action': 'store_true', 'dest': 'allmessages',
'default': False,
'help': 'untag all messages that match the current query'}),
@@ -135,9 +126,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
@registerCommand(
MODE, 'toggletags', forced={'action': 'toggle'},
arguments=[
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'default': 'True',
- 'help': 'postpone a writeout to the index'}),
(['tags'], {'help': 'comma separated list of tags'})],
help='flip presence of tags on the selected thread: a tag is considered present '
'and will be removed if at least one message in this thread is '
@@ -147,8 +135,7 @@ class TagCommand(Command):
"""manipulate message tags"""
repeatable = True
- def __init__(self, tags='', action='add', allmessages=False, flush=True,
- **kwargs):
+ def __init__(self, tags='', action='add', allmessages=False, **kwargs):
"""
:param tags: comma separated list of tagstrings to set
:type tags: str
@@ -158,13 +145,10 @@ class TagCommand(Command):
:type action: str
:param allmessages: tag all messages in search result
:type allmessages: bool
- :param flush: immediately write out to the index
- :type flush: bool
"""
self.tagsstring = tags
self.action = action
self.allm = allmessages
- self.flush = flush
Command.__init__(self, **kwargs)
async def apply(self, ui):
@@ -200,11 +184,11 @@ class TagCommand(Command):
try:
if self.action == 'add':
- ui.dbman.tag(testquery, tags, remove_rest=False)
+ await ui.dbman.tags_add(testquery, tags)
if self.action == 'set':
- ui.dbman.tag(testquery, tags, remove_rest=True)
+ await ui.dbman.tags_set(testquery, tags)
elif self.action == 'remove':
- ui.dbman.untag(testquery, tags)
+ await ui.dbman.tags_remove(testquery, tags)
elif self.action == 'toggle':
if not self.allm:
to_remove = set()
@@ -214,17 +198,13 @@ class TagCommand(Command):
to_remove.add(t)
else:
to_add.add(t)
- thread.remove_tags(to_remove)
- thread.add_tags(to_add, afterwards=refresh)
+ await thread.tags_remove(to_remove)
+ await thread.tags_add(to_add)
except DatabaseROError:
ui.notify('index in read-only mode', priority='error')
return
- # flush index
- if self.flush:
- await ui.apply_command(
- commands.globals.FlushCommand(callback=refresh))
-
+ refresh()
@registerCommand(
MODE, 'move', help='move focus in search buffer',
diff --git a/alot/commands/thread.py b/alot/commands/thread.py
index 178264c0..7523f46b 100644
--- a/alot/commands/thread.py
+++ b/alot/commands/thread.py
@@ -18,7 +18,6 @@ from io import BytesIO
from . import Command, registerCommand
from .globals import ExternalCommand
-from .globals import FlushCommand
from .globals import ComposeCommand
from .globals import MoveCommand
from .globals import CommandCanceled
@@ -1040,8 +1039,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
arguments=[
(['--all'], {'action': 'store_true',
'help': 'tag all messages in thread'}),
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'help': 'postpone a writeout to the index'}),
(['tags'], {'help': 'comma separated list of tags'})],
help='add tags to message(s)',
)
@@ -1050,8 +1047,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
arguments=[
(['--all'], {'action': 'store_true',
'help': 'tag all messages in thread'}),
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'help': 'postpone a writeout to the index'}),
(['tags'], {'help': 'comma separated list of tags'})],
help='set message(s) tags.',
)
@@ -1060,8 +1055,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
arguments=[
(['--all'], {'action': 'store_true',
'help': 'tag all messages in thread'}),
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'help': 'postpone a writeout to the index'}),
(['tags'], {'help': 'comma separated list of tags'})],
help='remove tags from message(s)',
)
@@ -1070,8 +1063,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand)
arguments=[
(['--all'], {'action': 'store_true',
'help': 'tag all messages in thread'}),
- (['--no-flush'], {'action': 'store_false', 'dest': 'flush',
- 'help': 'postpone a writeout to the index'}),
(['tags'], {'help': 'comma separated list of tags'})],
help='flip presence of tags on message(s)',
)
@@ -1080,8 +1071,7 @@ class TagCommand(Command):
"""manipulate message tags"""
repeatable = True
- def __init__(self, tags='', action='add', all=False, flush=True,
- **kwargs):
+ def __init__(self, tags='', action='add', all=False, **kwargs):
"""
:param tags: comma separated list of tagstrings to set
:type tags: str
@@ -1091,13 +1081,10 @@ class TagCommand(Command):
:type action: str
:param all: tag all messages in thread
:type all: bool
- :param flush: immediately write out to the index
- :type flush: bool
"""
self.tagsstring = tags
self.all = all
self.action = action
- self.flush = flush
Command.__init__(self, **kwargs)
async def apply(self, ui):
@@ -1110,26 +1097,22 @@ class TagCommand(Command):
try:
for m in messages:
if self.action == 'add':
- m.add_tags(tags)
+ await m.tags_add(tags)
if self.action == 'set':
- m.add_tags(tags, remove_rest=True)
+ await m.tags_set(tags)
elif self.action == 'remove':
- m.remove_tags(tags)
+ await m.tags_remove(tags)
elif self.action == 'toggle':
to_remove = set()
- to_add = ()
+ to_add = set()
for t in tags:
if t in m.get_tags():
to_remove.add(t)
else:
to_add.add(t)
- m.remove_tags(to_remove)
- m.add_tags(to_add)
+ await m.tags_remove(to_remove)
+ await m.tags_add(to_add)
except DatabaseROError:
ui.notify('index in read-only mode', priority='error')
return
-
- # flush index
- if self.flush:
- await ui.apply_command(FlushCommand())
diff --git a/alot/db/errors.py b/alot/db/errors.py
index 668ab34e..bd2564f9 100644
--- a/alot/db/errors.py
+++ b/alot/db/errors.py
@@ -13,12 +13,6 @@ class DatabaseROError(DatabaseError):
pass
-class DatabaseLockedError(DatabaseError):
-
- """cannot write to locked index"""
- pass
-
-
class NonexistantObjectError(DatabaseError):
"""requested thread or message does not exist in the index"""
diff --git a/alot/db/manager.py b/alot/db/manager.py
index f2828d65..ba9271fc 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -1,14 +1,15 @@
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
-from collections import deque
+
+import abc
+import asyncio
import logging
import os
-from notmuch2 import Database, NotmuchError, XapianError
+from notmuch2 import Database, NotmuchError
from .errors import DatabaseError
-from .errors import DatabaseLockedError
from .errors import DatabaseROError
from .errors import NonexistantObjectError
from .thread import Thread
@@ -23,6 +24,90 @@ def _is_subdir_of(subpath, superpath):
# e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
return os.path.commonprefix([subpath, superpath]) == superpath
+# DB write operations
+class _DBOperation(abc.ABC):
+ _dbman = None
+ _tags = None
+
+ future = None
+
+ def __init__(self, dbman, tags):
+ self._dbman = dbman
+ self._tags = tags
+
+ self.future = dbman._loop.create_future()
+
+ def apply(self):
+ logging.debug('Performing DB write: %s', self)
+
+ try:
+ with Database(self._dbman.path, Database.MODE.READ_WRITE) as db:
+ logging.debug('got writeable DB')
+
+ with db.atomic():
+ self._apply(db)
+ except Exception as e:
+ logging.exception(e)
+ self.future.set_exception(e)
+ else:
+ logging.debug('DB write completed: %s', self)
+ self.future.set_result(True)
+
+ @abc.abstractmethod
+ def _apply(self, db):
+ pass
+
+ def __str__(self):
+ return '%s:%s' % (self.__class__.__name__, self._tags)
+
+class _DBOperationTagAdd(_DBOperation):
+ _query = None
+
+ def __init__(self, dbman, tags, query):
+ self._query = query
+
+ super().__init__(dbman, tags)
+
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ msg_tags |= self._tags
+
+ def __str__(self):
+ return '%s:%s' % (super().__str__(), self._query)
+
+class _DBOperationTagRemove(_DBOperationTagAdd):
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ msg_tags -= self._tags
+
+class _DBOperationTagSet(_DBOperationTagAdd):
+ def _apply(self, db):
+ for msg in db.messages(self._query):
+ msg_tags = msg.tags
+ property_tags = msg_tags & self._dbman._property_tags
+
+ msg_tags.clear()
+ msg_tags |= self._tags | property_tags
+
+class _DBOperationMsgAdd(_DBOperation):
+ _path = None
+
+ def __init__(self, dbman, tags, path):
+ self._path = path
+
+ super().__init__(dbman, tags)
+
+ def _apply(self, db):
+ msg, _ = db.add(self._path, self._dbman._sync_flags)
+
+ msg_tags = msg.tags
+ msg_tags |= self._tags
+
+ def __str__(self):
+ return '%s:%s' % (super().__str__(), self._path)
+
class DBManager:
"""
Keeps track of your index parameters, maintains a write-queue and
@@ -37,10 +122,17 @@ class DBManager:
}
"""constants representing sort orders"""
+ _loop = None
+
+ _sync_flags = None
+
_exclude_tags = None
_property_tags = None
- def __init__(self, path=None, ro=False):
+ _write_task = None
+ _write_queue = None
+
+ def __init__(self, loop, path=None, ro=False):
"""
:param path: absolute path to the notmuch index
:type path: str
@@ -49,7 +141,11 @@ class DBManager:
"""
self.ro = ro
self.path = path
- self.writequeue = deque([])
+
+ self._loop = loop
+
+ # read notmuch's config regarding imap flag synchronization
+ self._sync_flags = settings.get_notmuch_setting('maildir', 'synchronize_flags')
self._exclude_tags = frozenset(settings.get('exclude_tags'))
self._property_tags = frozenset(settings.get('property_tags'))
@@ -57,140 +153,7 @@ class DBManager:
def _db_ro(self):
return Database(path = self.path, mode = Database.MODE.READ_ONLY)
- def flush(self):
- """
- write out all queued write-commands in order, each one in a separate
- :meth:`atomic <notmuch.Database.begin_atomic>` transaction.
-
- If this fails the current action is rolled back, stays in the write
- queue and an exception is raised.
- You are responsible to retry flushing at a later time if you want to
- ensure that the cached changes are applied to the database.
-
- :exception: :exc:`~errors.DatabaseROError` if db is opened read-only
- :exception: :exc:`~errors.DatabaseLockedError` if db is locked
- """
- if self.ro:
- raise DatabaseROError()
- if not self.writequeue:
- return
-
- # read notmuch's config regarding imap flag synchronization
- sync = settings.get_notmuch_setting('maildir', 'synchronize_flags')
-
- # go through writequeue entries
- while self.writequeue:
- current_item = self.writequeue.popleft()
- logging.debug('write-out item: %s', str(current_item))
-
- # watch out for notmuch errors to re-insert current_item
- # to the queue on errors
- try:
- # the first two coordinants are cnmdname and post-callback
- cmd, afterwards = current_item[:2]
- logging.debug('cmd created')
-
- # acquire a writeable db handler
- try:
- mode = Database.MODE.READ_WRITE
- db = Database(path=self.path, mode=mode)
- except NotmuchError:
- raise DatabaseLockedError()
- logging.debug('got write lock')
-
- with db.atomic():
- if cmd == 'add':
- path, op_tags = current_item[2:]
-
- msg, _ = db.add(path, sync_flags = sync)
- msg_tags = msg.tags
-
- msg_tags |= op_tags
- else: # tag/set/untag
- querystring, op_tags = current_item[2:]
- for msg in db.messages(querystring):
- with msg.frozen():
- msg_tags = msg.tags
- if cmd == 'tag':
- msg_tags |= op_tags
- if cmd == 'set':
- property_tags = msg_tags & self._property_tags
- msg_tags.clear()
- msg_tags |= op_tags | property_tags
- elif cmd == 'untag':
- msg_tags -= op_tags
-
- # close db
- db.close()
- logging.debug('closed db')
-
- # call post-callback
- if callable(afterwards):
- logging.debug(str(afterwards))
- afterwards()
- logging.debug('called callback')
-
- # re-insert item to the queue upon Xapian/NotmuchErrors
- except (XapianError, NotmuchError) as e:
- logging.exception(e)
- self.writequeue.appendleft(current_item)
- raise DatabaseError(str(e))
- except DatabaseLockedError as e:
- logging.debug('index temporarily locked')
- self.writequeue.appendleft(current_item)
- raise e
- logging.debug('flush finished')
-
- def tag(self, querystring, tags, afterwards=None, remove_rest=False):
- """
- add tags to messages matching `querystring`.
- This appends a tag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
-
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a set of tags to be added
- :type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove tags from matching messages before tagging
- :type remove_rest: bool
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- if remove_rest:
- self.writequeue.append(('set', afterwards, querystring, tags))
- else:
- self.writequeue.append(('tag', afterwards, querystring, tags))
-
- def untag(self, querystring, tags, afterwards=None):
- """
- removes tags from messages that match `querystring`.
- This appends an untag operation to the write queue and raises
- :exc:`~errors.DatabaseROError` if in read only mode.
- :param querystring: notmuch search string
- :type querystring: str
- :param tags: a list of tags to be added
- :type tags: list of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :exception: :exc:`~errors.DatabaseROError`
-
- .. note::
- This only adds the requested operation to the write queue.
- You need to call :meth:`DBManager.flush` to actually write out.
- """
- if self.ro:
- raise DatabaseROError()
- self.writequeue.append(('untag', afterwards, querystring, tags))
def count_messages(self, querystring):
"""returns number of messages that match `querystring`"""
@@ -254,17 +217,96 @@ class DBManager:
return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags))
- def add_message(self, path, tags):
+ async def startup(self):
+ self._write_queue = asyncio.Queue()
+ self._write_task = asyncio.create_task(self._db_write_task())
+
+ async def shutdown(self):
+ if self._write_task:
+ await self._write_queue.put(None)
+ await self._write_task
+
+ async def _db_write_task(self):
+ # this task serialises write operations on the database and
+ # sends them off to a thread so they do not block the event loop
+ while True:
+ cur_item = await self._write_queue.get()
+ if cur_item is None:
+ self._write_queue.task_done()
+ break
+
+ logging.debug('submitting write task: %s', cur_item)
+
+ await self._loop.run_in_executor(None, cur_item.apply)
+ self._write_queue.task_done()
+
+ def tags_add(self, query, tags):
+ """
+ Asynchronously add tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ if not tags:
+ ret = self._loop.create_future()
+ ret.set_result(True)
+ return ret
+
+ op = _DBOperationTagAdd(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def tags_remove(self, query, tags):
"""
- Adds a file to the notmuch index.
+ Asynchronously remove tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ if not tags:
+ ret = self._loop.create_future()
+ ret.set_result(True)
+ return ret
+
+ op = _DBOperationTagRemove(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def tags_set(self, query, tags):
+ """
+ Asynchronously set tags to messages matching `querystring`.
+
+ :param querystring: notmuch search string
+ :type querystring: str
+ :param tags: a set of tags to be added
+ :type tags: set of str
+ """
+ if self.ro:
+ raise DatabaseROError()
+
+ op = _DBOperationTagSet(self, tags, query)
+ self._write_queue.put_nowait(op)
+ return op.future
+
+ def msg_add(self, path, tags):
+ """
+ Asynchronously add a file to the notmuch index.
:param path: path to the file
:type path: str
:param tags: tagstrings to add
:type tags: list of str
"""
- tags = tags or []
-
if self.ro:
raise DatabaseROError()
if not _is_subdir_of(path, self.path):
@@ -272,5 +314,7 @@ class DBManager:
msg += ' is not below notmuchs '
msg += 'root path (%s)' % self.path
raise DatabaseError(msg)
- else:
- self.writequeue.append(('add', None, path, tags))
+
+ op = _DBOperationMsgAdd(self, tags, path)
+ self._write_queue.put_nowait(op)
+ return op.future
diff --git a/alot/db/message.py b/alot/db/message.py
index 21373e5e..836a9963 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -466,57 +466,46 @@ class Message:
"""
return email.utils.parseaddr(self._from)
- def add_tags(self, tags, afterwards=None, remove_rest=False):
+ def tags_add(self, tags):
"""
- adds tags to message
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`~alot.db.DBManager.flush` to write out.
+ Asynchronously add tags to message
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove all other tags
- :type remove_rest: bool
"""
- def myafterwards():
- if remove_rest:
- self._tags = set(tags)
- else:
- self._tags = self._tags.union(tags)
- if callable(afterwards):
- afterwards()
+ def myafterwards(fut):
+ self._tags = self._tags.union(tags)
- self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards,
- remove_rest=remove_rest)
- self._tags = self._tags.union(tags)
+ fut = self._dbman.tags_add('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
- def remove_tags(self, tags, afterwards=None):
- """remove tags from message
+ def tags_set(self, tags):
+ """
+ Asynchronously set tags for a message
+
+ :param tags: a set of new tags to replace the existing set
+ :type tags: set of str
+ """
+ def myafterwards(fut):
+ self._tags = set(tags)
- .. note::
+ fut = self._dbman.tags_set('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`~alot.db.DBManager.flush` to actually out.
+ def tags_remove(self, tags):
+ """Asynchronously remove tags from message
:param tags: a set of tags to be removed
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
"""
- def myafterwards():
+ def myafterwards(fut):
self._tags = self._tags.difference(tags)
- if callable(afterwards):
- afterwards()
- self._dbman.untag('id:' + self.id, tags, myafterwards)
+ fut = self._dbman.tags_remove('id:' + self.id, tags)
+ fut.add_done_callback(myafterwards)
+ return fut
def iter_attachments(self):
"""
diff --git a/alot/db/thread.py b/alot/db/thread.py
index 2b580dcf..1f9ae7f0 100644
--- a/alot/db/thread.py
+++ b/alot/db/thread.py
@@ -63,7 +63,7 @@ class Thread:
self.refresh(thread)
- def refresh(self, thread=None):
+ def refresh(self, thread = None):
"""refresh thread metadata from the index"""
if not thread:
thread = self._dbman._get_notmuch_thread(self.id)
@@ -140,58 +140,28 @@ class Thread:
tags = tags.intersection(set(m.get_tags()))
return tags
- def add_tags(self, tags, afterwards=None, remove_rest=False):
+ def tags_add(self, tags):
"""
- add `tags` to all messages in this thread
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>`
- to actually write out.
+ Asynchronously add `tags` to all messages in this thread
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
- :param remove_rest: remove all other tags
- :type remove_rest: bool
"""
- def myafterwards():
- self.refresh()
- if callable(afterwards):
- afterwards()
- self._dbman.tag('thread:' + self.id, tags, afterwards=myafterwards,
- remove_rest=remove_rest)
+ fut = self._dbman.tags_add('thread:' + self.id, tags)
+ fut.add_done_callback(lambda fut: self.refresh())
+ return fut
- def remove_tags(self, tags, afterwards=None):
+ def tags_remove(self, tags):
"""
- remove `tags` (list of str) from all messages in this thread
-
- .. note::
-
- This only adds the requested operation to this objects
- :class:`DBManager's <alot.db.DBManager>` write queue.
- You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>`
- to actually write out.
-
+ Asynchronously remove `tags` from all messages in this thread
:param tags: a set of tags to be added
:type tags: set of str
- :param afterwards: callback that gets called after successful
- application of this tagging operation
- :type afterwards: callable
"""
- rmtags = set(tags).intersection(self._tags)
- if rmtags:
-
- def myafterwards():
- self.refresh()
- if callable(afterwards):
- afterwards()
- self._dbman.untag('thread:' + self.id, tags, myafterwards)
+ fut = self._dbman.tags_remove('thread:' + self.id, tags)
+ fut.add_done_callback(lambda fut: self.refresh())
+ return fut
+
def get_authors(self):
"""
diff --git a/alot/defaults/alot.rc.spec b/alot/defaults/alot.rc.spec
index 00e527b5..5a42119f 100644
--- a/alot/defaults/alot.rc.spec
+++ b/alot/defaults/alot.rc.spec
@@ -122,10 +122,6 @@ edit_headers_whitelist = force_list(default=list(*,))
# see :ref:`edit_headers_whitelist <edit-headers-whitelist>`
edit_headers_blacklist = force_list(default=list(Content-Type,MIME-Version,References,In-Reply-To))
-# timeout in seconds after a failed attempt to writeout the database is
-# repeated. Set to 0 for no retry.
-flush_retry_timeout = integer(default=5)
-
# where to look up hooks
hooksfile = string(default='~/.config/alot/hooks.py')
diff --git a/alot/defaults/default.bindings b/alot/defaults/default.bindings
index e28253ed..16d9d04a 100644
--- a/alot/defaults/default.bindings
+++ b/alot/defaults/default.bindings
@@ -20,7 +20,6 @@ U = search tag:unread
tab = bnext
\ = prompt 'search '
d = bclose
-$ = flush
m = compose
o = prompt 'search '
q = exit
diff --git a/alot/ui.py b/alot/ui.py
index e361fee8..550ca150 100644
--- a/alot/ui.py
+++ b/alot/ui.py
@@ -104,7 +104,7 @@ class UI:
# running asyncio event loop
_loop = None
- def __init__(self, dbman, initialcmdline):
+ def __init__(self, loop, dbman, initialcmdline):
"""
:param dbman: :class:`~alot.db.DBManager`
:param initialcmdline: commandline applied after setting up interface
@@ -118,8 +118,6 @@ class UI:
"""list of active buffers"""
self.current_buffer = None
"""points to currently active :class:`~alot.buffers.Buffer`"""
- self.db_was_locked = False
- """flag used to prevent multiple 'index locked' notifications"""
self.mode = 'global'
"""interface mode identifier - type of current buffer"""
self.commandprompthistory = []
@@ -133,7 +131,7 @@ class UI:
self.last_commandline = None
"""saves the last executed commandline"""
- self._loop = asyncio.get_event_loop()
+ self._loop = loop
# define empty notification pile
self._notification_bar = urwid.Pile([])
@@ -202,6 +200,8 @@ class UI:
logging.info('setup gui in %d colours', colourmode)
self.mainloop.screen.set_terminal_properties(colors=colourmode)
+ self._loop.create_task(self.dbman.startup())
+
logging.debug('fire first command')
self._loop.create_task(self.apply_commandline(initialcmdline))
@@ -698,7 +698,6 @@ class UI:
cb = self.current_buffer
info = {}
- info['pending_writes'] = len(self.dbman.writequeue)
info['input_queue'] = ' '.join(self.input_queue)
info['buffer_no'] = self.buffers.index(cb)
info['buffer_type'] = cb.modename
@@ -752,7 +751,7 @@ class UI:
self.current_buffer.rebuild()
self.update()
- def cleanup(self):
+ async def cleanup(self):
"""Do the final clean up before shutting down."""
size = settings.get('history_size')
self._save_history_to_file(self.commandprompthistory,
@@ -762,6 +761,8 @@ class UI:
self._save_history_to_file(self.recipienthistory,
self._recipients_hist_file, size=size)
+ await self.dbman.shutdown()
+
@staticmethod
def _load_history_from_file(path, size=-1):
"""Load a history list from a file and split it into lines.