# Copyright (C) 2011-2012 Patrick Totzke
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import codecs
import email
import email.charset as charset
import email.policy
import email.utils
import logging
import os
import subprocess
from datetime import datetime

from urwid.util import detected_encoding

from ..mail.attachment import Attachment
from .. import crypto
from .. import helper
from ..errors import GPGProblem
from ..settings.const import settings
from ..utils.mailcap import MailcapHandler

charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')

_APP_PGP_SIG = 'application/pgp-signature'
_APP_PGP_ENC = 'application/pgp-encrypted'


def _render_part_external(payload, ctype, params, filename):
    """
    Render a non-multipart email part into displayable plaintext by piping
    its payload through an external program.

    The handler is determined by the mailcap entry (``copiousoutput`` field)
    for this part's content type.

    :param payload: content of the part, handed to the mailcap handler
    :param ctype: content type of the part, e.g. ``'text/html'``
    :type ctype: str
    :param params: content-type parameters of the part
    :param filename: filename declared for the part (may be None)
    :returns: the decoded standard output of the handler, or None when no
              usable handler exists or the handler exited with an error
    :rtype: str or None
    """
    h = MailcapHandler(payload, ctype, params, filename, 'copiousoutput')
    if not h or h.needs_terminal:
        return None

    def decode(buf):
        return buf.decode(detected_encoding, errors='backslashreplace')

    with h:
        logging.debug('Rendering part %s: %s', ctype, h.cmd)

        # subprocess.run() refuses a non-None `input` combined with an
        # explicit `stdin`, so normalise "nothing to feed" (None or empty)
        # to None and attach /dev/null in that case.
        stdin_data = h.stdin or None

        try:
            # NOTE(review): h.cmd is executed through the shell; presumably
            # MailcapHandler is responsible for quoting any substituted
            # values -- confirm there.
            result = subprocess.run(
                h.cmd, shell=True, check=True, capture_output=True,
                input=stdin_data,
                stdin=None if stdin_data else subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            logging.error('Calling mailcap handler "%s" failed with code %d: %s',
                          h.cmd, e.returncode, decode(e.stderr))
            return None

    return decode(result.stdout)


class _MessageHeaders:
    """
    Read-only, mapping-like view on the headers of an email message.

    Indexing returns *all* values stored under a header name (as a list),
    since a header may occur more than once.
    """

    _msg = None

    def __init__(self, msg):
        """
        :param msg: the message object whose headers are exposed; it must
                    provide ``__contains__``, ``get_all``, ``keys`` and
                    ``items``
        """
        self._msg = msg

    def __contains__(self, key):
        return key in self._msg

    def __getitem__(self, key):
        """
        :returns: list of all values of header `key`
        :raises KeyError: if the message has no such header
        """
        if key not in self._msg:
            raise KeyError(key)
        return self._msg.get_all(key)

    def keys(self):
        return self._msg.keys()

    def items(self):
        return self._msg.items()


class _MimeTree:
    """
    A node in the MIME structure of a message.

    Multipart nodes carry their sub-parts in :attr:`children`; leaf nodes
    have ``children is None`` and may carry an :attr:`attachment`.
    PGP/MIME signed and encrypted parts are verified/decrypted on
    construction and the outcome is stored in the ``is_signed``,
    ``is_encrypted``, ``sig_*``, ``signer_id`` and ``crypt_error``
    attributes.
    """

    _part = None

    # content-type: (maintype, subtype)
    # may differ from that indicated in _part,
    # e.g. when it reports octet-stream, but we guess
    # something more specific
    _ctype_val = None

    is_signed = False
    is_encrypted = False
    sig_valid = None
    sig_trusted = None
    signer_id = None
    crypt_error = None

    is_alternative = False

    headers = None
    children = None
    attachment = None

    def __init__(self, part, session_keys=None):
        """
        :param part: the email (sub-)message this node wraps
        :param session_keys: session keys passed on to decryption
        """
        self._part = part
        self.headers = _MessageHeaders(self._part)

        if part.is_multipart():
            st = part.get_content_subtype()

            # handle signed/encrypted messages
            if st == 'signed' and part.get_param('protocol') == _APP_PGP_SIG:
                self._handle_signed_pgp(session_keys)
                return
            if st == 'encrypted':
                self._handle_encrypted(session_keys)
                return

            # for all other cases, we just put all the sub-parts into children
            # multipart/alternative are flagged as such
            # everything else is treated as multipart/mixed
            if st == 'alternative':
                self.is_alternative = True

            self.children = [_MimeTree(sub, session_keys)
                             for sub in part.iter_parts()]
        else:
            # Repair a bogus charset *before* anything reads the content:
            # get_content() on a text part decodes using the declared
            # charset and would fail on an unknown one.
            if self.content_maintype == 'text':
                self._fixup_charset()

            cd = part.get_content_disposition()
            fn = part.get_filename()
            if cd == 'attachment' or fn is not None:
                data = part.get_content()
                self.attachment = Attachment(data, self.content_type,
                                             fn, part.get_params())

    def __str__(self):
        return 'MimePart(%s)' % self.content_type

    def _fixup_charset(self):
        """
        If a text MIME part declares an invalid charset, replace it with UTF-8
        """
        declared = self._part.get_param('charset')
        if declared is None:
            return

        try:
            codecs.lookup(declared)
        except LookupError:
            self._part.set_param('charset', 'utf-8')

    def render_str(self, alt_preference=None):
        """
        Render this subtree as text.

        :param alt_preference: content type (e.g. ``'text/plain'``) to pick
                               from a multipart/alternative; the first
                               alternative is used if none matches
        :type alt_preference: str or None
        :returns: the rendered text; None for a leaf that is neither text
                  nor renderable through an external handler
        :rtype: str or None
        """
        if self.children is not None:
            if self.is_alternative and self.children:
                chosen = next((ch for ch in self.children
                               if ch.content_type == alt_preference),
                              self.children[0])
                return chosen.render_str(alt_preference)

            rendered_children = (ch.render_str(alt_preference)
                                 for ch in self.children)
            return '\n'.join(text for text in rendered_children if text)

        content = self._part.get_content()

        # try processing content with an external program; the fourth
        # argument is the part's filename (the mailcap field is chosen by
        # _render_part_external itself)
        rendered = _render_part_external(content, self.content_type,
                                         self._part.get_params(),
                                         self.filename)
        if rendered:
            return rendered

        # return text parts without a handler as-is
        if self.content_maintype == 'text':
            if not isinstance(content, str):
                content = content.decode('utf-8', errors='backslashreplace')
            return content

        return None

    @property
    def raw_data(self):
        """This part serialized to bytes."""
        return self._part.as_bytes()

    @property
    def _ctype(self):
        """
        The (maintype, subtype) tuple for this part, computed once and
        cached in ``_ctype_val``.
        """
        if self._ctype_val is None:
            ctype = self._part.get_content_type()

            # replace underspecified mime description by a better guess
            if ctype in ['octet/stream', 'application/octet-stream',
                         'application/octetstream']:
                ctype = helper.guess_mimetype(self._part.get_content())
                logging.debug('Overriding octet-stream content type to %s',
                              ctype)

            maintype, _, subtype = ctype.partition('/')
            self._ctype_val = (maintype, subtype)

        return self._ctype_val

    @property
    def content_type(self):
        """Content type as a ``'maintype/subtype'`` string."""
        return '/'.join(self._ctype)

    @property
    def content_maintype(self):
        return self._ctype[0]

    @property
    def content_subtype(self):
        return self._ctype[1]

    @property
    def filename(self):
        """Filename declared for this part, as reported by the part."""
        return self._part.get_filename()

    def _handle_signed_pgp(self, session_keys):
        """
        Handle PGP-signed data.

        RFC 3156 is quite strict:
        * exactly two messages
        * the second is of type 'application/pgp-signature'
        * the second contains the detached signature

        Any violation, or a verification problem, is recorded in
        :attr:`crypt_error`.
        """
        self.is_signed = True

        payload = list(self._part.iter_parts())
        if len(payload) != 2:
            self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
            return

        self.children = [_MimeTree(payload[0], session_keys)]

        ct = payload[1].get_content_type()
        if ct != _APP_PGP_SIG:
            self.crypt_error = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_SIG, ct)
            return

        # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
        # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
        # is crypto stuff...
        micalg = self._part.get_param('micalg', '')
        if not micalg.startswith('pgp-'):
            self.crypt_error = 'expected micalg=pgp-..., got: {0}'.format(micalg)
            return

        part_data = payload[0].as_bytes()
        sig_data = payload[1].get_content()

        # verify the signature
        sig = None
        gpg_err = None
        try:
            sigs = crypto.verify_detached(part_data, sig_data)
            self.sig_valid = True

            if len(sigs) == 1:
                sig = sigs[0]
            else:
                gpg_err = 'exactly one signature expected'
        except GPGProblem as e:
            self.sig_valid = False
            gpg_err = str(e)

        # get the signer
        if sig is not None:
            self.signer_id, self.sig_trusted = self._sig_check_key(sig.fpr)

        if gpg_err:
            self.crypt_error = gpg_err

    def _sig_check_key(self, fingerprint):
        """
        Look up the key with the given fingerprint and determine the signer.

        :returns: ``(signer, trusted)``; `signer` is the first uid that
                  passes the validity check (then `trusted` is True),
                  otherwise the key's first uid, or the bare fingerprint if
                  the key cannot be retrieved or has no uids
        :rtype: (str, bool)
        """
        try:
            key = crypto.get_key(fingerprint)
            for uid in key.uids:
                if crypto.check_uid_validity(key, uid.email):
                    return uid.uid, True

            # No trusted uid found; a key without any uid is identified by
            # its fingerprint instead of failing on an empty list.
            sig_from = key.uids[0].uid if key.uids else fingerprint
        except GPGProblem:
            sig_from = fingerprint

        return sig_from, False

    def _handle_encrypted(self, session_keys):
        """
        Handle encrypted messages.

        RFC 3156 is quite strict:
        * exactly two parts
        * the first is of type 'application/pgp-encrypted'
        * the first contains 'Version: 1'
        * the second is of type 'application/octet-stream'
        * the second contains the encrypted and possibly signed data

        Any violation, or a decryption problem, is recorded in
        :attr:`crypt_error`. On success the decrypted message becomes the
        single child of this node.
        """
        self.is_encrypted = True

        payload = list(self._part.iter_parts())
        if len(payload) != 2:
            self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
            return

        part_control = payload[0]
        part_body = payload[1]

        ct = part_control.get_content_type()
        if ct != _APP_PGP_ENC:
            self.crypt_error = 'expected Content-Type: %s, got: %s' % (_APP_PGP_ENC, ct)
            return

        want = 'application/octet-stream'
        ct = part_body.get_content_type()
        if ct != want:
            self.crypt_error = 'expected Content-Type: %s, got: %s' % (want, ct)
            return

        payload = part_body.get_content()
        try:
            sigs, d = crypto.decrypt_verify(payload, session_keys)
        except GPGProblem as e:
            # signature verification failures end up here too if the combined
            # method is used, currently this prevents the interpretation of the
            # recovered plain text mail. maybe that's a feature.
            self.crypt_error = 'Failed to decrypt message: %s' % str(e)
            sigs = None
            d = None

        if d:
            child = email.message_from_bytes(d, policy=email.policy.SMTP)
            self.children = [_MimeTree(child, session_keys)]

            if sigs:
                self.is_signed = True
                self.sig_valid = True

                if len(sigs) == 1:
                    self.signer_id, self.sig_trusted = \
                        self._sig_check_key(sigs[0].fpr)


