# Copyright (C) 2011-2012 Patrick Totzke
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file

import email
import email.charset as charset
import email.policy
import email.utils
import logging
import os
import subprocess
from datetime import datetime

from urwid.util import detected_encoding

from ..mail.attachment import Attachment
from .. import crypto
from .. import helper
from ..errors import GPGProblem
from ..settings.const import settings
from ..utils.mailcap import MailcapHandler

charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')

_APP_PGP_SIG = 'application/pgp-signature'
_APP_PGP_ENC = 'application/pgp-encrypted'


def _render_part_external(payload, ctype, params, filename):
    """
    Render a non-multipart email part into displayable plaintext by piping
    its payload through an external script. The handler itself is determined
    by the mailcap entry for this part's ctype.

    :param payload: content of the part
    :param ctype: content type of the part, e.g. 'text/html'
    :type ctype: str
    :param params: content-type parameters of the part
    :param filename: file name associated with the part (may be None)
    :returns: the decoded standard output of the handler, or None when there
              is no usable handler or when the handler exits with an error
    :rtype: str or None
    """
    h = MailcapHandler(payload, ctype, params, filename, 'copiousoutput')
    if not h or h.needs_terminal:
        return None

    def decode(buf):
        return buf.decode(detected_encoding, errors = 'backslashreplace')

    with h:
        logging.debug('Rendering part %s: %s', ctype, h.cmd)

        # subprocess.run() rejects a non-None `input` combined with an
        # explicit `stdin`, so an empty-but-not-None payload is normalised
        # to None before choosing between the two.
        stdin_data = h.stdin or None

        try:
            result = subprocess.run(
                h.cmd, shell = True, check = True, capture_output = True,
                input = stdin_data,
                stdin = None if stdin_data else subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            logging.error('Calling mailcap handler "%s" failed with code %d: %s',
                          h.cmd, e.returncode, decode(e.stderr))
            return None

        return decode(result.stdout)


class _MessageHeaders:
    """
    Mapping-like, read-only view on the headers of an email message.

    Indexing returns the list of *all* values of a header (as given by the
    wrapped message's ``get_all()``) and raises KeyError for a header that
    is not present.
    """

    _msg = None

    def __init__(self, msg):
        self._msg = msg

    def __contains__(self, key):
        return key in self._msg

    def __getitem__(self, key):
        if key not in self._msg:
            raise KeyError(key)
        return self._msg.get_all(key)

    def keys(self):
        return self._msg.keys()

    def items(self):
        return self._msg.items()


class _MimeTree:
    """
    A node in the MIME structure of a message.

    Multipart nodes carry their sub-parts in :attr:`children`; signed and
    encrypted multiparts are additionally verified/decrypted on construction
    and the outcome is stored in the ``is_signed``, ``is_encrypted``,
    ``sig_*``, ``signer_id`` and ``crypt_error`` attributes.
    """

    _part = None

    # content-type: (maintype, subtype)
    # may differ from that indicated in _part,
    # e.g. when it reports octet-stream, but we guess
    # something more specific
    _ctype_val = None

    is_signed = False
    is_encrypted = False
    sig_valid = None
    sig_trusted = None
    signer_id = None
    crypt_error = None

    is_alternative = False

    headers = None
    children = None
    attachment = None

    def __init__(self, part, session_keys = None):
        """
        :param part: the email (sub-)part wrapped by this node
        :param session_keys: session keys handed on to the decryption routine
        """
        self._part = part
        self.headers = _MessageHeaders(self._part)

        if part.is_multipart():
            st = part.get_content_subtype()

            # handle signed/encrypted messages
            if st == 'signed' and part.get_param('protocol') == _APP_PGP_SIG:
                self._handle_signed_pgp(session_keys)
                return
            if st == 'encrypted':
                self._handle_encrypted(session_keys)
                return

            # for all other cases, we just put all the sub-parts into children
            # multipart/alternative are flagged as such
            # everything else is treated as multipart/mixed
            if st == 'alternative':
                self.is_alternative = True

            self.children = [_MimeTree(subpart, session_keys)
                             for subpart in part.iter_parts()]
        else:
            cd = part.get_content_disposition()
            fn = part.get_filename()
            if cd == 'attachment' or fn is not None:
                data = part.get_content()
                self.attachment = Attachment(data, self.content_type,
                                             fn, part.get_params())

    def __str__(self):
        return 'MimePart(%s)' % self.content_type

    def render_str(self, alt_preference = None):
        """
        Render this part (and its sub-parts) as text.

        :param alt_preference: content type (e.g. 'text/plain') to pick among
                               the children of a multipart/alternative; the
                               first child is used when none matches
        :type alt_preference: str or None
        :returns: the rendered text, or None for a leaf part that is neither
                  handled by an external renderer nor of main type 'text'
        :rtype: str or None
        """
        if self.children is not None:
            if self.is_alternative and self.children:
                chosen = next((ch for ch in self.children
                               if ch.content_type == alt_preference),
                              self.children[0])
                return chosen.render_str(alt_preference)

            rendered_children = (ch.render_str(alt_preference)
                                 for ch in self.children)
            return '\n'.join(text for text in rendered_children if text)

        content = self._part.get_content()

        # try processing content with an external program
        # NOTE: the last argument is the part's file name; the mailcap field
        # ('copiousoutput') is selected inside _render_part_external itself.
        rendered = _render_part_external(content, self.content_type,
                                         self._part.get_params(),
                                         self.filename)
        if rendered:
            return rendered

        # return text parts without a handler as-is
        if self.content_maintype == 'text':
            if not isinstance(content, str):
                content = content.decode('utf-8', errors = 'backslashreplace')
            return content

        return None

    @property
    def raw_data(self):
        """The wrapped part serialized to bytes."""
        return self._part.as_bytes()

    @property
    def _ctype(self):
        """
        The (maintype, subtype) tuple for this part, computed on first
        access and cached in ``_ctype_val``.
        """
        if self._ctype_val is None:
            ctype = self._part.get_content_type()

            # replace underspecified mime description by a better guess
            if ctype in ['octet/stream', 'application/octet-stream',
                         'application/octetstream']:
                ctype = helper.guess_mimetype(self._part.get_content())
                logging.debug('Overriding octet-stream content type to %s',
                              ctype)

            maintype, _, subtype = ctype.partition('/')
            self._ctype_val = (maintype, subtype)

        return self._ctype_val

    @property
    def content_type(self):
        """Content type as a 'maintype/subtype' string."""
        return '/'.join(self._ctype)

    @property
    def content_maintype(self):
        return self._ctype[0]

    @property
    def content_subtype(self):
        return self._ctype[1]

    @property
    def filename(self):
        """File name reported by the wrapped part (may be None)."""
        return self._part.get_filename()

    def _handle_signed_pgp(self, session_keys):
        """
        Handle PGP-signed data.

        RFC 3156 is quite strict:
        * exactly two messages
        * the second is of type 'application/pgp-signature'
        * the second contains the detached signature

        Any problem found is reported through ``crypt_error``.
        """
        self.is_signed = True

        payload = list(self._part.iter_parts())
        if len(payload) != 2:
            self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
            return

        self.children = [_MimeTree(payload[0], session_keys)]

        ct = payload[1].get_content_type()
        if ct != _APP_PGP_SIG:
            self.crypt_error = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_SIG, ct)
            return

        # TODO: RFC 3156 says the alg has to be lower case, but I've seen a
        # message with 'PGP-'. maybe we should be more permissive here, or
        # maybe not, this is crypto stuff...
        micalg = self._part.get_param('micalg', '')
        if not micalg.startswith('pgp-'):
            self.crypt_error = 'expected micalg=pgp-..., got: {0}'.format(micalg)
            return

        part_data = payload[0].as_bytes()
        sig_data = payload[1].get_content()

        # verify the signature
        sig = None
        gpg_err = None
        try:
            sigs = crypto.verify_detached(part_data, sig_data)
            self.sig_valid = True

            if len(sigs) == 1:
                sig = sigs[0]
            else:
                gpg_err = 'exactly one signature expected'
        except GPGProblem as e:
            self.sig_valid = False
            gpg_err = str(e)

        # get the signer
        if sig is not None:
            self.signer_id, self.sig_trusted = self._sig_check_key(sig.fpr)

        if gpg_err:
            self.crypt_error = gpg_err

    def _sig_check_key(self, fingerprint):
        """
        Look up the key with the given fingerprint.

        :returns: a ``(signer, trusted)`` tuple: the first uid that passes
                  the validity check together with True, otherwise the
                  key's first uid (or the bare fingerprint if the key lookup
                  fails) together with False
        """
        try:
            key = crypto.get_key(fingerprint)
            for uid in key.uids:
                if crypto.check_uid_validity(key, uid.email):
                    return uid.uid, True

            # No trusted uid found
            sig_from = key.uids[0].uid
        except GPGProblem:
            sig_from = fingerprint

        return sig_from, False

    def _handle_encrypted(self, session_keys):
        """
        Handle encrypted messages.

        RFC 3156 is quite strict:
        * exactly two parts
        * the first is of type 'application/pgp-encrypted'
        * the first contains 'Version: 1'
        * the second is of type 'application/octet-stream'
        * the second contains the encrypted and possibly signed data

        Any problem found is reported through ``crypt_error``.
        """
        self.is_encrypted = True

        payload = list(self._part.iter_parts())
        if len(payload) != 2:
            self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
            return

        part_control = payload[0]
        part_body = payload[1]

        ct = part_control.get_content_type()
        if ct != _APP_PGP_ENC:
            self.crypt_error = 'expected Content-Type: %s, got: %s' % (_APP_PGP_ENC, ct)
            return

        want = 'application/octet-stream'
        ct = part_body.get_content_type()
        if ct != want:
            self.crypt_error = 'expected Content-Type: %s, got: %s' % (want, ct)
            return

        payload = part_body.get_content()
        try:
            sigs, d = crypto.decrypt_verify(payload, session_keys)
        except GPGProblem as e:
            # signature verification failures end up here too if the combined
            # method is used, currently this prevents the interpretation of
            # the recovered plain text mail. maybe that's a feature.
            self.crypt_error = 'Failed to decrypt message: %s' % str(e)
            sigs = None
            d = None

        if d:
            child = email.message_from_bytes(d, policy = email.policy.SMTP)
            self.children = [_MimeTree(child, session_keys)]

            if sigs:
                self.is_signed = True
                self.sig_valid = True

                if len(sigs) == 1:
                    self.signer_id, self.sig_trusted = \
                        self._sig_check_key(sigs[0].fpr)


