summaryrefslogtreecommitdiff
path: root/alot/db
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2020-03-05 08:05:29 +0100
committerAnton Khirnov <anton@khirnov.net>2020-03-05 08:05:29 +0100
commit618a235fa30a6a83aebd1f4bda526deb9e9b630a (patch)
tree90c353429daa995ec8f7cc88eba31457c1690d06 /alot/db
parentd98af2606e888f4c09c1deb13c6276556d896af5 (diff)
db/utils: move message decryption code into db/message
It is only called from there, so there is no reason to keep it elsewhere.
Diffstat (limited to 'alot/db')
-rw-r--r--alot/db/message.py237
-rw-r--r--alot/db/utils.py285
2 files changed, 235 insertions, 287 deletions
diff --git a/alot/db/message.py b/alot/db/message.py
index 7b7187e0..566a848b 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -12,11 +12,245 @@ from notmuch import NullPointerError
from . import utils
from .utils import extract_body
from .attachment import Attachment
+from .. import crypto
from .. import helper
+from ..errors import GPGProblem
from ..settings.const import settings
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
+X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
+X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'
+
+_APP_PGP_SIG = 'application/pgp-signature'
+_APP_PGP_ENC = 'application/pgp-encrypted'
+
+def _add_signature_headers(mail, sigs, error_msg):
+ '''Add pseudo headers to the mail indicating whether the signature
+ verification was successful.
+
+ :param mail: :class:`email.message.Message` the message to entitle
+ :param sigs: list of :class:`gpg.results.Signature`
+ :param error_msg: An error message if there is one, or None
+ :type error_msg: :class:`str` or `None`
+ '''
+ sig_from = ''
+ sig_known = True
+ uid_trusted = False
+
+ assert error_msg is None or isinstance(error_msg, str)
+
+ if not sigs:
+ error_msg = error_msg or 'no signature found'
+ elif not error_msg:
+ try:
+ key = crypto.get_key(sigs[0].fpr)
+ for uid in key.uids:
+ if crypto.check_uid_validity(key, uid.email):
+ sig_from = uid.uid
+ uid_trusted = True
+ break
+ else:
+ # No trusted uid found, since we did not break from the loop.
+ sig_from = key.uids[0].uid
+ except GPGProblem:
+ sig_from = sigs[0].fpr
+ sig_known = False
+
+ if error_msg:
+ msg = 'Invalid: {}'.format(error_msg)
+ elif uid_trusted:
+ msg = 'Valid: {}'.format(sig_from)
+ else:
+ msg = 'Untrusted: {}'.format(sig_from)
+
+ mail.add_header(X_SIGNATURE_VALID_HEADER,
+ 'False' if (error_msg or not sig_known) else 'True')
+ mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg)
+
+def _get_params(mail, failobj=None, header='content-type', unquote=True):
+ '''Get Content-Type parameters as dict.
+
+ RFC 2045 specifies that parameter names are case-insensitive, so
+ we normalize them here.
+
+ :param mail: :class:`email.message.Message`
+ :param failobj: object to return if no such header is found
+ :param header: the header to search for parameters, default
+ :param unquote: unquote the values
+ :returns: a `dict` containing the parameters
+ '''
+ failobj = failobj or []
+ return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)}
+
+def _handle_signatures(original, message, params):
+ """Shared code for handling message signatures.
+
+ RFC 3156 is quite strict:
+ * exactly two messages
+ * the second is of type 'application/pgp-signature'
+ * the second contains the detached signature
+
+ :param original: The original top-level mail. This is required to attache
+ special headers to
+ :type original: :class:`email.message.Message`
+ :param message: The multipart/signed payload to verify
+ :type message: :class:`email.message.Message`
+ :param params: the message parameters as returned by :func:`get_params`
+ :type params: dict[str, str]
+ """
+ malformed = None
+ if len(message.get_payload()) != 2:
+ malformed = 'expected exactly two messages, got {0}'.format(
+ len(message.get_payload()))
+ else:
+ ct = message.get_payload(1).get_content_type()
+ if ct != _APP_PGP_SIG:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(
+ _APP_PGP_SIG, ct)
+
+ # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
+ # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
+ # is crypto stuff...
+ if not params.get('micalg', 'nothing').startswith('pgp-'):
+ malformed = 'expected micalg=pgp-..., got: {0}'.format(
+ params.get('micalg', 'nothing'))
+
+ sigs = []
+ if not malformed:
+ try:
+ sigs = crypto.verify_detached(
+ message.get_payload(0).as_bytes(policy=email.policy.SMTP),
+ message.get_payload(1).get_payload(decode=True))
+ except GPGProblem as e:
+ malformed = str(e)
+
+ _add_signature_headers(original, sigs, malformed)
+
+
+def _handle_encrypted(original, message, session_keys=None):
+ """Handle encrypted messages helper.
+
+ RFC 3156 is quite strict:
+ * exactly two messages
+ * the first is of type 'application/pgp-encrypted'
+ * the first contains 'Version: 1'
+ * the second is of type 'application/octet-stream'
+ * the second contains the encrypted and possibly signed data
+
+ :param original: The original top-level mail. This is required to attache
+ special headers to
+ :type original: :class:`email.message.Message`
+ :param message: The multipart/signed payload to verify
+ :type message: :class:`email.message.Message`
+ :param session_keys: a list OpenPGP session keys
+ :type session_keys: [str]
+ """
+ malformed = False
+
+ ct = message.get_payload(0).get_content_type()
+ if ct != _APP_PGP_ENC:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(
+ _APP_PGP_ENC, ct)
+
+ want = 'application/octet-stream'
+ ct = message.get_payload(1).get_content_type()
+ if ct != want:
+ malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct)
+
+ if not malformed:
+ # This should be safe because PGP uses US-ASCII characters only
+ payload = message.get_payload(1).get_payload().encode('ascii')
+ try:
+ sigs, d = crypto.decrypt_verify(payload, session_keys)
+ except GPGProblem as e:
+ # signature verification failures end up here too if the combined
+ # method is used, currently this prevents the interpretation of the
+ # recovered plain text mail. maybe that's a feature.
+ malformed = str(e)
+ else:
+ n = _decrypted_message_from_bytes(d, session_keys)
+
+ # add the decrypted message to message. note that n contains all
+ # the attachments, no need to walk over n here.
+ original.attach(n)
+
+ original.defects.extend(n.defects)
+
+ # there are two methods for both signed and encrypted data, one is
+ # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
+ # 'Combined method'.
+ if not sigs:
+ # 'RFC 1847 Encapsulation', the signature is a detached
+ # signature found in the recovered mime message of type
+ # multipart/signed.
+ if X_SIGNATURE_VALID_HEADER in n:
+ for k in (X_SIGNATURE_VALID_HEADER,
+ X_SIGNATURE_MESSAGE_HEADER):
+ original[k] = n[k]
+ else:
+ # 'Combined method', the signatures are returned by the
+ # decrypt_verify function.
+
+ # note that if we reached this point, we know the signatures
+ # are valid. if they were not valid, the else block of the
+ # current try would not have been executed
+ _add_signature_headers(original, sigs, '')
+
+ if malformed:
+ msg = 'Malformed OpenPGP message: {0}'.format(malformed)
+ content = email.message_from_string(msg,
+ _class=email.message.EmailMessage,
+ policy=email.policy.SMTP)
+ content.set_charset('utf-8')
+ original.attach(content)
+
+def _decrypted_message_from_bytes(bytestring, session_keys = None):
+ '''Detect and decrypt OpenPGP encrypted data in an email object. If this
+ succeeds, any mime messages found in the recovered plaintext
+ message are added to the returned message object.
+
+ :param session_keys: a list OpenPGP session keys
+ :returns: :class:`email.message.Message` possibly augmented with
+ decrypted data
+ '''
+ enc = email.message_from_bytes(bytestring, policy = email.policy.SMTP)
+
+ # make sure no one smuggles a token in (data from enc is untrusted)
+ del enc[X_SIGNATURE_VALID_HEADER]
+ del enc[X_SIGNATURE_MESSAGE_HEADER]
+
+ if enc.is_multipart():
+ p = _get_params(enc)
+
+ # handle OpenPGP signed data
+ if (enc.get_content_subtype() == 'signed' and
+ p.get('protocol') == _APP_PGP_SIG):
+ _handle_signatures(enc, enc, p)
+
+ # handle OpenPGP encrypted data
+ elif (enc.get_content_subtype() == 'encrypted' and
+ p.get('protocol') == _APP_PGP_ENC and
+ 'Version: 1' in enc.get_payload(0).get_payload()):
+ _handle_encrypted(enc, enc, session_keys)
+
+ # It is also possible to put either of the abov into a multipart/mixed
+ # segment
+ elif enc.get_content_subtype() == 'mixed':
+ sub = enc.get_payload(0)
+
+ if sub.is_multipart():
+ p = _get_params(sub)
+
+ if (sub.get_content_subtype() == 'signed' and
+ p.get('protocol') == _APP_PGP_SIG):
+ _handle_signatures(enc, sub, p)
+ elif (sub.get_content_subtype() == 'encrypted' and
+ p.get('protocol') == _APP_PGP_ENC):
+ _handle_encrypted(enc, sub, session_keys)
+
+ return enc
+
class _MessageHeaders:
_msg = None
@@ -147,8 +381,7 @@ class Message:
"Message file is no longer accessible:\n%s" % self.filename
try:
with open(self.filename, 'rb') as f:
- email = utils.decrypted_message_from_bytes(
- f.read(), session_keys)
+ email = _decrypted_message_from_bytes(f.read(), session_keys)
except IOError:
email = email.message_from_string(
warning, policy=email.policy.SMTP)
diff --git a/alot/db/utils.py b/alot/db/utils.py
index a81c5cc4..020f51d0 100644
--- a/alot/db/utils.py
+++ b/alot/db/utils.py
@@ -9,302 +9,17 @@ import email.charset as charset
import email.policy
import email.utils
import tempfile
-import re
import logging
import mailcap
-import io
-import base64
-import quopri
-from .. import crypto
from .. import helper
-from ..errors import GPGProblem
from ..settings.const import settings
from ..helper import string_sanitize
-from ..helper import string_decode
from ..helper import parse_mailcap_nametemplate
from ..helper import split_commandstring
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
-X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
-X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'
-
-_APP_PGP_SIG = 'application/pgp-signature'
-_APP_PGP_ENC = 'application/pgp-encrypted'
-
-
-def add_signature_headers(mail, sigs, error_msg):
- '''Add pseudo headers to the mail indicating whether the signature
- verification was successful.
-
- :param mail: :class:`email.message.Message` the message to entitle
- :param sigs: list of :class:`gpg.results.Signature`
- :param error_msg: An error message if there is one, or None
- :type error_msg: :class:`str` or `None`
- '''
- sig_from = ''
- sig_known = True
- uid_trusted = False
-
- assert error_msg is None or isinstance(error_msg, str)
-
- if not sigs:
- error_msg = error_msg or 'no signature found'
- elif not error_msg:
- try:
- key = crypto.get_key(sigs[0].fpr)
- for uid in key.uids:
- if crypto.check_uid_validity(key, uid.email):
- sig_from = uid.uid
- uid_trusted = True
- break
- else:
- # No trusted uid found, since we did not break from the loop.
- sig_from = key.uids[0].uid
- except GPGProblem:
- sig_from = sigs[0].fpr
- sig_known = False
-
- if error_msg:
- msg = 'Invalid: {}'.format(error_msg)
- elif uid_trusted:
- msg = 'Valid: {}'.format(sig_from)
- else:
- msg = 'Untrusted: {}'.format(sig_from)
-
- mail.add_header(X_SIGNATURE_VALID_HEADER,
- 'False' if (error_msg or not sig_known) else 'True')
- mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg)
-
-
-def get_params(mail, failobj=None, header='content-type', unquote=True):
- '''Get Content-Type parameters as dict.
-
- RFC 2045 specifies that parameter names are case-insensitive, so
- we normalize them here.
-
- :param mail: :class:`email.message.Message`
- :param failobj: object to return if no such header is found
- :param header: the header to search for parameters, default
- :param unquote: unquote the values
- :returns: a `dict` containing the parameters
- '''
- failobj = failobj or []
- return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)}
-
-
-def _handle_signatures(original, message, params):
- """Shared code for handling message signatures.
-
- RFC 3156 is quite strict:
- * exactly two messages
- * the second is of type 'application/pgp-signature'
- * the second contains the detached signature
-
- :param original: The original top-level mail. This is required to attache
- special headers to
- :type original: :class:`email.message.Message`
- :param message: The multipart/signed payload to verify
- :type message: :class:`email.message.Message`
- :param params: the message parameters as returned by :func:`get_params`
- :type params: dict[str, str]
- """
- malformed = None
- if len(message.get_payload()) != 2:
- malformed = 'expected exactly two messages, got {0}'.format(
- len(message.get_payload()))
- else:
- ct = message.get_payload(1).get_content_type()
- if ct != _APP_PGP_SIG:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(
- _APP_PGP_SIG, ct)
-
- # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
- # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
- # is crypto stuff...
- if not params.get('micalg', 'nothing').startswith('pgp-'):
- malformed = 'expected micalg=pgp-..., got: {0}'.format(
- params.get('micalg', 'nothing'))
-
- sigs = []
- if not malformed:
- try:
- sigs = crypto.verify_detached(
- message.get_payload(0).as_bytes(policy=email.policy.SMTP),
- message.get_payload(1).get_payload(decode=True))
- except GPGProblem as e:
- malformed = str(e)
-
- add_signature_headers(original, sigs, malformed)
-
-
-def _handle_encrypted(original, message, session_keys=None):
- """Handle encrypted messages helper.
-
- RFC 3156 is quite strict:
- * exactly two messages
- * the first is of type 'application/pgp-encrypted'
- * the first contains 'Version: 1'
- * the second is of type 'application/octet-stream'
- * the second contains the encrypted and possibly signed data
-
- :param original: The original top-level mail. This is required to attache
- special headers to
- :type original: :class:`email.message.Message`
- :param message: The multipart/signed payload to verify
- :type message: :class:`email.message.Message`
- :param session_keys: a list OpenPGP session keys
- :type session_keys: [str]
- """
- malformed = False
-
- ct = message.get_payload(0).get_content_type()
- if ct != _APP_PGP_ENC:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(
- _APP_PGP_ENC, ct)
-
- want = 'application/octet-stream'
- ct = message.get_payload(1).get_content_type()
- if ct != want:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct)
-
- if not malformed:
- # This should be safe because PGP uses US-ASCII characters only
- payload = message.get_payload(1).get_payload().encode('ascii')
- try:
- sigs, d = crypto.decrypt_verify(payload, session_keys)
- except GPGProblem as e:
- # signature verification failures end up here too if the combined
- # method is used, currently this prevents the interpretation of the
- # recovered plain text mail. maybe that's a feature.
- malformed = str(e)
- else:
- n = decrypted_message_from_bytes(d, session_keys)
-
- # add the decrypted message to message. note that n contains all
- # the attachments, no need to walk over n here.
- original.attach(n)
-
- original.defects.extend(n.defects)
-
- # there are two methods for both signed and encrypted data, one is
- # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
- # 'Combined method'.
- if not sigs:
- # 'RFC 1847 Encapsulation', the signature is a detached
- # signature found in the recovered mime message of type
- # multipart/signed.
- if X_SIGNATURE_VALID_HEADER in n:
- for k in (X_SIGNATURE_VALID_HEADER,
- X_SIGNATURE_MESSAGE_HEADER):
- original[k] = n[k]
- else:
- # 'Combined method', the signatures are returned by the
- # decrypt_verify function.
-
- # note that if we reached this point, we know the signatures
- # are valid. if they were not valid, the else block of the
- # current try would not have been executed
- add_signature_headers(original, sigs, '')
-
- if malformed:
- msg = 'Malformed OpenPGP message: {0}'.format(malformed)
- content = email.message_from_string(msg,
- _class=email.message.EmailMessage,
- policy=email.policy.SMTP)
- content.set_charset('utf-8')
- original.attach(content)
-
-
-def decrypted_message_from_file(handle, session_keys=None):
- '''Reads a mail from the given file-like object and returns an email
- object, very much like email.message_from_file. In addition to
- that OpenPGP encrypted data is detected and decrypted. If this
- succeeds, any mime messages found in the recovered plaintext
- message are added to the returned message object.
-
- :param handle: a file-like object
- :param session_keys: a list OpenPGP session keys
- :returns: :class:`email.message.Message` possibly augmented with
- decrypted data
- '''
- return decrypted_message_from_message(email.message_from_file(handle,
- _class=email.message.EmailMessage),
- session_keys)
-
-
-def decrypted_message_from_message(m, session_keys=None):
- '''Detect and decrypt OpenPGP encrypted data in an email object. If this
- succeeds, any mime messages found in the recovered plaintext
- message are added to the returned message object.
-
- :param m: an email object
- :param session_keys: a list OpenPGP session keys
- :returns: :class:`email.message.Message` possibly augmented with
- decrypted data
- '''
- # make sure no one smuggles a token in (data from m is untrusted)
- del m[X_SIGNATURE_VALID_HEADER]
- del m[X_SIGNATURE_MESSAGE_HEADER]
-
- if m.is_multipart():
- p = get_params(m)
-
- # handle OpenPGP signed data
- if (m.get_content_subtype() == 'signed' and
- p.get('protocol') == _APP_PGP_SIG):
- _handle_signatures(m, m, p)
-
- # handle OpenPGP encrypted data
- elif (m.get_content_subtype() == 'encrypted' and
- p.get('protocol') == _APP_PGP_ENC and
- 'Version: 1' in m.get_payload(0).get_payload()):
- _handle_encrypted(m, m, session_keys)
-
- # It is also possible to put either of the abov into a multipart/mixed
- # segment
- elif m.get_content_subtype() == 'mixed':
- sub = m.get_payload(0)
-
- if sub.is_multipart():
- p = get_params(sub)
-
- if (sub.get_content_subtype() == 'signed' and
- p.get('protocol') == _APP_PGP_SIG):
- _handle_signatures(m, sub, p)
- elif (sub.get_content_subtype() == 'encrypted' and
- p.get('protocol') == _APP_PGP_ENC):
- _handle_encrypted(m, sub, session_keys)
-
- return m
-
-
-def decrypted_message_from_string(s, session_keys=None):
- '''Reads a mail from the given string. This is the equivalent of
- :func:`email.message_from_string` which does nothing but to wrap
- the given string in a StringIO object and to call
- :func:`email.message_from_file`.
-
- Please refer to the documentation of :func:`message_from_file` for
- details.
-
- '''
- return decrypted_message_from_file(io.StringIO(s), session_keys)
-
-
-def decrypted_message_from_bytes(bytestring, session_keys=None):
- """Create a Message from bytes.
-
- :param bytes bytestring: an email message as raw bytes
- :param session_keys: a list OpenPGP session keys
- """
- return decrypted_message_from_message(
- email.message_from_bytes(bytestring,
- _class=email.message.EmailMessage,
- policy=email.policy.SMTP),
- session_keys)
-
def render_part(part, field_key='copiousoutput'):
"""
renders a non-multipart email part into displayable plaintext by piping its