class Message:
    """
    a persistent notmuch message object.
    It uses a :class:`~alot.db.DBManager` for cached manipulation
    and lazy lookups.
    """

    #: the :class:`~alot.db.Thread` this Message belongs to
    thread = None

    #: value of the Date header value as :class:`~datetime.datetime`
    date = None

    #: value of the Message-Id header (str)
    id = None

    #: Paths to all files corresponding to this message
    filenames = None

    #: this message's depth in the thread tree
    depth = None

    #: A list of replies to this message
    replies = None

    #: This message's parent in the thread (i.e. the message this message
    #: is a reply to). None when this message is top-level.
    parent = None

    #: The object providing access to the email's headers.
    headers = None

    #: A MimeTree object containing the body of the message.
    body = None

    def __init__(self, dbman, thread, msg, depth):
        """
        :param dbman: db manager that is used for further lookups
        :type dbman: alot.db.DBManager
        :param thread: this messages thread
        :type thread: :class:`~alot.db.Thread`
        :param msg: the wrapped message
        :type msg: notmuch.database.Message
        :param depth: depth of this message in the thread tree (0 for toplevel
                      messages, 1 for their replies etc.)
        :type depth: int
        :raises ValueError: if `msg` reports no filenames
        """
        self._dbman = dbman
        self.id = msg.messageid
        self.thread = thread
        self.depth = depth

        # fromtimestamp() raises OverflowError/OSError (not only ValueError)
        # for timestamps the platform cannot represent
        try:
            self.date = datetime.fromtimestamp(msg.date)
        except (ValueError, OverflowError, OSError):
            self.date = None

        self.filenames = list(msg.filenamesb())
        if not self.filenames:
            raise ValueError('No filenames for a message returned')

        session_keys = [
            value
            for _, value in msg.properties.getall('session-key', exact=True)
        ]

        self._email = self._load_email(session_keys)
        self.body = _MimeTree(self._email, session_keys)
        self.headers = self.body.headers

        self._tags = set(msg.tags)

        sender = self._email.get('From')
        if sender is None:
            sender = self._email.get('Sender')

        if sender:
            self._from = sender
        elif 'draft' in self._tags:
            # drafts without a sender are attributed to the first account
            acc = settings.get_accounts()[0]
            self._from = '"{}" <{}>'.format(acc.realname, str(acc.address))
        else:
            self._from = '"Unknown" <>'

    def __str__(self):
        """prettyprint the message"""
        aname, aaddress = self.get_author()
        if not aname:
            aname = aaddress
        return "%s (%s)" % (aname, self.get_datestring())

    def __hash__(self):
        """needed for sets of Messages"""
        return hash(self.id)

    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self.id == other.id
        return NotImplemented

    @property
    def filename(self):
        """The first of the paths in :attr:`filenames`."""
        return self.filenames[0]

    def _load_email(self, session_keys):
        """
        Parse the message file into an email object.

        If the file cannot be opened, a stub message carrying a warning is
        returned instead.

        :param session_keys: currently unused here
        """
        try:
            with open(self.filename, 'rb') as f:
                mail = email.message_from_bytes(f.read(),
                                                policy=email.policy.SMTP)
        except IOError:
            # os.fsencode() passes bytes through unchanged and encodes str,
            # so the bytes %-formatting below works for either path type
            warning = b"Subject: Caution!\n"\
                      b"Message file is no longer accessible:\n%s" \
                      % os.fsencode(self.filename)
            mail = email.message_from_bytes(warning, policy=email.policy.SMTP)

        return mail

    def as_bytes(self):
        """The parsed email serialized to bytes."""
        return self._email.as_bytes()

    def get_email(self):
        """returns :class:`email.email.EmailMessage` for this message"""
        return self._email

    def get_tags(self):
        """returns tags attached to this message as set of strings"""
        return self._tags

    def get_datestring(self):
        """
        returns reformatted datestring for this message.

        It uses :meth:`SettingsManager.represent_datetime`
        to represent this messages `Date` header

        :returns: the formatted date, or None if the message has no usable
                  date
        :rtype: str
        """
        if self.date is None:
            return None
        return settings.represent_datetime(self.date)

    def get_author(self):
        """
        returns realname and address of this messages author

        :rtype: (str,str)
        """
        return email.utils.parseaddr(self._from)

    def tags_add(self, tags):
        """
        Asynchronously add tags to message

        :param tags: a set of tags to be added
        :type tags: set of str
        :returns: the future returned by the db manager
        """
        def myafterwards(fut):
            # keep the locally cached tag set in step with the database
            self._tags = self._tags.union(tags)

        fut = self._dbman.tags_add('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def tags_set(self, tags):
        """
        Asynchronously set tags for a message

        :param tags: a set of new tags to replace the existing set
        :type tags: set of str
        :returns: the future returned by the db manager
        """
        def myafterwards(fut):
            self._tags = set(tags)

        fut = self._dbman.tags_set('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def tags_remove(self, tags):
        """
        Asynchronously remove tags from message

        :param tags: a set of tags to be removed
        :type tags: set of str
        :returns: the future returned by the db manager
        """
        def myafterwards(fut):
            self._tags = self._tags.difference(tags)

        fut = self._dbman.tags_remove('id:' + self.id, tags)
        fut.add_done_callback(myafterwards)
        return fut

    def iter_attachments(self):
        """
        Iterate over all the attachments in this message.

        The MIME tree is walked depth-first, a node's own attachment being
        yielded before those of its children.
        """
        def tree_walk(mime_tree):
            if mime_tree.attachment:
                yield mime_tree.attachment

            if mime_tree.children is not None:
                for ch in mime_tree.children:
                    yield from tree_walk(ch)

        yield from tree_walk(self.body)

    def get_body_text(self):
        """
        Returns a string view of a Message.

        This consults the ``prefer_plaintext`` setting to determine if a
        "text/plain" alternative is preferred over a "text/html" part.

        :returns: The combined text of any parts to be used
        :rtype: str
        """
        if settings.get('prefer_plaintext'):
            alt_preference = 'text/plain'
        else:
            alt_preference = 'text/html'

        return self.body.render_str(alt_preference)

    def matches(self, querystring):
        """tests if this messages is in the resultset for `querystring`"""
        searchfor = '( {} ) AND id:{}'.format(querystring, self.id)
        return self._dbman.count_messages(searchfor) > 0

    def parents(self):
        """
        A generator iterating over this message's parents up to the topmost
        level.
        """
        m = self.parent
        while m:
            yield m
            m = m.parent