# Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file import email import email.charset as charset import email.policy import logging import mailcap import os import tempfile from datetime import datetime from .attachment import Attachment from .. import crypto from .. import helper from ..errors import GPGProblem from ..helper import parse_mailcap_nametemplate from ..helper import split_commandstring from ..settings.const import settings charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') _APP_PGP_SIG = 'application/pgp-signature' _APP_PGP_ENC = 'application/pgp-encrypted' _TEXT_PLAIN = 'text/plain' def _render_part(part, field_key='copiousoutput'): """ renders a non-multipart email part into displayable plaintext by piping its payload through an external script. The handler itself is determined by the mailcap entry for this part's ctype. """ ctype = part.get_content_type() raw_payload = part.get_content() rendered_payload = None # get mime handler _, entry = settings.mailcap_find_match(ctype, key=field_key) if entry is not None: if isinstance(raw_payload, str): raw_payload = raw_payload.encode('utf-8') tempfile_name = None stdin = None handler_raw_commandstring = entry['view'] # in case the mailcap defined command contains no '%s', # we pipe the files content to the handling command via stdin if '%s' in handler_raw_commandstring: # open tempfile, respect mailcaps nametemplate nametemplate = entry.get('nametemplate', '%s') prefix, suffix = parse_mailcap_nametemplate(nametemplate) with tempfile.NamedTemporaryFile( delete=False, prefix=prefix, suffix=suffix) \ as tmpfile: tmpfile.write(raw_payload) tempfile_name = tmpfile.name else: stdin = raw_payload # read parameter, create handler command parms = tuple('='.join(p) for p in part.get_params()) # create and call external command cmd = mailcap.subst(entry['view'], ctype, filename=tempfile_name, plist=parms) logging.debug('command: %s', cmd) logging.debug('parms: %s', str(parms)) cmdlist = split_commandstring(cmd) # call handler stdout, _, _ = helper.call_cmd(cmdlist, stdin=stdin) if stdout: rendered_payload = stdout # remove tempfile if tempfile_name: os.unlink(tempfile_name) elif part.get_content_maintype() == 'text': # return text parts without a handler as-is rendered_payload = raw_payload if not isinstance(rendered_payload, str): rendered_payload = rendered_payload.decode('utf-8', errors = 'backslashreplace') return rendered_payload class _MessageHeaders: _msg = None def __init__(self, msg): self._msg = msg def __contains__(self, key): return key in self._msg def __getitem__(self, key): if not key in self._msg: raise KeyError(key) return self._msg.get_all(key) def keys(self): return self._msg.keys() def items(self): return self._msg.items() class _MimeTree: _part = None is_signed = False is_encrypted = False sig_valid = None sig_trusted = None signer_id = None crypt_error = None is_alternative = False headers = None children = None attachment = None def __init__(self, part, session_keys = None): self._part = part self.headers = _MessageHeaders(self._part) if part.is_multipart(): st = part.get_content_subtype() # handle signed/encrypted messages if st == 'signed' and part.get_param('protocol') == _APP_PGP_SIG: return self._handle_signed_pgp(session_keys) elif st == 'encrypted': return self._handle_encrypted(session_keys) # for all other cases, we just put all the sub-parts into children # multipart/alternative are flagged as such # everything else is treated as multipart/mixed if st == 'alternative': self.is_alternative = True children = [] for part in part.iter_parts(): children.append(_MimeTree(part, session_keys)) self.children = children else: cd = part.get_content_disposition() fn = part.get_filename() if cd == 'attachment' or fn is not None: self.attachment = Attachment(part) def render_str(self, alt_preference = None): if self.children is not None: if self.is_alternative and len(self.children) > 0: child = None for ch in self.children: if ch.content_type == alt_preference: child = ch break if child is None: child = self.children[0] return child.render_str(alt_preference) parts = [] for ch in self.children: part = ch.render_str(alt_preference) if part: parts.append(part) return '\n'.join(parts) content = self._part.get_content() # no processing for plaintext # XXX we may want to process plaintext as well if self.content_type == _TEXT_PLAIN: return content return _render_part(self._part) @property def raw_data(self): return self._part.as_bytes() @property def content_type(self): return self._part.get_content_type() @property def content_maintype(self): return self._part.get_content_maintype() @property def content_subtype(self): return self._part.get_content_subtype() def _handle_signed_pgp(self, session_keys): """ Handle PGP-signed data. RFC 3156 is quite strict: * exactly two messages * the second is of type 'application/pgp-signature' * the second contains the detached signature """ self.is_signed = True payload = list(self._part.iter_parts()) if len(payload) != 2: self.crypt_error = 'expected exactly two messages, got %d' % len(payload) return self.children = [_MimeTree(payload[0], session_keys)] ct = payload[1].get_content_type() if ct != _APP_PGP_SIG: self.crypt_error = 'expected Content-Type: {0}, got: {1}'.format( _APP_PGP_SIG, ct) return # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message # with 'PGP-'. maybe we should be more permissive here, or maybe not, this # is crypto stuff... micalg = self._part.get_param('micalg', '') if not micalg.startswith('pgp-'): self.crypt_error = 'expected micalg=pgp-..., got: {0}'.format(micalg) return part_data = payload[0].as_bytes() sig_data = payload[1].get_content() # verify the signature sig = None gpg_err = None try: sigs = crypto.verify_detached(part_data, sig_data) self.sig_valid = True if len(sigs) == 1: sig = sigs[0] else: gpg_err = 'exactly one signature expected' except GPGProblem as e: self.sig_valid = False gpg_err = str(e) # get the signer if sig is not None: self.signer_id, self.sig_trusted = self._sig_check_key(sig.fpr) if gpg_err: self.crypt_error = gpg_err def _sig_check_key(self, fingerprint): try: key = crypto.get_key(fingerprint) for uid in key.uids: if crypto.check_uid_validity(key, uid.email): return uid.uid, True # No trusted uid found sig_from = key.uids[0].uid except GPGProblem: sig_from = fingerprint return sig_from, False def _handle_encrypted(self, session_keys): """ Handle encrypted messages. RFC 3156 is quite strict: * exactly two parts * the first is of type 'application/pgp-encrypted' * the first contains 'Version: 1' * the second is of type 'application/octet-stream' * the second contains the encrypted and possibly signed data """ self.is_encrypted = True payload = list(self._part.iter_parts()) if len(payload) != 2: self.crypt_error = 'expected exactly two messages, got %d' % len(payload) return part_control = payload[0] part_body = payload[1] ct = part_control.get_content_type() if ct != _APP_PGP_ENC: self.crypt_error = 'expected Content-Type: %s, got: %s' % (_APP_PGP_ENC, ct) return want = 'application/octet-stream' ct = part_body.get_content_type() if ct != want: self.crypt_error = 'expected Content-Type: %s, got: %s' % (want, ct) return payload = part_body.get_content() try: sigs, d = crypto.decrypt_verify(payload, session_keys) except GPGProblem as e: # signature verification failures end up here too if the combined # method is used, currently this prevents the interpretation of the # recovered plain text mail. maybe that's a feature. self.crypt_error = 'Failed to decrypt message: %s' % str(e) sigs = None d = None if d: child = email.message_from_bytes(d, policy = email.policy.SMTP) self.children = [_MimeTree(child, session_keys)] if sigs: self.is_signed = True self.sig_valid = True if len(sigs) == 1: self.signer_id, self.sig_trusted = self._sig_check_key(sigs[0].fpr) class Message: """ a persistent notmuch message object. It it uses a :class:`~alot.db.DBManager` for cached manipulation and lazy lookups. """ """the :class:`~alot.db.Thread` this Message belongs to""" thread = None """value of the Date header value as :class:`~datetime.datetime`""" date = None """value of the Message-Id header (str)""" id = None """Paths to all files corresponding to this message""" filenames = None """this message's depth in the thread tree""" depth = None """A list of replies to this message""" replies = None """ This message parent in the list (i.e. the message this message is a reply to). None when this message is top-level. """ parent = None """ The object providing access to the email's headers. """ headers = None """ A MimeTree object containing the body of the message. """ body = None def __init__(self, dbman, thread, msg, depth): """ :param dbman: db manager that is used for further lookups :type dbman: alot.db.DBManager :param thread: this messages thread :type thread: :class:`~alot.db.Thread` :param msg: the wrapped message :type msg: notmuch.database.Message :param depth: depth of this message in the thread tree (0 for toplevel messages, 1 for their replies etc.) :type depth int """ self._dbman = dbman self.id = msg.messageid self.thread = thread self.depth = depth try: self.date = datetime.fromtimestamp(msg.date) except ValueError: self.date = None self.filenames = list(msg.filenamesb()) if len(self.filenames) == 0: raise ValueError('No filenames for a message returned') session_keys = [] for name, value in msg.properties.getall('session-key', exact = True): session_keys.append(value) self._email = self._load_email(session_keys) self.body = _MimeTree(self._email, session_keys) self.headers = self.body.headers self._tags = set(msg.tags) sender = self._email.get('From') if sender is None: sender = self._email.get('Sender') if sender: self._from = sender elif 'draft' in self._tags: acc = settings.get_accounts()[0] self._from = '"{}" <{}>'.format(acc.realname, str(acc.address)) else: self._from = '"Unknown" <>' def __str__(self): """prettyprint the message""" aname, aaddress = self.get_author() if not aname: aname = aaddress return "%s (%s)" % (aname, self.get_datestring()) def __hash__(self): """needed for sets of Messages""" return hash(self.id) def __eq__(self, other): if isinstance(other, type(self)): return self.id == other.id return NotImplemented @property def filename(self): return self.filenames[0] def _load_email(self, session_keys): warning = b"Subject: Caution!\n"\ b"Message file is no longer accessible:\n%s" % self.filename try: with open(self.filename, 'rb') as f: mail = email.message_from_bytes(f.read(), policy = email.policy.SMTP) except IOError: mail = email.message_from_string( warning, policy=email.policy.SMTP) return mail def as_bytes(self): return self._email.as_bytes() def get_email(self): """returns :class:`email.email.EmailMessage` for this message""" return self._email def get_tags(self): """returns tags attached to this message as set of strings""" return self._tags def get_datestring(self): """ returns reformated datestring for this message. It uses :meth:`SettingsManager.represent_datetime` to represent this messages `Date` header :rtype: str """ if self.date is None: return None return settings.represent_datetime(self.date) def get_author(self): """ returns realname and address of this messages author :rtype: (str,str) """ return email.utils.parseaddr(self._from) def add_tags(self, tags, afterwards=None, remove_rest=False): """ adds tags to message .. note:: This only adds the requested operation to this objects :class:`DBManager's ` write queue. You need to call :meth:`~alot.db.DBManager.flush` to write out. :param tags: a set of tags to be added :type tags: set of str :param afterwards: callback that gets called after successful application of this tagging operation :type afterwards: callable :param remove_rest: remove all other tags :type remove_rest: bool """ def myafterwards(): if remove_rest: self._tags = set(tags) else: self._tags = self._tags.union(tags) if callable(afterwards): afterwards() self._dbman.tag('id:' + self.id, tags, afterwards=myafterwards, remove_rest=remove_rest) self._tags = self._tags.union(tags) def remove_tags(self, tags, afterwards=None): """remove tags from message .. note:: This only adds the requested operation to this objects :class:`DBManager's ` write queue. You need to call :meth:`~alot.db.DBManager.flush` to actually out. :param tags: a set of tags to be removed :type tags: set of str :param afterwards: callback that gets called after successful application of this tagging operation :type afterwards: callable """ def myafterwards(): self._tags = self._tags.difference(tags) if callable(afterwards): afterwards() self._dbman.untag('id:' + self.id, tags, myafterwards) def iter_attachments(self): """ Iterate over all the attachments in this message. """ def tree_walk(mime_tree): if mime_tree.attachment: yield mime_tree.attachment if mime_tree.children is not None: for ch in mime_tree.children: yield from tree_walk(ch) yield from tree_walk(self.body) def get_body_text(self): """ Returns a string view of a Message. This consults :ref:`prefer_plaintext ` to determine if a "text/plain" alternative is preferred over a "text/html" part. :returns: The combined text of any parts to be used :rtype: str """ if settings.get('prefer_plaintext'): alt_preference = 'text/plain' else: alt_preference = 'text/html' return self.body.render_str(alt_preference) def matches(self, querystring): """tests if this messages is in the resultset for `querystring`""" searchfor = '( {} ) AND id:{}'.format(querystring, self.id) return self._dbman.count_messages(searchfor) > 0 def parents(self): """ A generator iterating over this message's parents up to the topmost level. """ m = self.parent while m: yield m m = m.parent