# Copyright (C) 2011-2012 Patrick Totzke
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import email
import email.charset as charset
import email.policy
import email.utils
from datetime import datetime

from notmuch import NullPointerError

from . import utils
from .utils import extract_body
from .utils import decode_header
from .attachment import Attachment
from .. import helper
from ..settings.const import settings

charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')


class Message:
    """
    a persistent notmuch message object.
    It uses a :class:`~alot.db.DBManager` for cached manipulation
    and lazy lookups.
    """

    #: the :class:`~alot.db.Thread` this Message belongs to
    thread = None

    #: value of the Date header value as :class:`~datetime.datetime`
    date = None

    #: value of the Message-Id header (str)
    id = None

    #: this message's depth in the thread tree
    depth = None

    #: A list of replies to this message
    replies = None

    def __init__(self, dbman, thread, msg, depth):
        """
        :param dbman: db manager that is used for further lookups
        :type dbman: alot.db.DBManager
        :param thread: this messages thread
        :type thread: :class:`~alot.db.Thread`
        :param msg: the wrapped message
        :type msg: notmuch.database.Message
        :param depth: depth of this message in the thread tree (0 for
                      toplevel messages, 1 for their replies etc.)
        :type depth: int
        """
        self._dbman = dbman
        self.id = msg.get_message_id()
        self.thread = thread
        self.depth = depth
        try:
            self.date = datetime.fromtimestamp(msg.get_date())
        except (ValueError, OverflowError, OSError):
            # fromtimestamp raises OverflowError/OSError (not only
            # ValueError) for timestamps the platform cannot represent;
            # treat all of them as "no usable date".
            self.date = None
        self._filename = msg.get_filename()
        self._email = None  # will be read upon first use
        self._attachments = None  # will be read upon first use
        self._tags = set(msg.get_tags())

        self._session_keys = []
        for name, value in msg.get_properties("session-key", exact=True):
            if name == "session-key":
                self._session_keys.append(value)

        # Prefer the From header, fall back to Sender; a missing header
        # surfaces as NullPointerError from the notmuch bindings.
        try:
            sender = decode_header(msg.get_header('From'))
            if not sender:
                sender = decode_header(msg.get_header('Sender'))
        except NullPointerError:
            sender = None

        if sender:
            self._from = sender
        elif 'draft' in self._tags:
            # drafts without sender are attributed to the first account
            acc = settings.get_accounts()[0]
            self._from = '"{}" <{}>'.format(acc.realname, str(acc.address))
        else:
            self._from = '"Unknown" <>'

    def __str__(self):
        """prettyprint the message"""
        aname, aaddress = self.get_author()
        if not aname:
            aname = aaddress
        return "%s (%s)" % (aname, self.get_datestring())

    def __hash__(self):
        """needed for sets of Messages"""
        return hash(self.id)

    def __eq__(self, other):
        """two Messages are equal iff their Message-Ids are equal"""
        if isinstance(other, type(self)):
            return self.id == other.id
        return NotImplemented

    def get_email(self):
        """
        returns :class:`email.message.EmailMessage` for this message

        The message file is read and parsed on first use only; if it
        cannot be opened, a placeholder message carrying a warning is
        cached and returned instead.
        """
        # Compare against None: a parsed message without any headers has
        # length 0 and is therefore falsy, which would defeat the cache.
        if self._email is None:
            path = self.get_filename()
            try:
                with open(path, 'rb') as f:
                    self._email = utils.decrypted_message_from_bytes(
                        f.read(), self._session_keys)
            except IOError:
                warning = "Subject: Caution!\n"\
                          "Message file is no longer accessible:\n%s" % path
                self._email = email.message_from_string(
                    warning, policy=email.policy.SMTP)
        return self._email

    def get_filename(self):
        """returns absolute path of message files location"""
        return self._filename

    def get_message_parts(self):
        """yield all body parts of this message"""
        for msg in self.get_email().walk():
            if not msg.is_multipart():
                yield msg

    def get_tags(self):
        """returns tags attached to this message as list of strings"""
        return sorted(self._tags)

    def get_datestring(self):
        """
        returns reformatted datestring for this message.

        It uses :meth:`SettingsManager.represent_datetime` to represent
        this messages `Date` header

        :returns: the formatted date, or `None` if the message has no
                  usable date
        :rtype: str
        """
        if self.date is None:
            return None
        return settings.represent_datetime(self.date)

    def get_author(self):
        """
        returns realname and address of this messages author

        :rtype: (str,str)
        """
        return email.utils.parseaddr(self._from)

    def add_tags(self, tags, afterwards=None, remove_rest=False):
        """
        adds tags to message

        .. note::

            This only adds the requested operation to this objects
            :class:`DBManager's <alot.db.DBManager>` write queue.
            You need to call :meth:`~alot.db.DBManager.flush` to write out.

        :param tags: a list of tags to be added
        :type tags: list of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        :param remove_rest: remove all other tags
        :type remove_rest: bool
        """
        def myafterwards():
            if remove_rest:
                self._tags = set(tags)
            else:
                self._tags = self._tags.union(tags)
            if callable(afterwards):
                afterwards()

        self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards,
                        remove_rest=remove_rest)
        # reflect the added tags locally right away, before the queued
        # operation is written out
        self._tags = self._tags.union(tags)

    def remove_tags(self, tags, afterwards=None):
        """remove tags from message

        .. note::

            This only adds the requested operation to this objects
            :class:`DBManager's <alot.db.DBManager>` write queue.
            You need to call :meth:`~alot.db.DBManager.flush` to actually
            write out.

        :param tags: a list of tags to be removed
        :type tags: list of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        """
        def myafterwards():
            self._tags = self._tags.difference(tags)
            if callable(afterwards):
                afterwards()

        self._dbman.untag('id:' + self.id, tags, myafterwards)

    def get_attachments(self):
        """
        returns messages attachments

        Derived from the leaves of the email mime tree
        that are not part of :rfc:`2015` syntax for encrypted/signed mails
        and either have :mailheader:`Content-Disposition` `attachment`
        or have :mailheader:`Content-Disposition` `inline` but specify
        a filename (as parameter to `Content-Disposition`).

        The list is computed on first use and cached afterwards.

        :rtype: list of :class:`Attachment`
        """
        # Compare against None so that an empty result is cached as well;
        # a truthiness test would re-walk the mime tree on every call for
        # messages that have no attachments.
        if self._attachments is None:
            self._attachments = []
            for part in self.get_message_parts():
                cd = part.get('Content-Disposition', '')
                filename = part.get_filename()
                ct = part.get_content_type()
                # replace underspecified mime description by a better guess
                if ct in ['octet/stream', 'application/octet-stream']:
                    content = part.get_payload(decode=True)
                    ct = helper.guess_mimetype(content)
                    # drop a directly preceding pgp-encrypted control part
                    if (self._attachments and
                            self._attachments[-1].get_content_type() ==
                            'application/pgp-encrypted'):
                        self._attachments.pop()

                if cd.lower().startswith('attachment'):
                    if ct.lower() not in ['application/pgp-signature']:
                        self._attachments.append(Attachment(part))
                elif cd.lower().startswith('inline'):
                    if (filename is not None and
                            ct.lower() != 'application/pgp'):
                        self._attachments.append(Attachment(part))
        return self._attachments

    def get_body_text(self):
        """
        returns bodystring extracted from this mail
        """
        # TODO: allow toggle commands to decide which part is considered body
        return extract_body(self.get_email())

    def matches(self, querystring):
        """tests if this messages is in the resultset for `querystring`"""
        searchfor = '( {} ) AND id:{}'.format(querystring, self.id)
        return self._dbman.count_messages(searchfor) > 0