class Message:
    """
    a persistent notmuch message object.
    It uses a :class:`~alot.db.DBManager` for cached manipulation
    and lazy lookups.
    """

    #: the :class:`~alot.db.Thread` this Message belongs to
    thread = None

    #: value of the Date header value as :class:`~datetime.datetime`
    date = None

    #: value of the Message-Id header (str)
    id = None

    #: Paths to all files corresponding to this message
    filenames = None

    #: this message's depth in the thread tree
    depth = None

    #: A list of replies to this message
    replies = None

    #: This message parent in the list (i.e. the message this message is a
    #: reply to). None when this message is top-level.
    parent = None

    #: The object providing access to the email's headers.
    headers = None

    #: A MimeTree object containing the body of the message.
    body = None

    def __init__(self, dbman, thread, msg, depth):
        """
        :param dbman: db manager that is used for further lookups
        :type dbman: alot.db.DBManager
        :param thread: this messages thread
        :type thread: :class:`~alot.db.Thread`
        :param msg: the wrapped message
        :type msg: notmuch.database.Message
        :param depth: depth of this message in the thread tree (0 for toplevel
                      messages, 1 for their replies etc.)
        :type depth: int
        :raises ValueError: if the message has no file names
        """
        self._dbman = dbman
        self.id = msg.messageid
        self.thread = thread
        self.depth = depth

        try:
            self.date = datetime.fromtimestamp(msg.date)
        except (ValueError, OverflowError, OSError):
            # fromtimestamp() is documented to raise OverflowError/OSError
            # (not only ValueError) for timestamps the platform cannot
            # represent
            self.date = None

        self.filenames = list(msg.filenamesb())
        if not self.filenames:
            raise ValueError('No filenames for a message returned')

        session_keys = [
            value for _, value
            in msg.properties.getall('session-key', exact = True)
        ]

        self._email = self._load_email(session_keys)
        self.body = _MimeTree(self._email, session_keys)
        self.headers = self.body.headers

        self._tags = set(msg.tags)

        sender = self._email.get('From')
        if sender is None:
            sender = self._email.get('Sender')
        if sender:
            self._from = sender
        elif 'draft' in self._tags:
            # drafts without a sender are attributed to the first account
            acc = settings.get_accounts()[0]
            self._from = '"{}" <{}>'.format(acc.realname, str(acc.address))
        else:
            self._from = '"Unknown" <>'

    def __str__(self):
        """prettyprint the message"""
        aname, aaddress = self.get_author()
        if not aname:
            aname = aaddress
        return "%s (%s)" % (aname, self.get_datestring())

    def __hash__(self):
        """needed for sets of Messages"""
        return hash(self.id)

    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self.id == other.id
        return NotImplemented

    @property
    def filename(self):
        """The first of the paths in :attr:`filenames`."""
        return self.filenames[0]

    def _load_email(self, session_keys):
        """
        Parse the message file into an email object.

        If the file cannot be read, a short placeholder message naming the
        inaccessible file is returned instead.

        :param session_keys: not used by this method
        """
        try:
            with open(self.filename, 'rb') as f:
                mail = email.message_from_bytes(f.read(),
                                                policy = email.policy.SMTP)
        except IOError:
            warning = b"Subject: Caution!\n"\
                      b"Message file is no longer accessible:\n%s" % self.filename
            mail = email.message_from_bytes(warning, policy = email.policy.SMTP)

        return mail

    def as_bytes(self):
        """The wrapped email serialized to bytes."""
        return self._email.as_bytes()

    def get_email(self):
        """returns :class:`email.email.EmailMessage` for this message"""
        return self._email

    def get_tags(self):
        """returns tags attached to this message as set of strings"""
        return self._tags

    def get_datestring(self):
        """
        returns reformatted datestring for this message.

        It uses :meth:`SettingsManager.represent_datetime`
        to represent this messages `Date` header

        :returns: the formatted date, or None if the message has no date
        :rtype: str or None
        """
        if self.date is None:
            return None
        return settings.represent_datetime(self.date)

    def get_author(self):
        """
        returns realname and address of this messages author

        :rtype: (str,str)
        """
        return email.utils.parseaddr(self._from)

    def tags_add(self, tags):
        """
        Asynchronously add tags to message

        :param tags: a set of tags to be added
        :type tags: set of str
        :returns: the object returned by the db manager's ``tags_add``; the
                  locally cached tag set is updated from its done-callback
        """
        def myafterwards(fut):
            self._tags = self._tags.union(tags)

        fut = self._dbman.tags_add('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def tags_set(self, tags):
        """
        Asynchronously set tags for a message

        :param tags: a set of new tags to replace the existing set
        :type tags: set of str
        :returns: the object returned by the db manager's ``tags_set``; the
                  locally cached tag set is updated from its done-callback
        """
        def myafterwards(fut):
            self._tags = set(tags)

        fut = self._dbman.tags_set('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def tags_remove(self, tags):
        """
        Asynchronously remove tags from message

        :param tags: a set of tags to be removed
        :type tags: set of str
        :returns: the object returned by the db manager's ``tags_remove``;
                  the locally cached tag set is updated from its
                  done-callback
        """
        def myafterwards(fut):
            self._tags = self._tags.difference(tags)

        fut = self._dbman.tags_remove('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def iter_attachments(self):
        """
        Iterate over all the attachments in this message.

        The MIME tree is walked depth-first, a node's own attachment being
        yielded before those of its children.
        """
        def tree_walk(mime_tree):
            if mime_tree.attachment:
                yield mime_tree.attachment

            if mime_tree.children is not None:
                for ch in mime_tree.children:
                    yield from tree_walk(ch)

        yield from tree_walk(self.body)

    def get_body_text(self):
        """
        Returns a string view of a Message.

        This consults :ref:`prefer_plaintext <prefer-plaintext>`
        to determine if a "text/plain" alternative is preferred over a
        "text/html" part.

        :returns: The combined text of any parts to be used
        :rtype: str
        """
        if settings.get('prefer_plaintext'):
            alt_preference = 'text/plain'
        else:
            alt_preference = 'text/html'

        return self.body.render_str(alt_preference)

    def matches(self, querystring):
        """tests if this messages is in the resultset for `querystring`"""
        searchfor = '( {} ) AND id:{}'.format(querystring, self.id)
        return self._dbman.count_messages(searchfor) > 0

    def parents(self):
        """
        A generator iterating over this message's parents up to the topmost
        level.
        """
        m = self.parent
        while m:
            yield m
            m = m.parent