summaryrefslogtreecommitdiff
path: root/alot/db/message.py
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2020-03-05 08:05:29 +0100
committerAnton Khirnov <anton@khirnov.net>2020-03-05 08:05:29 +0100
commit618a235fa30a6a83aebd1f4bda526deb9e9b630a (patch)
tree90c353429daa995ec8f7cc88eba31457c1690d06 /alot/db/message.py
parentd98af2606e888f4c09c1deb13c6276556d896af5 (diff)
db/utils: move message decryption code into db/message
It is only called from there, so there is no reason to keep it elsewhere.
Diffstat (limited to 'alot/db/message.py')
-rw-r--r--alot/db/message.py237
1 files changed, 235 insertions, 2 deletions
diff --git a/alot/db/message.py b/alot/db/message.py
index 7b7187e0..566a848b 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -12,11 +12,245 @@ from notmuch import NullPointerError
from . import utils
from .utils import extract_body
from .attachment import Attachment
+from .. import crypto
from .. import helper
+from ..errors import GPGProblem
from ..settings.const import settings
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
+X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
+X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'
+
+_APP_PGP_SIG = 'application/pgp-signature'
+_APP_PGP_ENC = 'application/pgp-encrypted'
+
+def _add_signature_headers(mail, sigs, error_msg):
+ '''Add pseudo headers to the mail indicating whether the signature
+ verification was successful.
+
+ :param mail: :class:`email.message.Message` the message to entitle
+ :param sigs: list of :class:`gpg.results.Signature`
+ :param error_msg: An error message if there is one, or None
+ :type error_msg: :class:`str` or `None`
+ '''
+ sig_from = ''
+ sig_known = True
+ uid_trusted = False
+
+ assert error_msg is None or isinstance(error_msg, str)
+
+ if not sigs:
+ error_msg = error_msg or 'no signature found'
+ elif not error_msg:
+ try:
+ key = crypto.get_key(sigs[0].fpr)
+ for uid in key.uids:
+ if crypto.check_uid_validity(key, uid.email):
+ sig_from = uid.uid
+ uid_trusted = True
+ break
+ else:
+ # No trusted uid found, since we did not break from the loop.
+ sig_from = key.uids[0].uid
+ except GPGProblem:
+ sig_from = sigs[0].fpr
+ sig_known = False
+
+ if error_msg:
+ msg = 'Invalid: {}'.format(error_msg)
+ elif uid_trusted:
+ msg = 'Valid: {}'.format(sig_from)
+ else:
+ msg = 'Untrusted: {}'.format(sig_from)
+
+ mail.add_header(X_SIGNATURE_VALID_HEADER,
+ 'False' if (error_msg or not sig_known) else 'True')
+ mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg)
+
+def _get_params(mail, failobj=None, header='content-type', unquote=True):
+ '''Get Content-Type parameters as dict.
+
+ RFC 2045 specifies that parameter names are case-insensitive, so
+ we normalize them here.
+
+ :param mail: :class:`email.message.Message`
+ :param failobj: object to return if no such header is found
+ :param header: the header to search for parameters, default
+ :param unquote: unquote the values
+ :returns: a `dict` containing the parameters
+ '''
+ failobj = failobj or []
+ return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)}
+
+def _handle_signatures(original, message, params):
+ """Shared code for handling message signatures.
+
+ RFC 3156 is quite strict:
+ * exactly two messages
+ * the second is of type 'application/pgp-signature'
+ * the second contains the detached signature
+
+ :param original: The original top-level mail. This is required to attache
+ special headers to
+ :type original: :class:`email.message.Message`
+ :param message: The multipart/signed payload to verify
+ :type message: :class:`email.message.Message`
+ :param params: the message parameters as returned by :func:`get_params`
+ :type params: dict[str, str]
+ """
+ malformed = None
+ if len(message.get_payload()) != 2:
+ malformed = 'expected exactly two messages, got {0}'.format(
+ len(message.get_payload()))
+ else:
+ ct = message.get_payload(1).get_content_type()
+ if ct != _APP_PGP_SIG:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(
+ _APP_PGP_SIG, ct)
+
+ # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
+ # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
+ # is crypto stuff...
+ if not params.get('micalg', 'nothing').startswith('pgp-'):
+ malformed = 'expected micalg=pgp-..., got: {0}'.format(
+ params.get('micalg', 'nothing'))
+
+ sigs = []
+ if not malformed:
+ try:
+ sigs = crypto.verify_detached(
+ message.get_payload(0).as_bytes(policy=email.policy.SMTP),
+ message.get_payload(1).get_payload(decode=True))
+ except GPGProblem as e:
+ malformed = str(e)
+
+ _add_signature_headers(original, sigs, malformed)
+
+
+def _handle_encrypted(original, message, session_keys=None):
+ """Handle encrypted messages helper.
+
+ RFC 3156 is quite strict:
+ * exactly two messages
+ * the first is of type 'application/pgp-encrypted'
+ * the first contains 'Version: 1'
+ * the second is of type 'application/octet-stream'
+ * the second contains the encrypted and possibly signed data
+
+ :param original: The original top-level mail. This is required to attache
+ special headers to
+ :type original: :class:`email.message.Message`
+ :param message: The multipart/signed payload to verify
+ :type message: :class:`email.message.Message`
+ :param session_keys: a list OpenPGP session keys
+ :type session_keys: [str]
+ """
+ malformed = False
+
+ ct = message.get_payload(0).get_content_type()
+ if ct != _APP_PGP_ENC:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(
+ _APP_PGP_ENC, ct)
+
+ want = 'application/octet-stream'
+ ct = message.get_payload(1).get_content_type()
+ if ct != want:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct)
+
+ if not malformed:
+ # This should be safe because PGP uses US-ASCII characters only
+ payload = message.get_payload(1).get_payload().encode('ascii')
+ try:
+ sigs, d = crypto.decrypt_verify(payload, session_keys)
+ except GPGProblem as e:
+ # signature verification failures end up here too if the combined
+ # method is used, currently this prevents the interpretation of the
+ # recovered plain text mail. maybe that's a feature.
+ malformed = str(e)
+ else:
+ n = _decrypted_message_from_bytes(d, session_keys)
+
+ # add the decrypted message to message. note that n contains all
+ # the attachments, no need to walk over n here.
+ original.attach(n)
+
+ original.defects.extend(n.defects)
+
+ # there are two methods for both signed and encrypted data, one is
+ # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
+ # 'Combined method'.
+ if not sigs:
+ # 'RFC 1847 Encapsulation', the signature is a detached
+ # signature found in the recovered mime message of type
+ # multipart/signed.
+ if X_SIGNATURE_VALID_HEADER in n:
+ for k in (X_SIGNATURE_VALID_HEADER,
+ X_SIGNATURE_MESSAGE_HEADER):
+ original[k] = n[k]
+ else:
+ # 'Combined method', the signatures are returned by the
+ # decrypt_verify function.
+
+ # note that if we reached this point, we know the signatures
+ # are valid. if they were not valid, the else block of the
+ # current try would not have been executed
+ _add_signature_headers(original, sigs, '')
+
+ if malformed:
+ msg = 'Malformed OpenPGP message: {0}'.format(malformed)
+ content = email.message_from_string(msg,
+ _class=email.message.EmailMessage,
+ policy=email.policy.SMTP)
+ content.set_charset('utf-8')
+ original.attach(content)
+
+def _decrypted_message_from_bytes(bytestring, session_keys = None):
+ '''Detect and decrypt OpenPGP encrypted data in an email object. If this
+ succeeds, any mime messages found in the recovered plaintext
+ message are added to the returned message object.
+
+ :param session_keys: a list OpenPGP session keys
+ :returns: :class:`email.message.Message` possibly augmented with
+ decrypted data
+ '''
+ enc = email.message_from_bytes(bytestring, policy = email.policy.SMTP)
+
+ # make sure no one smuggles a token in (data from enc is untrusted)
+ del enc[X_SIGNATURE_VALID_HEADER]
+ del enc[X_SIGNATURE_MESSAGE_HEADER]
+
+ if enc.is_multipart():
+ p = _get_params(enc)
+
+ # handle OpenPGP signed data
+ if (enc.get_content_subtype() == 'signed' and
+ p.get('protocol') == _APP_PGP_SIG):
+ _handle_signatures(enc, enc, p)
+
+ # handle OpenPGP encrypted data
+ elif (enc.get_content_subtype() == 'encrypted' and
+ p.get('protocol') == _APP_PGP_ENC and
+ 'Version: 1' in enc.get_payload(0).get_payload()):
+ _handle_encrypted(enc, enc, session_keys)
+
+ # It is also possible to put either of the abov into a multipart/mixed
+ # segment
+ elif enc.get_content_subtype() == 'mixed':
+ sub = enc.get_payload(0)
+
+ if sub.is_multipart():
+ p = _get_params(sub)
+
+ if (sub.get_content_subtype() == 'signed' and
+ p.get('protocol') == _APP_PGP_SIG):
+ _handle_signatures(enc, sub, p)
+ elif (sub.get_content_subtype() == 'encrypted' and
+ p.get('protocol') == _APP_PGP_ENC):
+ _handle_encrypted(enc, sub, session_keys)
+
+ return enc
+
class _MessageHeaders:
_msg = None
@@ -147,8 +381,7 @@ class Message:
"Message file is no longer accessible:\n%s" % self.filename
try:
with open(self.filename, 'rb') as f:
- email = utils.decrypted_message_from_bytes(
- f.read(), session_keys)
+ email = _decrypted_message_from_bytes(f.read(), session_keys)
except IOError:
email = email.message_from_string(
warning, policy=email.policy.SMTP)