diff options
author | Anton Khirnov <anton@khirnov.net> | 2020-03-05 08:05:29 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2020-03-05 08:05:29 +0100 |
commit | 618a235fa30a6a83aebd1f4bda526deb9e9b630a (patch) | |
tree | 90c353429daa995ec8f7cc88eba31457c1690d06 /alot/db | |
parent | d98af2606e888f4c09c1deb13c6276556d896af5 (diff) |
db/utils: move message decryption code into db/message
It is only called from there, so there is no reason to keep it
elsewhere.
Diffstat (limited to 'alot/db')
-rw-r--r-- | alot/db/message.py | 237 | ||||
-rw-r--r-- | alot/db/utils.py | 285 |
2 files changed, 235 insertions, 287 deletions
diff --git a/alot/db/message.py b/alot/db/message.py index 7b7187e0..566a848b 100644 --- a/alot/db/message.py +++ b/alot/db/message.py @@ -12,11 +12,245 @@ from notmuch import NullPointerError from . import utils from .utils import extract_body from .attachment import Attachment +from .. import crypto from .. import helper +from ..errors import GPGProblem from ..settings.const import settings charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') +X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid' +X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message' + +_APP_PGP_SIG = 'application/pgp-signature' +_APP_PGP_ENC = 'application/pgp-encrypted' + +def _add_signature_headers(mail, sigs, error_msg): + '''Add pseudo headers to the mail indicating whether the signature + verification was successful. + + :param mail: :class:`email.message.Message` the message to entitle + :param sigs: list of :class:`gpg.results.Signature` + :param error_msg: An error message if there is one, or None + :type error_msg: :class:`str` or `None` + ''' + sig_from = '' + sig_known = True + uid_trusted = False + + assert error_msg is None or isinstance(error_msg, str) + + if not sigs: + error_msg = error_msg or 'no signature found' + elif not error_msg: + try: + key = crypto.get_key(sigs[0].fpr) + for uid in key.uids: + if crypto.check_uid_validity(key, uid.email): + sig_from = uid.uid + uid_trusted = True + break + else: + # No trusted uid found, since we did not break from the loop. + sig_from = key.uids[0].uid + except GPGProblem: + sig_from = sigs[0].fpr + sig_known = False + + if error_msg: + msg = 'Invalid: {}'.format(error_msg) + elif uid_trusted: + msg = 'Valid: {}'.format(sig_from) + else: + msg = 'Untrusted: {}'.format(sig_from) + + mail.add_header(X_SIGNATURE_VALID_HEADER, + 'False' if (error_msg or not sig_known) else 'True') + mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg) + +def _get_params(mail, failobj=None, header='content-type', unquote=True): + '''Get Content-Type parameters as dict. + + RFC 2045 specifies that parameter names are case-insensitive, so + we normalize them here. + + :param mail: :class:`email.message.Message` + :param failobj: object to return if no such header is found + :param header: the header to search for parameters, default + :param unquote: unquote the values + :returns: a `dict` containing the parameters + ''' + failobj = failobj or [] + return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)} + +def _handle_signatures(original, message, params): + """Shared code for handling message signatures. + + RFC 3156 is quite strict: + * exactly two messages + * the second is of type 'application/pgp-signature' + * the second contains the detached signature + + :param original: The original top-level mail. This is required to attache + special headers to + :type original: :class:`email.message.Message` + :param message: The multipart/signed payload to verify + :type message: :class:`email.message.Message` + :param params: the message parameters as returned by :func:`get_params` + :type params: dict[str, str] + """ + malformed = None + if len(message.get_payload()) != 2: + malformed = 'expected exactly two messages, got {0}'.format( + len(message.get_payload())) + else: + ct = message.get_payload(1).get_content_type() + if ct != _APP_PGP_SIG: + malformed = 'expected Content-Type: {0}, got: {1}'.format( + _APP_PGP_SIG, ct) + + # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message + # with 'PGP-'. maybe we should be more permissive here, or maybe not, this + # is crypto stuff... + if not params.get('micalg', 'nothing').startswith('pgp-'): + malformed = 'expected micalg=pgp-..., got: {0}'.format( + params.get('micalg', 'nothing')) + + sigs = [] + if not malformed: + try: + sigs = crypto.verify_detached( + message.get_payload(0).as_bytes(policy=email.policy.SMTP), + message.get_payload(1).get_payload(decode=True)) + except GPGProblem as e: + malformed = str(e) + + _add_signature_headers(original, sigs, malformed) + + +def _handle_encrypted(original, message, session_keys=None): + """Handle encrypted messages helper. + + RFC 3156 is quite strict: + * exactly two messages + * the first is of type 'application/pgp-encrypted' + * the first contains 'Version: 1' + * the second is of type 'application/octet-stream' + * the second contains the encrypted and possibly signed data + + :param original: The original top-level mail. This is required to attache + special headers to + :type original: :class:`email.message.Message` + :param message: The multipart/signed payload to verify + :type message: :class:`email.message.Message` + :param session_keys: a list OpenPGP session keys + :type session_keys: [str] + """ + malformed = False + + ct = message.get_payload(0).get_content_type() + if ct != _APP_PGP_ENC: + malformed = 'expected Content-Type: {0}, got: {1}'.format( + _APP_PGP_ENC, ct) + + want = 'application/octet-stream' + ct = message.get_payload(1).get_content_type() + if ct != want: + malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct) + + if not malformed: + # This should be safe because PGP uses US-ASCII characters only + payload = message.get_payload(1).get_payload().encode('ascii') + try: + sigs, d = crypto.decrypt_verify(payload, session_keys) + except GPGProblem as e: + # signature verification failures end up here too if the combined + # method is used, currently this prevents the interpretation of the + # recovered plain text mail. maybe that's a feature. + malformed = str(e) + else: + n = _decrypted_message_from_bytes(d, session_keys) + + # add the decrypted message to message. note that n contains all + # the attachments, no need to walk over n here. + original.attach(n) + + original.defects.extend(n.defects) + + # there are two methods for both signed and encrypted data, one is + # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the + # 'Combined method'. + if not sigs: + # 'RFC 1847 Encapsulation', the signature is a detached + # signature found in the recovered mime message of type + # multipart/signed. + if X_SIGNATURE_VALID_HEADER in n: + for k in (X_SIGNATURE_VALID_HEADER, + X_SIGNATURE_MESSAGE_HEADER): + original[k] = n[k] + else: + # 'Combined method', the signatures are returned by the + # decrypt_verify function. + + # note that if we reached this point, we know the signatures + # are valid. if they were not valid, the else block of the + # current try would not have been executed + _add_signature_headers(original, sigs, '') + + if malformed: + msg = 'Malformed OpenPGP message: {0}'.format(malformed) + content = email.message_from_string(msg, + _class=email.message.EmailMessage, + policy=email.policy.SMTP) + content.set_charset('utf-8') + original.attach(content) + +def _decrypted_message_from_bytes(bytestring, session_keys = None): + '''Detect and decrypt OpenPGP encrypted data in an email object. If this + succeeds, any mime messages found in the recovered plaintext + message are added to the returned message object. + + :param session_keys: a list OpenPGP session keys + :returns: :class:`email.message.Message` possibly augmented with + decrypted data + ''' + enc = email.message_from_bytes(bytestring, policy = email.policy.SMTP) + + # make sure no one smuggles a token in (data from enc is untrusted) + del enc[X_SIGNATURE_VALID_HEADER] + del enc[X_SIGNATURE_MESSAGE_HEADER] + + if enc.is_multipart(): + p = _get_params(enc) + + # handle OpenPGP signed data + if (enc.get_content_subtype() == 'signed' and + p.get('protocol') == _APP_PGP_SIG): + _handle_signatures(enc, enc, p) + + # handle OpenPGP encrypted data + elif (enc.get_content_subtype() == 'encrypted' and + p.get('protocol') == _APP_PGP_ENC and + 'Version: 1' in enc.get_payload(0).get_payload()): + _handle_encrypted(enc, enc, session_keys) + + # It is also possible to put either of the abov into a multipart/mixed + # segment + elif enc.get_content_subtype() == 'mixed': + sub = enc.get_payload(0) + + if sub.is_multipart(): + p = _get_params(sub) + + if (sub.get_content_subtype() == 'signed' and + p.get('protocol') == _APP_PGP_SIG): + _handle_signatures(enc, sub, p) + elif (sub.get_content_subtype() == 'encrypted' and + p.get('protocol') == _APP_PGP_ENC): + _handle_encrypted(enc, sub, session_keys) + + return enc + class _MessageHeaders: _msg = None @@ -147,8 +381,7 @@ class Message: "Message file is no longer accessible:\n%s" % self.filename try: with open(self.filename, 'rb') as f: - email = utils.decrypted_message_from_bytes( - f.read(), session_keys) + email = _decrypted_message_from_bytes(f.read(), session_keys) except IOError: email = email.message_from_string( warning, policy=email.policy.SMTP) diff --git a/alot/db/utils.py b/alot/db/utils.py index a81c5cc4..020f51d0 100644 --- a/alot/db/utils.py +++ b/alot/db/utils.py @@ -9,302 +9,17 @@ import email.charset as charset import email.policy import email.utils import tempfile -import re import logging import mailcap -import io -import base64 -import quopri -from .. import crypto from .. import helper -from ..errors import GPGProblem from ..settings.const import settings from ..helper import string_sanitize -from ..helper import string_decode from ..helper import parse_mailcap_nametemplate from ..helper import split_commandstring charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') -X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid' -X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message' - -_APP_PGP_SIG = 'application/pgp-signature' -_APP_PGP_ENC = 'application/pgp-encrypted' - - -def add_signature_headers(mail, sigs, error_msg): - '''Add pseudo headers to the mail indicating whether the signature - verification was successful. - - :param mail: :class:`email.message.Message` the message to entitle - :param sigs: list of :class:`gpg.results.Signature` - :param error_msg: An error message if there is one, or None - :type error_msg: :class:`str` or `None` - ''' - sig_from = '' - sig_known = True - uid_trusted = False - - assert error_msg is None or isinstance(error_msg, str) - - if not sigs: - error_msg = error_msg or 'no signature found' - elif not error_msg: - try: - key = crypto.get_key(sigs[0].fpr) - for uid in key.uids: - if crypto.check_uid_validity(key, uid.email): - sig_from = uid.uid - uid_trusted = True - break - else: - # No trusted uid found, since we did not break from the loop. - sig_from = key.uids[0].uid - except GPGProblem: - sig_from = sigs[0].fpr - sig_known = False - - if error_msg: - msg = 'Invalid: {}'.format(error_msg) - elif uid_trusted: - msg = 'Valid: {}'.format(sig_from) - else: - msg = 'Untrusted: {}'.format(sig_from) - - mail.add_header(X_SIGNATURE_VALID_HEADER, - 'False' if (error_msg or not sig_known) else 'True') - mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg) - - -def get_params(mail, failobj=None, header='content-type', unquote=True): - '''Get Content-Type parameters as dict. - - RFC 2045 specifies that parameter names are case-insensitive, so - we normalize them here. - - :param mail: :class:`email.message.Message` - :param failobj: object to return if no such header is found - :param header: the header to search for parameters, default - :param unquote: unquote the values - :returns: a `dict` containing the parameters - ''' - failobj = failobj or [] - return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)} - - -def _handle_signatures(original, message, params): - """Shared code for handling message signatures. - - RFC 3156 is quite strict: - * exactly two messages - * the second is of type 'application/pgp-signature' - * the second contains the detached signature - - :param original: The original top-level mail. This is required to attache - special headers to - :type original: :class:`email.message.Message` - :param message: The multipart/signed payload to verify - :type message: :class:`email.message.Message` - :param params: the message parameters as returned by :func:`get_params` - :type params: dict[str, str] - """ - malformed = None - if len(message.get_payload()) != 2: - malformed = 'expected exactly two messages, got {0}'.format( - len(message.get_payload())) - else: - ct = message.get_payload(1).get_content_type() - if ct != _APP_PGP_SIG: - malformed = 'expected Content-Type: {0}, got: {1}'.format( - _APP_PGP_SIG, ct) - - # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message - # with 'PGP-'. maybe we should be more permissive here, or maybe not, this - # is crypto stuff... - if not params.get('micalg', 'nothing').startswith('pgp-'): - malformed = 'expected micalg=pgp-..., got: {0}'.format( - params.get('micalg', 'nothing')) - - sigs = [] - if not malformed: - try: - sigs = crypto.verify_detached( - message.get_payload(0).as_bytes(policy=email.policy.SMTP), - message.get_payload(1).get_payload(decode=True)) - except GPGProblem as e: - malformed = str(e) - - add_signature_headers(original, sigs, malformed) - - -def _handle_encrypted(original, message, session_keys=None): - """Handle encrypted messages helper. - - RFC 3156 is quite strict: - * exactly two messages - * the first is of type 'application/pgp-encrypted' - * the first contains 'Version: 1' - * the second is of type 'application/octet-stream' - * the second contains the encrypted and possibly signed data - - :param original: The original top-level mail. This is required to attache - special headers to - :type original: :class:`email.message.Message` - :param message: The multipart/signed payload to verify - :type message: :class:`email.message.Message` - :param session_keys: a list OpenPGP session keys - :type session_keys: [str] - """ - malformed = False - - ct = message.get_payload(0).get_content_type() - if ct != _APP_PGP_ENC: - malformed = 'expected Content-Type: {0}, got: {1}'.format( - _APP_PGP_ENC, ct) - - want = 'application/octet-stream' - ct = message.get_payload(1).get_content_type() - if ct != want: - malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct) - - if not malformed: - # This should be safe because PGP uses US-ASCII characters only - payload = message.get_payload(1).get_payload().encode('ascii') - try: - sigs, d = crypto.decrypt_verify(payload, session_keys) - except GPGProblem as e: - # signature verification failures end up here too if the combined - # method is used, currently this prevents the interpretation of the - # recovered plain text mail. maybe that's a feature. - malformed = str(e) - else: - n = decrypted_message_from_bytes(d, session_keys) - - # add the decrypted message to message. note that n contains all - # the attachments, no need to walk over n here. - original.attach(n) - - original.defects.extend(n.defects) - - # there are two methods for both signed and encrypted data, one is - # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the - # 'Combined method'. - if not sigs: - # 'RFC 1847 Encapsulation', the signature is a detached - # signature found in the recovered mime message of type - # multipart/signed. - if X_SIGNATURE_VALID_HEADER in n: - for k in (X_SIGNATURE_VALID_HEADER, - X_SIGNATURE_MESSAGE_HEADER): - original[k] = n[k] - else: - # 'Combined method', the signatures are returned by the - # decrypt_verify function. - - # note that if we reached this point, we know the signatures - # are valid. if they were not valid, the else block of the - # current try would not have been executed - add_signature_headers(original, sigs, '') - - if malformed: - msg = 'Malformed OpenPGP message: {0}'.format(malformed) - content = email.message_from_string(msg, - _class=email.message.EmailMessage, - policy=email.policy.SMTP) - content.set_charset('utf-8') - original.attach(content) - - -def decrypted_message_from_file(handle, session_keys=None): - '''Reads a mail from the given file-like object and returns an email - object, very much like email.message_from_file. In addition to - that OpenPGP encrypted data is detected and decrypted. If this - succeeds, any mime messages found in the recovered plaintext - message are added to the returned message object. - - :param handle: a file-like object - :param session_keys: a list OpenPGP session keys - :returns: :class:`email.message.Message` possibly augmented with - decrypted data - ''' - return decrypted_message_from_message(email.message_from_file(handle, - _class=email.message.EmailMessage), - session_keys) - - -def decrypted_message_from_message(m, session_keys=None): - '''Detect and decrypt OpenPGP encrypted data in an email object. If this - succeeds, any mime messages found in the recovered plaintext - message are added to the returned message object. - - :param m: an email object - :param session_keys: a list OpenPGP session keys - :returns: :class:`email.message.Message` possibly augmented with - decrypted data - ''' - # make sure no one smuggles a token in (data from m is untrusted) - del m[X_SIGNATURE_VALID_HEADER] - del m[X_SIGNATURE_MESSAGE_HEADER] - - if m.is_multipart(): - p = get_params(m) - - # handle OpenPGP signed data - if (m.get_content_subtype() == 'signed' and - p.get('protocol') == _APP_PGP_SIG): - _handle_signatures(m, m, p) - - # handle OpenPGP encrypted data - elif (m.get_content_subtype() == 'encrypted' and - p.get('protocol') == _APP_PGP_ENC and - 'Version: 1' in m.get_payload(0).get_payload()): - _handle_encrypted(m, m, session_keys) - - # It is also possible to put either of the abov into a multipart/mixed - # segment - elif m.get_content_subtype() == 'mixed': - sub = m.get_payload(0) - - if sub.is_multipart(): - p = get_params(sub) - - if (sub.get_content_subtype() == 'signed' and - p.get('protocol') == _APP_PGP_SIG): - _handle_signatures(m, sub, p) - elif (sub.get_content_subtype() == 'encrypted' and - p.get('protocol') == _APP_PGP_ENC): - _handle_encrypted(m, sub, session_keys) - - return m - - -def decrypted_message_from_string(s, session_keys=None): - '''Reads a mail from the given string. This is the equivalent of - :func:`email.message_from_string` which does nothing but to wrap - the given string in a StringIO object and to call - :func:`email.message_from_file`. - - Please refer to the documentation of :func:`message_from_file` for - details. - - ''' - return decrypted_message_from_file(io.StringIO(s), session_keys) - - -def decrypted_message_from_bytes(bytestring, session_keys=None): - """Create a Message from bytes. - - :param bytes bytestring: an email message as raw bytes - :param session_keys: a list OpenPGP session keys - """ - return decrypted_message_from_message( - email.message_from_bytes(bytestring, - _class=email.message.EmailMessage, - policy=email.policy.SMTP), - session_keys) - def render_part(part, field_key='copiousoutput'): """ renders a non-multipart email part into displayable plaintext by piping its |