diff options
author | Anton Khirnov <anton@khirnov.net> | 2021-01-20 11:38:34 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2021-01-20 11:39:27 +0100 |
commit | d47a85bca83352300399f04e76abecea4d53a934 (patch) | |
tree | 741f5ce3857102f19e4d44c33dc93981b906268d | |
parent | 3b78137e97565f90a48aad92dc471c47e63747eb (diff) |
db: make write operations async
-rw-r--r-- | alot/__main__.py | 7 | ||||
-rw-r--r-- | alot/commands/envelope.py | 10 | ||||
-rw-r--r-- | alot/commands/globals.py | 57 | ||||
-rw-r--r-- | alot/commands/search.py | 34 | ||||
-rw-r--r-- | alot/commands/thread.py | 31 | ||||
-rw-r--r-- | alot/db/errors.py | 6 | ||||
-rw-r--r-- | alot/db/manager.py | 332 | ||||
-rw-r--r-- | alot/db/message.py | 61 | ||||
-rw-r--r-- | alot/db/thread.py | 54 | ||||
-rw-r--r-- | alot/defaults/alot.rc.spec | 4 | ||||
-rw-r--r-- | alot/defaults/default.bindings | 1 | ||||
-rw-r--r-- | alot/ui.py | 13 |
12 files changed, 257 insertions, 353 deletions
diff --git a/alot/__main__.py b/alot/__main__.py index d09a84f6..37d74f3b 100644 --- a/alot/__main__.py +++ b/alot/__main__.py @@ -2,6 +2,7 @@ # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file import argparse +import asyncio import logging import os import sys @@ -119,10 +120,12 @@ def main(): if options.colour_mode: settings.set('colourmode', options.colour_mode) + loop = asyncio.get_event_loop() + # get ourselves a database manager indexpath = settings.get_notmuch_setting('database', 'path') indexpath = options.mailindex_path or indexpath - dbman = DBManager(path=indexpath, ro=options.read_only) + dbman = DBManager(loop, path=indexpath, ro=options.read_only) # determine what to do if command is None: @@ -134,7 +137,7 @@ def main(): cmdstring = ' '.join(options.command) # set up and start interface - UI(dbman, cmdstring) + UI(loop, dbman, cmdstring) # run the exit hook exit_hook = settings.get_hook('exit') diff --git a/alot/commands/envelope.py b/alot/commands/envelope.py index 1f806581..457fe48a 100644 --- a/alot/commands/envelope.py +++ b/alot/commands/envelope.py @@ -140,8 +140,7 @@ class SaveCommand(Command): ui.notify(msg + ' to %s' % path) logging.debug('adding new mail to index') try: - ui.dbman.add_message(path, account.draft_tags + envelope.tags) - await ui.apply_command(globals.FlushCommand()) + await ui.dbman.msg_add(path, account.draft_tags | envelope.tags) await ui.apply_command(commands.globals.BufferCloseCommand()) except DatabaseError as e: logging.error(str(e)) @@ -295,9 +294,9 @@ class SendCommand(Command): ui.notify('mail sent successfully') if self.envelope is not None: if self.envelope.replied: - self.envelope.replied.add_tags(account.replied_tags) + await self.envelope.replied.tags_add(account.replied_tags) if self.envelope.passed: - self.envelope.passed.add_tags(account.passed_tags) + await self.envelope.passed.tags_add(account.passed_tags) # store mail locally # This can raise StoreMailError @@ -306,8 +305,7 @@ class SendCommand(Command): # add mail to index if maildir path available if path is not None: logging.debug('adding new mail to index') - ui.dbman.add_message(path, account.sent_tags | initial_tags) - await ui.apply_command(globals.FlushCommand()) + await ui.dbman.msg_add(path, account.sent_tags | initial_tags) @registerCommand(MODE, 'edit', arguments=[ diff --git a/alot/commands/globals.py b/alot/commands/globals.py index 97f82c99..ee49667d 100644 --- a/alot/commands/globals.py +++ b/alot/commands/globals.py @@ -29,7 +29,6 @@ from ..completion.contacts import ContactsCompleter from ..completion.accounts import AccountCompleter from ..completion.tags import TagsCompleter from ..widgets.utils import DialogBox -from ..db.errors import DatabaseLockedError from ..db.envelope import Envelope from ..settings.const import settings from ..settings.errors import ConfigError, NoMatchingAccount @@ -73,17 +72,9 @@ class ExitCommand(Command): b.cleanup() if ui.dbman.ro: ui.exit() - else: - await ui.apply_command(FlushCommand(callback=ui.exit)) - ui.cleanup() - - if ui.db_was_locked: - msg = 'Database locked. Exit without saving?' - response = await ui.choice(msg, msg_position='left', cancel='no') - if response == 'no': - return - ui.exit() + await ui.cleanup() + ui.exit() @registerCommand(MODE, 'search', usage='search query', arguments=[ (['--sort'], {'help': 'sort order', 'choices': [ @@ -586,50 +577,6 @@ class NamedQueriesCommand(Command): def apply(self, ui): ui.buffer_open(buffers.NamedQueriesBuffer(ui.dbman, self.filtfun)) - -@registerCommand(MODE, 'flush') -class FlushCommand(Command): - - """flush write operations or retry until committed""" - repeatable = True - - def __init__(self, callback=None, silent=False, **kwargs): - """ - :param callback: function to call after successful writeout - :type callback: callable - """ - Command.__init__(self, **kwargs) - self.callback = callback - self.silent = silent - - def apply(self, ui): - try: - ui.dbman.flush() - if callable(self.callback): - self.callback() - logging.debug('flush complete') - if ui.db_was_locked: - if not self.silent: - ui.notify('changes flushed') - ui.db_was_locked = False - ui.update() - - except DatabaseLockedError: - timeout = settings.get('flush_retry_timeout') - - if timeout > 0: - def f(*_): - self.apply(ui) - ui.mainloop.set_alarm_in(timeout, f) - if not ui.db_was_locked: - if not self.silent: - ui.notify('index locked, will try again in %d secs' - % timeout) - ui.db_was_locked = True - ui.update() - return - - # TODO: choices @registerCommand(MODE, 'help', arguments=[ (['commandname'], {'help': 'command or \'bindings\''})]) diff --git a/alot/commands/search.py b/alot/commands/search.py index 9fe265cb..d4c1a258 100644 --- a/alot/commands/search.py +++ b/alot/commands/search.py @@ -99,9 +99,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) @registerCommand( MODE, 'tag', forced={'action': 'add'}, arguments=[ - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'default': 'True', - 'help': 'postpone a writeout to the index'}), (['--all'], {'action': 'store_true', 'dest': 'allmessages', 'default': False, 'help': 'tag all messages that match the current search query'}), @@ -111,9 +108,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) @registerCommand( MODE, 'retag', forced={'action': 'set'}, arguments=[ - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'default': 'True', - 'help': 'postpone a writeout to the index'}), (['--all'], {'action': 'store_true', 'dest': 'allmessages', 'default': False, 'help': 'retag all messages that match the current query'}), @@ -123,9 +117,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) @registerCommand( MODE, 'untag', forced={'action': 'remove'}, arguments=[ - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'default': 'True', - 'help': 'postpone a writeout to the index'}), (['--all'], {'action': 'store_true', 'dest': 'allmessages', 'default': False, 'help': 'untag all messages that match the current query'}), @@ -135,9 +126,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) @registerCommand( MODE, 'toggletags', forced={'action': 'toggle'}, arguments=[ - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'default': 'True', - 'help': 'postpone a writeout to the index'}), (['tags'], {'help': 'comma separated list of tags'})], help='flip presence of tags on the selected thread: a tag is considered present ' 'and will be removed if at least one message in this thread is ' @@ -147,8 +135,7 @@ class TagCommand(Command): """manipulate message tags""" repeatable = True - def __init__(self, tags='', action='add', allmessages=False, flush=True, - **kwargs): + def __init__(self, tags='', action='add', allmessages=False, **kwargs): """ :param tags: comma separated list of tagstrings to set :type tags: str @@ -158,13 +145,10 @@ class TagCommand(Command): :type action: str :param allmessages: tag all messages in search result :type allmessages: bool - :param flush: immediately write out to the index - :type flush: bool """ self.tagsstring = tags self.action = action self.allm = allmessages - self.flush = flush Command.__init__(self, **kwargs) async def apply(self, ui): @@ -200,11 +184,11 @@ class TagCommand(Command): try: if self.action == 'add': - ui.dbman.tag(testquery, tags, remove_rest=False) + await ui.dbman.tags_add(testquery, tags) if self.action == 'set': - ui.dbman.tag(testquery, tags, remove_rest=True) + await ui.dbman.tags_set(testquery, tags) elif self.action == 'remove': - ui.dbman.untag(testquery, tags) + await ui.dbman.tags_remove(testquery, tags) elif self.action == 'toggle': if not self.allm: to_remove = set() @@ -214,17 +198,13 @@ class TagCommand(Command): to_remove.add(t) else: to_add.add(t) - thread.remove_tags(to_remove) - thread.add_tags(to_add, afterwards=refresh) + await thread.tags_remove(to_remove) + await thread.tags_add(to_add) except DatabaseROError: ui.notify('index in read-only mode', priority='error') return - # flush index - if self.flush: - await ui.apply_command( - commands.globals.FlushCommand(callback=refresh)) - + refresh() @registerCommand( MODE, 'move', help='move focus in search buffer', diff --git a/alot/commands/thread.py b/alot/commands/thread.py index 178264c0..7523f46b 100644 --- a/alot/commands/thread.py +++ b/alot/commands/thread.py @@ -18,7 +18,6 @@ from io import BytesIO from . import Command, registerCommand from .globals import ExternalCommand -from .globals import FlushCommand from .globals import ComposeCommand from .globals import MoveCommand from .globals import CommandCanceled @@ -1040,8 +1039,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) arguments=[ (['--all'], {'action': 'store_true', 'help': 'tag all messages in thread'}), - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'help': 'postpone a writeout to the index'}), (['tags'], {'help': 'comma separated list of tags'})], help='add tags to message(s)', ) @@ -1050,8 +1047,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) arguments=[ (['--all'], {'action': 'store_true', 'help': 'tag all messages in thread'}), - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'help': 'postpone a writeout to the index'}), (['tags'], {'help': 'comma separated list of tags'})], help='set message(s) tags.', ) @@ -1060,8 +1055,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) arguments=[ (['--all'], {'action': 'store_true', 'help': 'tag all messages in thread'}), - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'help': 'postpone a writeout to the index'}), (['tags'], {'help': 'comma separated list of tags'})], help='remove tags from message(s)', ) @@ -1070,8 +1063,6 @@ RetagPromptCommand = registerCommand(MODE, 'retagprompt')(RetagPromptCommand) arguments=[ (['--all'], {'action': 'store_true', 'help': 'tag all messages in thread'}), - (['--no-flush'], {'action': 'store_false', 'dest': 'flush', - 'help': 'postpone a writeout to the index'}), (['tags'], {'help': 'comma separated list of tags'})], help='flip presence of tags on message(s)', ) @@ -1080,8 +1071,7 @@ class TagCommand(Command): """manipulate message tags""" repeatable = True - def __init__(self, tags='', action='add', all=False, flush=True, - **kwargs): + def __init__(self, tags='', action='add', all=False, **kwargs): """ :param tags: comma separated list of tagstrings to set :type tags: str @@ -1091,13 +1081,10 @@ class TagCommand(Command): :type action: str :param all: tag all messages in thread :type all: bool - :param flush: immediately write out to the index - :type flush: bool """ self.tagsstring = tags self.all = all self.action = action - self.flush = flush Command.__init__(self, **kwargs) async def apply(self, ui): @@ -1110,26 +1097,22 @@ class TagCommand(Command): try: for m in messages: if self.action == 'add': - m.add_tags(tags) + await m.tags_add(tags) if self.action == 'set': - m.add_tags(tags, remove_rest=True) + await m.tags_set(tags) elif self.action == 'remove': - m.remove_tags(tags) + await m.tags_remove(tags) elif self.action == 'toggle': to_remove = set() - to_add = () + to_add = set() for t in tags: if t in m.get_tags(): to_remove.add(t) else: to_add.add(t) - m.remove_tags(to_remove) - m.add_tags(to_add) + await m.tags_remove(to_remove) + await m.tags_add(to_add) except DatabaseROError: ui.notify('index in read-only mode', priority='error') return - - # flush index - if self.flush: - await ui.apply_command(FlushCommand()) diff --git a/alot/db/errors.py b/alot/db/errors.py index 668ab34e..bd2564f9 100644 --- a/alot/db/errors.py +++ b/alot/db/errors.py @@ -13,12 +13,6 @@ class DatabaseROError(DatabaseError): pass -class DatabaseLockedError(DatabaseError): - - """cannot write to locked index""" - pass - - class NonexistantObjectError(DatabaseError): """requested thread or message does not exist in the index""" diff --git a/alot/db/manager.py b/alot/db/manager.py index f2828d65..ba9271fc 100644 --- a/alot/db/manager.py +++ b/alot/db/manager.py @@ -1,14 +1,15 @@ # Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file -from collections import deque + +import abc +import asyncio import logging import os -from notmuch2 import Database, NotmuchError, XapianError +from notmuch2 import Database, NotmuchError from .errors import DatabaseError -from .errors import DatabaseLockedError from .errors import DatabaseROError from .errors import NonexistantObjectError from .thread import Thread @@ -23,6 +24,90 @@ def _is_subdir_of(subpath, superpath): # e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b return os.path.commonprefix([subpath, superpath]) == superpath +# DB write operations +class _DBOperation(abc.ABC): + _dbman = None + _tags = None + + future = None + + def __init__(self, dbman, tags): + self._dbman = dbman + self._tags = tags + + self.future = dbman._loop.create_future() + + def apply(self): + logging.debug('Performing DB write: %s', self) + + try: + with Database(self._dbman.path, Database.MODE.READ_WRITE) as db: + logging.debug('got writeable DB') + + with db.atomic(): + self._apply(db) + except Exception as e: + logging.exception(e) + self.future.set_exception(e) + else: + logging.debug('DB write completed: %s', self) + self.future.set_result(True) + + @abc.abstractmethod + def _apply(self, db): + pass + + def __str__(self): + return '%s:%s' % (self.__class__.__name__, self._tags) + +class _DBOperationTagAdd(_DBOperation): + _query = None + + def __init__(self, dbman, tags, query): + self._query = query + + super().__init__(dbman, tags) + + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + msg_tags |= self._tags + + def __str__(self): + return '%s:%s' % (super().__str__(), self._query) + +class _DBOperationTagRemove(_DBOperationTagAdd): + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + msg_tags -= self._tags + +class _DBOperationTagSet(_DBOperationTagAdd): + def _apply(self, db): + for msg in db.messages(self._query): + msg_tags = msg.tags + property_tags = msg_tags & self._dbman._property_tags + + msg_tags.clear() + msg_tags |= self._tags | property_tags + +class _DBOperationMsgAdd(_DBOperation): + _path = None + + def __init__(self, dbman, tags, path): + self._path = path + + super().__init__(dbman, tags) + + def _apply(self, db): + msg, _ = db.add(self._path, self._dbman._sync_flags) + + msg_tags = msg.tags + msg_tags |= self._tags + + def __str__(self): + return '%s:%s' % (super().__str__(), self._path) + class DBManager: """ Keeps track of your index parameters, maintains a write-queue and @@ -37,10 +122,17 @@ class DBManager: } """constants representing sort orders""" + _loop = None + + _sync_flags = None + _exclude_tags = None _property_tags = None - def __init__(self, path=None, ro=False): + _write_task = None + _write_queue = None + + def __init__(self, loop, path=None, ro=False): """ :param path: absolute path to the notmuch index :type path: str @@ -49,7 +141,11 @@ class DBManager: """ self.ro = ro self.path = path - self.writequeue = deque([]) + + self._loop = loop + + # read notmuch's config regarding imap flag synchronization + self._sync_flags = settings.get_notmuch_setting('maildir', 'synchronize_flags') self._exclude_tags = frozenset(settings.get('exclude_tags')) self._property_tags = frozenset(settings.get('property_tags')) @@ -57,140 +153,7 @@ class DBManager: def _db_ro(self): return Database(path = self.path, mode = Database.MODE.READ_ONLY) - def flush(self): - """ - write out all queued write-commands in order, each one in a separate - :meth:`atomic <notmuch.Database.begin_atomic>` transaction. - - If this fails the current action is rolled back, stays in the write - queue and an exception is raised. - You are responsible to retry flushing at a later time if you want to - ensure that the cached changes are applied to the database. - - :exception: :exc:`~errors.DatabaseROError` if db is opened read-only - :exception: :exc:`~errors.DatabaseLockedError` if db is locked - """ - if self.ro: - raise DatabaseROError() - if not self.writequeue: - return - - # read notmuch's config regarding imap flag synchronization - sync = settings.get_notmuch_setting('maildir', 'synchronize_flags') - - # go through writequeue entries - while self.writequeue: - current_item = self.writequeue.popleft() - logging.debug('write-out item: %s', str(current_item)) - - # watch out for notmuch errors to re-insert current_item - # to the queue on errors - try: - # the first two coordinants are cnmdname and post-callback - cmd, afterwards = current_item[:2] - logging.debug('cmd created') - - # acquire a writeable db handler - try: - mode = Database.MODE.READ_WRITE - db = Database(path=self.path, mode=mode) - except NotmuchError: - raise DatabaseLockedError() - logging.debug('got write lock') - - with db.atomic(): - if cmd == 'add': - path, op_tags = current_item[2:] - - msg, _ = db.add(path, sync_flags = sync) - msg_tags = msg.tags - - msg_tags |= op_tags - else: # tag/set/untag - querystring, op_tags = current_item[2:] - for msg in db.messages(querystring): - with msg.frozen(): - msg_tags = msg.tags - if cmd == 'tag': - msg_tags |= op_tags - if cmd == 'set': - property_tags = msg_tags & self._property_tags - msg_tags.clear() - msg_tags |= op_tags | property_tags - elif cmd == 'untag': - msg_tags -= op_tags - - # close db - db.close() - logging.debug('closed db') - - # call post-callback - if callable(afterwards): - logging.debug(str(afterwards)) - afterwards() - logging.debug('called callback') - - # re-insert item to the queue upon Xapian/NotmuchErrors - except (XapianError, NotmuchError) as e: - logging.exception(e) - self.writequeue.appendleft(current_item) - raise DatabaseError(str(e)) - except DatabaseLockedError as e: - logging.debug('index temporarily locked') - self.writequeue.appendleft(current_item) - raise e - logging.debug('flush finished') - - def tag(self, querystring, tags, afterwards=None, remove_rest=False): - """ - add tags to messages matching `querystring`. - This appends a tag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - - :param querystring: notmuch search string - :type querystring: str - :param tags: a set of tags to be added - :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove tags from matching messages before tagging - :type remove_rest: bool - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - if remove_rest: - self.writequeue.append(('set', afterwards, querystring, tags)) - else: - self.writequeue.append(('tag', afterwards, querystring, tags)) - - def untag(self, querystring, tags, afterwards=None): - """ - removes tags from messages that match `querystring`. - This appends an untag operation to the write queue and raises - :exc:`~errors.DatabaseROError` if in read only mode. - :param querystring: notmuch search string - :type querystring: str - :param tags: a list of tags to be added - :type tags: list of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :exception: :exc:`~errors.DatabaseROError` - - .. note:: - This only adds the requested operation to the write queue. - You need to call :meth:`DBManager.flush` to actually write out. - """ - if self.ro: - raise DatabaseROError() - self.writequeue.append(('untag', afterwards, querystring, tags)) def count_messages(self, querystring): """returns number of messages that match `querystring`""" @@ -254,17 +217,96 @@ class DBManager: return (t.threadid for t in db.threads(querystring, sort = sort, exclude_tags = exclude_tags)) - def add_message(self, path, tags): + async def startup(self): + self._write_queue = asyncio.Queue() + self._write_task = asyncio.create_task(self._db_write_task()) + + async def shutdown(self): + if self._write_task: + await self._write_queue.put(None) + await self._write_task + + async def _db_write_task(self): + # this task serialises write operations on the database and + # sends them off to a thread so they do not block the event loop + while True: + cur_item = await self._write_queue.get() + if cur_item is None: + self._write_queue.task_done() + break + + logging.debug('submitting write task: %s', cur_item) + + await self._loop.run_in_executor(None, cur_item.apply) + self._write_queue.task_done() + + def tags_add(self, query, tags): + """ + Asynchronously add tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + if not tags: + ret = self._loop.create_future() + ret.set_result(True) + return ret + + op = _DBOperationTagAdd(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def tags_remove(self, query, tags): """ - Adds a file to the notmuch index. + Asynchronously remove tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + if not tags: + ret = self._loop.create_future() + ret.set_result(True) + return ret + + op = _DBOperationTagRemove(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def tags_set(self, query, tags): + """ + Asynchronously set tags to messages matching `querystring`. + + :param querystring: notmuch search string + :type querystring: str + :param tags: a set of tags to be added + :type tags: set of str + """ + if self.ro: + raise DatabaseROError() + + op = _DBOperationTagSet(self, tags, query) + self._write_queue.put_nowait(op) + return op.future + + def msg_add(self, path, tags): + """ + Asynchronously add a file to the notmuch index. :param path: path to the file :type path: str :param tags: tagstrings to add :type tags: list of str """ - tags = tags or [] - if self.ro: raise DatabaseROError() if not _is_subdir_of(path, self.path): @@ -272,5 +314,7 @@ class DBManager: msg += ' is not below notmuchs ' msg += 'root path (%s)' % self.path raise DatabaseError(msg) - else: - self.writequeue.append(('add', None, path, tags)) + + op = _DBOperationMsgAdd(self, tags, path) + self._write_queue.put_nowait(op) + return op.future diff --git a/alot/db/message.py b/alot/db/message.py index 21373e5e..836a9963 100644 --- a/alot/db/message.py +++ b/alot/db/message.py @@ -466,57 +466,46 @@ class Message: """ return email.utils.parseaddr(self._from) - def add_tags(self, tags, afterwards=None, remove_rest=False): + def tags_add(self, tags): """ - adds tags to message - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`~alot.db.DBManager.flush` to write out. + Asynchronously add tags to message :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove all other tags - :type remove_rest: bool """ - def myafterwards(): - if remove_rest: - self._tags = set(tags) - else: - self._tags = self._tags.union(tags) - if callable(afterwards): - afterwards() + def myafterwards(fut): + self._tags = self._tags.union(tags) - self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards, - remove_rest=remove_rest) - self._tags = self._tags.union(tags) + fut = self._dbman.tags_add('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut - def remove_tags(self, tags, afterwards=None): - """remove tags from message + def tags_set(self, tags): + """ + Asynchronously set tags for a message + + :param tags: a set of new tags to replace the existing set + :type tags: set of str + """ + def myafterwards(fut): + self._tags = set(tags) - .. note:: + fut = self._dbman.tags_set('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`~alot.db.DBManager.flush` to actually out. + def tags_remove(self, tags): + """Asynchronously remove tags from message :param tags: a set of tags to be removed :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable """ - def myafterwards(): + def myafterwards(fut): self._tags = self._tags.difference(tags) - if callable(afterwards): - afterwards() - self._dbman.untag('id:' + self.id, tags, myafterwards) + fut = self._dbman.tags_remove('id:' + self.id, tags) + fut.add_done_callback(myafterwards) + return fut def iter_attachments(self): """ diff --git a/alot/db/thread.py b/alot/db/thread.py index 2b580dcf..1f9ae7f0 100644 --- a/alot/db/thread.py +++ b/alot/db/thread.py @@ -63,7 +63,7 @@ class Thread: self.refresh(thread) - def refresh(self, thread=None): + def refresh(self, thread = None): """refresh thread metadata from the index""" if not thread: thread = self._dbman._get_notmuch_thread(self.id) @@ -140,58 +140,28 @@ class Thread: tags = tags.intersection(set(m.get_tags())) return tags - def add_tags(self, tags, afterwards=None, remove_rest=False): + def tags_add(self, tags): """ - add `tags` to all messages in this thread - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>` - to actually write out. + Asynchronously add `tags` to all messages in this thread :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable - :param remove_rest: remove all other tags - :type remove_rest: bool """ - def myafterwards(): - self.refresh() - if callable(afterwards): - afterwards() - self._dbman.tag('thread:' + self.id, tags, afterwards=myafterwards, - remove_rest=remove_rest) + fut = self._dbman.tags_add('thread:' + self.id, tags) + fut.add_done_callback(lambda fut: self.refresh()) + return fut - def remove_tags(self, tags, afterwards=None): + def tags_remove(self, tags): """ - remove `tags` (list of str) from all messages in this thread - - .. note:: - - This only adds the requested operation to this objects - :class:`DBManager's <alot.db.DBManager>` write queue. - You need to call :meth:`DBManager.flush <alot.db.DBManager.flush>` - to actually write out. - + Asynchronously remove `tags` from all messages in this thread :param tags: a set of tags to be added :type tags: set of str - :param afterwards: callback that gets called after successful - application of this tagging operation - :type afterwards: callable """ - rmtags = set(tags).intersection(self._tags) - if rmtags: - - def myafterwards(): - self.refresh() - if callable(afterwards): - afterwards() - self._dbman.untag('thread:' + self.id, tags, myafterwards) + fut = self._dbman.tags_remove('thread:' + self.id, tags) + fut.add_done_callback(lambda fut: self.refresh()) + return fut + def get_authors(self): """ diff --git a/alot/defaults/alot.rc.spec b/alot/defaults/alot.rc.spec index 00e527b5..5a42119f 100644 --- a/alot/defaults/alot.rc.spec +++ b/alot/defaults/alot.rc.spec @@ -122,10 +122,6 @@ edit_headers_whitelist = force_list(default=list(*,)) # see :ref:`edit_headers_whitelist <edit-headers-whitelist>` edit_headers_blacklist = force_list(default=list(Content-Type,MIME-Version,References,In-Reply-To)) -# timeout in seconds after a failed attempt to writeout the database is -# repeated. Set to 0 for no retry. -flush_retry_timeout = integer(default=5) - # where to look up hooks hooksfile = string(default='~/.config/alot/hooks.py') diff --git a/alot/defaults/default.bindings b/alot/defaults/default.bindings index e28253ed..16d9d04a 100644 --- a/alot/defaults/default.bindings +++ b/alot/defaults/default.bindings @@ -20,7 +20,6 @@ U = search tag:unread tab = bnext \ = prompt 'search ' d = bclose -$ = flush m = compose o = prompt 'search ' q = exit @@ -104,7 +104,7 @@ class UI: # running asyncio event loop _loop = None - def __init__(self, dbman, initialcmdline): + def __init__(self, loop, dbman, initialcmdline): """ :param dbman: :class:`~alot.db.DBManager` :param initialcmdline: commandline applied after setting up interface @@ -118,8 +118,6 @@ class UI: """list of active buffers""" self.current_buffer = None """points to currently active :class:`~alot.buffers.Buffer`""" - self.db_was_locked = False - """flag used to prevent multiple 'index locked' notifications""" self.mode = 'global' """interface mode identifier - type of current buffer""" self.commandprompthistory = [] @@ -133,7 +131,7 @@ class UI: self.last_commandline = None """saves the last executed commandline""" - self._loop = asyncio.get_event_loop() + self._loop = loop # define empty notification pile self._notification_bar = urwid.Pile([]) @@ -202,6 +200,8 @@ class UI: logging.info('setup gui in %d colours', colourmode) self.mainloop.screen.set_terminal_properties(colors=colourmode) + self._loop.create_task(self.dbman.startup()) + logging.debug('fire first command') self._loop.create_task(self.apply_commandline(initialcmdline)) @@ -698,7 +698,6 @@ class UI: cb = self.current_buffer info = {} - info['pending_writes'] = len(self.dbman.writequeue) info['input_queue'] = ' '.join(self.input_queue) info['buffer_no'] = self.buffers.index(cb) info['buffer_type'] = cb.modename @@ -752,7 +751,7 @@ class UI: self.current_buffer.rebuild() self.update() - def cleanup(self): + async def cleanup(self): """Do the final clean up before shutting down.""" size = settings.get('history_size') self._save_history_to_file(self.commandprompthistory, @@ -762,6 +761,8 @@ class UI: self._save_history_to_file(self.recipienthistory, self._recipients_hist_file, size=size) + await self.dbman.shutdown() + @staticmethod def _load_history_from_file(path, size=-1): """Load a history list from a file and split it into lines. |