summaryrefslogtreecommitdiff
path: root/alot/db/manager.py
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2020-05-08 17:13:02 +0200
committerAnton Khirnov <anton@khirnov.net>2020-05-09 15:27:23 +0200
commit4c02a40d5dcec1fba988aa626da2dd0d9a058abd (patch)
tree67e43adad81d079560c016ce99b2708bcd6749aa /alot/db/manager.py
parent9a8621234211fd13f59374304fe57f6cf7bfd98e (diff)
Switch to the notmuch2 bindings.
They are supposed to replace the original notmuch python bindings, providing a safer and more pythonic interface.
Diffstat (limited to 'alot/db/manager.py')
-rw-r--r--alot/db/manager.py136
1 files changed, 55 insertions, 81 deletions
diff --git a/alot/db/manager.py b/alot/db/manager.py
index 410a9346..8ba6bd02 100644
--- a/alot/db/manager.py
+++ b/alot/db/manager.py
@@ -10,8 +10,7 @@ import signal
import sys
import threading
-from notmuch import Database, NotmuchError, XapianError
-import notmuch
+from notmuch2 import Database, NotmuchError, XapianError
from .errors import DatabaseError
from .errors import DatabaseLockedError
@@ -77,13 +76,15 @@ class DBManager:
classes.
"""
_sort_orders = {
- 'oldest_first': notmuch.database.Query.SORT.OLDEST_FIRST,
- 'newest_first': notmuch.database.Query.SORT.NEWEST_FIRST,
- 'unsorted': notmuch.database.Query.SORT.UNSORTED,
- 'message_id': notmuch.database.Query.SORT.MESSAGE_ID,
+ 'oldest_first': Database.SORT.OLDEST_FIRST,
+ 'newest_first': Database.SORT.NEWEST_FIRST,
+ 'unsorted': Database.SORT.UNSORTED,
+ 'message_id': Database.SORT.MESSAGE_ID,
}
"""constants representing sort orders"""
+ _exclude_tags = None
+
def __init__(self, path=None, ro=False):
"""
:param path: absolute path to the notmuch index
@@ -96,6 +97,11 @@ class DBManager:
self.writequeue = deque([])
self.processes = []
+ self._exclude_tags = frozenset(settings.get('exclude_tags'))
+
+ def _db_ro(self):
+ return Database(path = self.path, mode = Database.MODE.READ_ONLY)
+
def flush(self):
"""
write out all queued write-commands in order, each one in a separate
@@ -137,45 +143,26 @@ class DBManager:
raise DatabaseLockedError()
logging.debug('got write lock')
- # make this a transaction
- db.begin_atomic()
- logging.debug('got atomic')
-
- if cmd == 'add':
- logging.debug('add')
- path, tags = current_item[2:]
- msg, _ = db.add_message(path, sync_maildir_flags=sync)
- logging.debug('added msg')
- msg.freeze()
- logging.debug('freeze')
- for tag in tags:
- msg.add_tag(tag, sync_maildir_flags=sync)
- logging.debug('added tags ')
- msg.thaw()
- logging.debug('thaw')
-
-
- else: # tag/set/untag
- querystring, tags = current_item[2:]
- query = db.create_query(querystring)
- for msg in query.search_messages():
- msg.freeze()
- if cmd == 'tag':
- strategy = msg.add_tag
- if cmd == 'set':
- msg.remove_all_tags()
- strategy = msg.add_tag
- elif cmd == 'untag':
- strategy = msg.remove_tag
- for tag in tags:
- strategy(tag, sync_maildir_flags=sync)
- msg.thaw()
-
- logging.debug('ended atomic')
- # end transaction and reinsert queue item on error
- if db.end_atomic() != notmuch.STATUS.SUCCESS:
- raise DatabaseError('end_atomic failed')
- logging.debug('ended atomic')
+ with db.atomic():
+ if cmd == 'add':
+ path, op_tags = current_item[2:]
+
+ msg, _ = db.add(path, sync_flags = sync)
+ msg_tags = msg.tags
+
+ msg_tags |= op_tags
+ else: # tag/set/untag
+ querystring, op_tags = current_item[2:]
+ for msg in db.messages(querystring):
+ with msg.frozen():
+ msg_tags = msg.tags
+ if cmd == 'tag':
+ msg_tags |= op_tags
+ if cmd == 'set':
+ msg_tags.clear()
+ msg_tags |= op_tags
+ elif cmd == 'untag':
+ msg_tags -= op_tags
# close db
db.close()
@@ -260,17 +247,17 @@ class DBManager:
def count_messages(self, querystring):
"""returns number of messages that match `querystring`"""
- return self.query(querystring).count_messages()
+ return self._db_ro().count_messages(querystring, exclude_tags = self._exclude_tags)
def count_threads(self, querystring):
"""returns number of threads that match `querystring`"""
- return self.query(querystring).count_threads()
+ return self._db_ro().count_threads(querystring, exclude_tags = self._exclude_tags)
def _get_notmuch_thread(self, tid):
"""returns :class:`notmuch.database.Thread` with given id"""
- query = self.query('thread:' + tid)
+ querystr = 'thread:' + tid
try:
- return next(query.search_threads())
+ return next(self._db_ro().threads(querystr, exclude_tags = self._exclude_tags))
except StopIteration:
errmsg = 'no thread with id %s exists!' % tid
raise NonexistantObjectError(errmsg)
@@ -284,25 +271,26 @@ class DBManager:
returns all tagsstrings used in the database
:rtype: list of str
"""
- db = Database(path=self.path)
- return [t for t in db.get_all_tags()]
+ # XXX should be set
+ return list(self._db_ro().tags)
def get_named_queries(self):
"""
returns the named queries stored in the database.
:rtype: dict (str -> str) mapping alias to full query string
"""
- db = Database(path=self.path)
- return {k[6:]: v for k, v in db.get_configs('query.')}
+ q_prefix = 'query.'
+
+ db = self._db_ro()
+ queries = filter(lambda k: k.startswith(q_prefix), db.config)
+ return { q[len(q_prefix):] : db.config[q] for q in queries }
- def async_(self, cbl, fun):
+ def async_(self, seq, fun):
"""
return a pair (pipe, process) so that the process writes
`fun(a)` to the pipe for each element `a` in the iterable returned
- by the callable `cbl`.
+ by the callable `seq`.
- :param cbl: a function returning something iterable
- :type cbl: callable
:param fun: an unary translation function
:type fun: callable
:rtype: (:class:`multiprocessing.Pipe`,
@@ -317,7 +305,7 @@ class DBManager:
pipe = multiprocessing.Pipe(False)
receiver, sender = pipe
- process = FillPipeProcess(cbl(), stdout[1], stderr[1], pipe, fun)
+ process = FillPipeProcess(seq, stdout[1], stderr[1], pipe, fun)
process.start()
self.processes.append(process)
logging.debug('Worker process %s spawned', process.pid)
@@ -364,7 +352,7 @@ class DBManager:
sender.close()
return receiver, process
- def get_threads(self, querystring, sort='newest_first', exclude_tags=None):
+ def get_threads(self, querystring, sort='newest_first', exclude_tags = frozenset()):
"""
asynchronously look up thread ids matching `querystring`.
@@ -375,35 +363,21 @@ class DBManager:
:type query: str
:param exclude_tags: Tags to exclude by default unless included in the
search
- :type exclude_tags: list of str
+ :type exclude_tags: set of str
:returns: a pipe together with the process that asynchronously
writes to it.
:rtype: (:class:`multiprocessing.Pipe`,
:class:`multiprocessing.Process`)
"""
+ # TODO: use a symbolic constant for this
assert sort in self._sort_orders
- q = self.query(querystring)
- q.set_sort(self._sort_orders[sort])
- if exclude_tags:
- for tag in exclude_tags:
- q.exclude_tag(tag)
- return self.async_(q.search_threads, (lambda a: a.get_thread_id()))
-
- def query(self, querystring):
- """
- creates :class:`notmuch.Query` objects on demand
- :param querystring: The query string to use for the lookup
- :type query: str.
- :returns: :class:`notmuch.Query` -- the query object.
- """
- mode = Database.MODE.READ_ONLY
- db = Database(path=self.path, mode=mode)
- q = db.create_query(querystring)
- # add configured exclude tags
- for tag in settings.get('exclude_tags'):
- q.exclude_tag(tag)
- return q
+ db = self._db_ro()
+ sort = self._sort_orders[sort]
+ exclude_tags = self._exclude_tags | exclude_tags
+
+ return self.async_(db.threads(querystring, sort = sort, exclude_tags = exclude_tags),
+ lambda t: t.threadid)
def add_message(self, path, tags):
"""