diff options
author | Anton Khirnov <anton@khirnov.net> | 2020-03-05 08:05:29 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2020-03-05 08:05:29 +0100 |
commit | 618a235fa30a6a83aebd1f4bda526deb9e9b630a (patch) | |
tree | 90c353429daa995ec8f7cc88eba31457c1690d06 /alot/db/message.py | |
parent | d98af2606e888f4c09c1deb13c6276556d896af5 (diff) |
db/utils: move message decryption code into db/message
It is only called from there, so there is no reason to keep it
elsewhere.
Diffstat (limited to 'alot/db/message.py')
-rw-r--r-- | alot/db/message.py | 237 |
1 files changed, 235 insertions, 2 deletions
diff --git a/alot/db/message.py b/alot/db/message.py index 7b7187e0..566a848b 100644 --- a/alot/db/message.py +++ b/alot/db/message.py @@ -12,11 +12,245 @@ from notmuch import NullPointerError from . import utils from .utils import extract_body from .attachment import Attachment +from .. import crypto from .. import helper +from ..errors import GPGProblem from ..settings.const import settings charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') +X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid' +X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message' + +_APP_PGP_SIG = 'application/pgp-signature' +_APP_PGP_ENC = 'application/pgp-encrypted' + +def _add_signature_headers(mail, sigs, error_msg): + '''Add pseudo headers to the mail indicating whether the signature + verification was successful. + + :param mail: :class:`email.message.Message` the message to entitle + :param sigs: list of :class:`gpg.results.Signature` + :param error_msg: An error message if there is one, or None + :type error_msg: :class:`str` or `None` + ''' + sig_from = '' + sig_known = True + uid_trusted = False + + assert error_msg is None or isinstance(error_msg, str) + + if not sigs: + error_msg = error_msg or 'no signature found' + elif not error_msg: + try: + key = crypto.get_key(sigs[0].fpr) + for uid in key.uids: + if crypto.check_uid_validity(key, uid.email): + sig_from = uid.uid + uid_trusted = True + break + else: + # No trusted uid found, since we did not break from the loop. + sig_from = key.uids[0].uid + except GPGProblem: + sig_from = sigs[0].fpr + sig_known = False + + if error_msg: + msg = 'Invalid: {}'.format(error_msg) + elif uid_trusted: + msg = 'Valid: {}'.format(sig_from) + else: + msg = 'Untrusted: {}'.format(sig_from) + + mail.add_header(X_SIGNATURE_VALID_HEADER, + 'False' if (error_msg or not sig_known) else 'True') + mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg) + +def _get_params(mail, failobj=None, header='content-type', unquote=True): + '''Get Content-Type parameters as dict. + + RFC 2045 specifies that parameter names are case-insensitive, so + we normalize them here. + + :param mail: :class:`email.message.Message` + :param failobj: object to return if no such header is found + :param header: the header to search for parameters, default + :param unquote: unquote the values + :returns: a `dict` containing the parameters + ''' + failobj = failobj or [] + return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)} + +def _handle_signatures(original, message, params): + """Shared code for handling message signatures. + + RFC 3156 is quite strict: + * exactly two messages + * the second is of type 'application/pgp-signature' + * the second contains the detached signature + + :param original: The original top-level mail. This is required to attache + special headers to + :type original: :class:`email.message.Message` + :param message: The multipart/signed payload to verify + :type message: :class:`email.message.Message` + :param params: the message parameters as returned by :func:`get_params` + :type params: dict[str, str] + """ + malformed = None + if len(message.get_payload()) != 2: + malformed = 'expected exactly two messages, got {0}'.format( + len(message.get_payload())) + else: + ct = message.get_payload(1).get_content_type() + if ct != _APP_PGP_SIG: + malformed = 'expected Content-Type: {0}, got: {1}'.format( + _APP_PGP_SIG, ct) + + # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message + # with 'PGP-'. maybe we should be more permissive here, or maybe not, this + # is crypto stuff... + if not params.get('micalg', 'nothing').startswith('pgp-'): + malformed = 'expected micalg=pgp-..., got: {0}'.format( + params.get('micalg', 'nothing')) + + sigs = [] + if not malformed: + try: + sigs = crypto.verify_detached( + message.get_payload(0).as_bytes(policy=email.policy.SMTP), + message.get_payload(1).get_payload(decode=True)) + except GPGProblem as e: + malformed = str(e) + + _add_signature_headers(original, sigs, malformed) + + +def _handle_encrypted(original, message, session_keys=None): + """Handle encrypted messages helper. + + RFC 3156 is quite strict: + * exactly two messages + * the first is of type 'application/pgp-encrypted' + * the first contains 'Version: 1' + * the second is of type 'application/octet-stream' + * the second contains the encrypted and possibly signed data + + :param original: The original top-level mail. This is required to attache + special headers to + :type original: :class:`email.message.Message` + :param message: The multipart/signed payload to verify + :type message: :class:`email.message.Message` + :param session_keys: a list OpenPGP session keys + :type session_keys: [str] + """ + malformed = False + + ct = message.get_payload(0).get_content_type() + if ct != _APP_PGP_ENC: + malformed = 'expected Content-Type: {0}, got: {1}'.format( + _APP_PGP_ENC, ct) + + want = 'application/octet-stream' + ct = message.get_payload(1).get_content_type() + if ct != want: + malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct) + + if not malformed: + # This should be safe because PGP uses US-ASCII characters only + payload = message.get_payload(1).get_payload().encode('ascii') + try: + sigs, d = crypto.decrypt_verify(payload, session_keys) + except GPGProblem as e: + # signature verification failures end up here too if the combined + # method is used, currently this prevents the interpretation of the + # recovered plain text mail. maybe that's a feature. + malformed = str(e) + else: + n = _decrypted_message_from_bytes(d, session_keys) + + # add the decrypted message to message. note that n contains all + # the attachments, no need to walk over n here. + original.attach(n) + + original.defects.extend(n.defects) + + # there are two methods for both signed and encrypted data, one is + # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the + # 'Combined method'. + if not sigs: + # 'RFC 1847 Encapsulation', the signature is a detached + # signature found in the recovered mime message of type + # multipart/signed. + if X_SIGNATURE_VALID_HEADER in n: + for k in (X_SIGNATURE_VALID_HEADER, + X_SIGNATURE_MESSAGE_HEADER): + original[k] = n[k] + else: + # 'Combined method', the signatures are returned by the + # decrypt_verify function. + + # note that if we reached this point, we know the signatures + # are valid. if they were not valid, the else block of the + # current try would not have been executed + _add_signature_headers(original, sigs, '') + + if malformed: + msg = 'Malformed OpenPGP message: {0}'.format(malformed) + content = email.message_from_string(msg, + _class=email.message.EmailMessage, + policy=email.policy.SMTP) + content.set_charset('utf-8') + original.attach(content) + +def _decrypted_message_from_bytes(bytestring, session_keys = None): + '''Detect and decrypt OpenPGP encrypted data in an email object. If this + succeeds, any mime messages found in the recovered plaintext + message are added to the returned message object. + + :param session_keys: a list OpenPGP session keys + :returns: :class:`email.message.Message` possibly augmented with + decrypted data + ''' + enc = email.message_from_bytes(bytestring, policy = email.policy.SMTP) + + # make sure no one smuggles a token in (data from enc is untrusted) + del enc[X_SIGNATURE_VALID_HEADER] + del enc[X_SIGNATURE_MESSAGE_HEADER] + + if enc.is_multipart(): + p = _get_params(enc) + + # handle OpenPGP signed data + if (enc.get_content_subtype() == 'signed' and + p.get('protocol') == _APP_PGP_SIG): + _handle_signatures(enc, enc, p) + + # handle OpenPGP encrypted data + elif (enc.get_content_subtype() == 'encrypted' and + p.get('protocol') == _APP_PGP_ENC and + 'Version: 1' in enc.get_payload(0).get_payload()): + _handle_encrypted(enc, enc, session_keys) + + # It is also possible to put either of the abov into a multipart/mixed + # segment + elif enc.get_content_subtype() == 'mixed': + sub = enc.get_payload(0) + + if sub.is_multipart(): + p = _get_params(sub) + + if (sub.get_content_subtype() == 'signed' and + p.get('protocol') == _APP_PGP_SIG): + _handle_signatures(enc, sub, p) + elif (sub.get_content_subtype() == 'encrypted' and + p.get('protocol') == _APP_PGP_ENC): + _handle_encrypted(enc, sub, session_keys) + + return enc + class _MessageHeaders: _msg = None @@ -147,8 +381,7 @@ class Message: "Message file is no longer accessible:\n%s" % self.filename try: with open(self.filename, 'rb') as f: - email = utils.decrypted_message_from_bytes( - f.read(), session_keys) + email = _decrypted_message_from_bytes(f.read(), session_keys) except IOError: email = email.message_from_string( warning, policy=email.policy.SMTP) |