summaryrefslogtreecommitdiff
path: root/alot/db
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2020-04-18 18:20:01 +0200
committerAnton Khirnov <anton@khirnov.net>2020-04-18 18:26:38 +0200
commit09eb3dd5f8465299e75fad5900614614e62bd537 (patch)
tree8dc4f6727394306233ca336cbfea5311d7cd75b2 /alot/db
parenta06b892704982f3bc65ee7d6176efa22c24a4fb4 (diff)
db/message: restructure message body handling
Instead of allowing the callers to access the email part directly, introduce a new class for representing the MIME tree structure. All interaction with the message content should now happen through this class (some instances of direct access still remain and will be removed later). Encrypted/signed parts are now also handled through this structure rather than using a fragile hack of attaching the decrypted message to the encrypted one and using fake headers to signal encryption/signatures. Message body rendering is now done by walking through the whole MIME tree and considering all the parts for rendering rather than picking one specific part.
Diffstat (limited to 'alot/db')
-rw-r--r--alot/db/message.py556
1 files changed, 234 insertions, 322 deletions
diff --git a/alot/db/message.py b/alot/db/message.py
index d9720167..1c7a9eb2 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -24,225 +24,22 @@ from ..settings.const import settings
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
-X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
-X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'
-
_APP_PGP_SIG = 'application/pgp-signature'
_APP_PGP_ENC = 'application/pgp-encrypted'
+_TEXT_PLAIN = 'text/plain'
-def _add_signature_headers(mail, sigs, error_msg):
- '''Add pseudo headers to the mail indicating whether the signature
- verification was successful.
-
- :param mail: :class:`email.message.Message` the message to entitle
- :param sigs: list of :class:`gpg.results.Signature`
- :param error_msg: An error message if there is one, or None
- :type error_msg: :class:`str` or `None`
- '''
- sig_from = ''
- sig_known = True
- uid_trusted = False
-
- assert error_msg is None or isinstance(error_msg, str)
-
- if not sigs:
- error_msg = error_msg or 'no signature found'
- elif not error_msg:
- try:
- key = crypto.get_key(sigs[0].fpr)
- for uid in key.uids:
- if crypto.check_uid_validity(key, uid.email):
- sig_from = uid.uid
- uid_trusted = True
- break
- else:
- # No trusted uid found, since we did not break from the loop.
- sig_from = key.uids[0].uid
- except GPGProblem:
- sig_from = sigs[0].fpr
- sig_known = False
-
- if error_msg:
- msg = 'Invalid: {}'.format(error_msg)
- elif uid_trusted:
- msg = 'Valid: {}'.format(sig_from)
- else:
- msg = 'Untrusted: {}'.format(sig_from)
-
- mail.add_header(X_SIGNATURE_VALID_HEADER,
- 'False' if (error_msg or not sig_known) else 'True')
- mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg)
-
-def _handle_signatures(original, message):
- """Shared code for handling message signatures.
-
- RFC 3156 is quite strict:
- * exactly two messages
- * the second is of type 'application/pgp-signature'
- * the second contains the detached signature
-
- :param original: The original top-level mail. This is required to attache
- special headers to
- :type original: :class:`email.message.Message`
- :param message: The multipart/signed payload to verify
- :type message: :class:`email.message.Message`
- """
- malformed = None
- payload = message.get_payload()
- if len(payload) != 2:
- malformed = 'expected exactly two messages, got {0}'.format(len(payload))
- else:
- ct = payload[1].get_content_type()
- if ct != _APP_PGP_SIG:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(
- _APP_PGP_SIG, ct)
-
- # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
- # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
- # is crypto stuff...
- micalg = message.get_param('micalg', '')
- if not micalg.startswith('pgp-'):
- malformed = 'expected micalg=pgp-..., got: {0}'.format(micalg)
-
- sigs = []
- if not malformed:
- try:
- sigs = crypto.verify_detached(
- payload[0].as_bytes(policy=email.policy.SMTP),
- payload[1].get_payload(decode=True))
- except GPGProblem as e:
- malformed = str(e)
-
- _add_signature_headers(original, sigs, malformed)
-
-
-def _handle_encrypted(original, message, session_keys=None):
- """Handle encrypted messages helper.
-
- RFC 3156 is quite strict:
- * exactly two messages
- * the first is of type 'application/pgp-encrypted'
- * the first contains 'Version: 1'
- * the second is of type 'application/octet-stream'
- * the second contains the encrypted and possibly signed data
-
- :param original: The original top-level mail. This is required to attache
- special headers to
- :type original: :class:`email.message.Message`
- :param message: The multipart/signed payload to verify
- :type message: :class:`email.message.Message`
- :param session_keys: a list OpenPGP session keys
- :type session_keys: [str]
- """
- malformed = False
-
- ct = message.get_payload(0).get_content_type()
- if ct != _APP_PGP_ENC:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(
- _APP_PGP_ENC, ct)
-
- want = 'application/octet-stream'
- ct = message.get_payload(1).get_content_type()
- if ct != want:
- malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct)
-
- if not malformed:
- # This should be safe because PGP uses US-ASCII characters only
- payload = message.get_payload(1).get_payload().encode('ascii')
- try:
- sigs, d = crypto.decrypt_verify(payload, session_keys)
- except GPGProblem as e:
- # signature verification failures end up here too if the combined
- # method is used, currently this prevents the interpretation of the
- # recovered plain text mail. maybe that's a feature.
- malformed = str(e)
- else:
- n = _decrypted_message_from_bytes(d, session_keys)
-
- # add the decrypted message to message. note that n contains all
- # the attachments, no need to walk over n here.
- original.attach(n)
-
- original.defects.extend(n.defects)
-
- # there are two methods for both signed and encrypted data, one is
- # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
- # 'Combined method'.
- if not sigs:
- # 'RFC 1847 Encapsulation', the signature is a detached
- # signature found in the recovered mime message of type
- # multipart/signed.
- if X_SIGNATURE_VALID_HEADER in n:
- for k in (X_SIGNATURE_VALID_HEADER,
- X_SIGNATURE_MESSAGE_HEADER):
- original[k] = n[k]
- else:
- # 'Combined method', the signatures are returned by the
- # decrypt_verify function.
-
- # note that if we reached this point, we know the signatures
- # are valid. if they were not valid, the else block of the
- # current try would not have been executed
- _add_signature_headers(original, sigs, '')
-
- if malformed:
- msg = 'Malformed OpenPGP message: {0}'.format(malformed)
- content = email.message_from_string(msg,
- _class=email.message.EmailMessage,
- policy=email.policy.SMTP)
- content.set_charset('utf-8')
- original.attach(content)
-
-def _decrypted_message_from_bytes(bytestring, session_keys = None):
- '''Detect and decrypt OpenPGP encrypted data in an email object. If this
- succeeds, any mime messages found in the recovered plaintext
- message are added to the returned message object.
-
- :param session_keys: a list OpenPGP session keys
- :returns: :class:`email.message.Message` possibly augmented with
- decrypted data
- '''
- enc = email.message_from_bytes(bytestring, policy = email.policy.SMTP)
-
- # make sure no one smuggles a token in (data from enc is untrusted)
- del enc[X_SIGNATURE_VALID_HEADER]
- del enc[X_SIGNATURE_MESSAGE_HEADER]
-
- if enc.is_multipart():
- # handle OpenPGP signed data
- if (enc.get_content_subtype() == 'signed' and
- enc.get_param('protocol') == _APP_PGP_SIG):
- _handle_signatures(enc, enc)
-
- # handle OpenPGP encrypted data
- elif (enc.get_content_subtype() == 'encrypted' and
- enc.get_param('protocol') == _APP_PGP_ENC and
- 'Version: 1' in enc.get_payload(0).get_payload()):
- _handle_encrypted(enc, enc, session_keys)
-
- # It is also possible to put either of the abov into a multipart/mixed
- # segment
- elif enc.get_content_subtype() == 'mixed':
- sub = enc.get_payload(0)
-
- if sub.is_multipart():
- if (sub.get_content_subtype() == 'signed' and
- sub.get_param('protocol') == _APP_PGP_SIG):
- _handle_signatures(enc, sub)
- elif (sub.get_content_subtype() == 'encrypted' and
- sub.get_param('protocol') == _APP_PGP_ENC):
- _handle_encrypted(enc, sub, session_keys)
-
- return enc
-
-def render_part(part, field_key='copiousoutput'):
+def _render_part(part, field_key='copiousoutput'):
"""
renders a non-multipart email part into displayable plaintext by piping its
payload through an external script. The handler itself is determined by
the mailcap entry for this part's ctype.
"""
ctype = part.get_content_type()
- raw_payload = remove_cte(part)
+ raw_payload = part.get_content()
+
+ if isinstance(raw_payload, str):
+ raw_payload = raw_payload.encode('utf-8')
+
rendered_payload = None
# get mime handler
_, entry = settings.mailcap_find_match(ctype, key=field_key)
@@ -284,85 +81,210 @@ def render_part(part, field_key='copiousoutput'):
return rendered_payload
-def remove_cte(part, as_string=False):
- """Interpret MIME-part according to it's Content-Transfer-Encodings.
+class _MimeTree:
+ _part = None
- This returns the payload of `part` as string or bytestring for display, or
- to be passed to an external program. In the raw file the payload may be
- encoded, e.g. in base64, quoted-printable, 7bit, or 8bit. This method will
- look for one of the above Content-Transfer-Encoding header and interpret
- the payload accordingly.
+ is_signed = False
+ is_encrypted = False
+ sig_valid = None
+ sig_trusted = None
+ signer_id = None
+ crypt_error = None
- Incorrect header values (common in spam messages) will be interpreted as
- lenient as possible and will result in INFO-level debug messages.
+ is_alternative = False
- ..Note:: All this may be depricated in favour of
- `email.contentmanager.raw_data_manager` (v3.6+)
+ children = None
+ attachment = None
- :param email.message.EmailMessage part: The part to decode
- :param bool as_string: If true return a str, otherwise return bytes
- :returns: The mail with any Content-Transfer-Encoding removed
- :rtype: Union[str, bytes]
- """
- payload = part.get_payload(decode = True)
- if as_string:
- enc = part.get_content_charset('ascii')
- if enc.startswith('windows-'):
- enc = enc.replace('windows-', 'cp', 1)
+ def __init__(self, part, session_keys = None):
+ self._part = part
+
+ if part.is_multipart():
+ st = part.get_content_subtype()
+
+ # handle signed/encrypted messages
+ if st == 'signed' and part.get_param('protocol') == _APP_PGP_SIG:
+ return self._handle_signed_pgp(session_keys)
+ elif st == 'encrypted':
+ return self._handle_encrypted(session_keys)
+
+ # for all other cases, we just put all the sub-parts into children
+ # multipart/alternative are flagged as such
+ # everything else is treated as multipart/mixed
+ if st == 'alternative':
+ self.is_alternative = True
+
+ children = []
+ for part in part.iter_parts():
+ children.append(_MimeTree(part, session_keys))
+ self.children = children
+ else:
+ cd = part.get_content_disposition()
+ fn = part.get_filename()
+ if cd == 'attachment' or fn is not None:
+ self.attachment = Attachment(part)
+
+ def render_str(self, alt_preference = None):
+ if self.children is not None:
+ if self.is_alternative and len(self.children) > 0:
+ child = None
+ for ch in self.children:
+ if ch.content_type == alt_preference:
+ child = ch
+ break
+ if child is None:
+ child = self.children[0]
+ return child.render_str(alt_preference)
+
+ parts = []
+ for ch in self.children:
+ part = ch.render_str(alt_preference)
+ if part:
+ parts.append(part)
+ return '\n'.join(parts)
+
+ content = self._part.get_content()
+
+ # no processing for plaintext
+ # XXX we may want to process plaintext as well
+ if self.content_type == _TEXT_PLAIN:
+ return content
+
+ return _render_part(self._part)
+
+ @property
+ def raw_data(self):
+ return self._part.as_bytes()
+
+ @property
+ def content_type(self):
+ return self._part.get_content_type()
+ @property
+ def content_maintype(self):
+ return self._part.get_content_maintype()
+ @property
+ def content_subtype(self):
+ return self._part.get_content_subtype()
+ def _handle_signed_pgp(self, session_keys):
+ """
+ Handle PGP-signed data.
+
+ RFC 3156 is quite strict:
+ * exactly two messages
+ * the second is of type 'application/pgp-signature'
+ * the second contains the detached signature
+ """
+ self.is_signed = True
+ payload = list(self._part.iter_parts())
+
+ if len(payload) != 2:
+ self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
+ return
+
+ self.children = [_MimeTree(payload[0], session_keys)]
+
+ ct = payload[1].get_content_type()
+ if ct != _APP_PGP_SIG:
+ self.crypt_error = 'expected Content-Type: {0}, got: {1}'.format(
+ _APP_PGP_SIG, ct)
+ return
+
+ # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
+ # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
+ # is crypto stuff...
+ micalg = self._part.get_param('micalg', '')
+ if not micalg.startswith('pgp-'):
+ self.crypt_error = 'expected micalg=pgp-..., got: {0}'.format(micalg)
+ return
+
+ part_data = payload[0].as_bytes()
+ sig_data = payload[1].get_content()
+
+ # verify the signature
+ sig = None
+ gpg_err = None
try:
- payload = payload.decode(enc, errors = 'backslashreplace')
- except LookupError:
- # enc is unknown;
- # fall back to guessing the correct encoding using libmagic
- payload = helper.try_decode(payload)
- except UnicodeDecodeError as emsg:
- # the mail contains chars that are not enc-encoded.
- # libmagic works better than just ignoring those
- logging.debug('Decoding failure: {}'.format(emsg))
- payload = helper.try_decode(payload)
-
- return payload
-
-MISSING_HTML_MSG = ("This message contains a text/html part that was not "
- "rendered due to a missing mailcap entry. "
- "Please refer to item 5 in our FAQ: "
- "http://alot.rtfd.io/en/latest/faq.html")
-
-def extract_body(mail):
- """Returns a string view of a Message.
-
- This consults :ref:`prefer_plaintext <prefer-plaintext>`
- to determine if a "text/plain" alternative is preferred over a "text/html"
- part.
-
- :param mail: the mail to use
- :type mail: :class:`email.message.EmailMessage`
- :returns: The combined text of any parts to be used
- :rtype: str
- """
+ sigs = crypto.verify_detached(part_data, sig_data)
+ self.sig_valid = True
+ if len(sigs) == 1:
+ sig = sigs[0]
+ else:
+ gpg_err = 'exactly one signature expected'
+ except GPGProblem as e:
+ self.sig_valid = False
+ gpg_err = str(e)
- if settings.get('prefer_plaintext'):
- preferencelist = ('plain', 'html')
- else:
- preferencelist = ('html', 'plain')
+ # get the signer
+ if sig is not None:
+ self.signer_id, self.sig_trusted = self._sig_check_key(sig.fpr)
- body_part = mail.get_body(preferencelist)
- if body_part is None: # if no part matching preferredlist was found
- return ""
+ if gpg_err:
+ self.crypt_error = gpg_err
- displaystring = ""
+ def _sig_check_key(self, fingerprint):
+ try:
+ key = crypto.get_key(fingerprint)
+ for uid in key.uids:
+ if crypto.check_uid_validity(key, uid.email):
+ return uid.uid, True
- if body_part.get_content_type() == 'text/plain':
- displaystring = string_sanitize(remove_cte(body_part, as_string=True))
- else:
- rendered_payload = render_part(body_part)
- if rendered_payload: # handler had output
- displaystring = string_sanitize(rendered_payload)
- else:
- if body_part.get_content_type() == 'text/html':
- displaystring = MISSING_HTML_MSG
- return displaystring
+ # No trusted uid found
+ sig_from = key.uids[0].uid
+ except GPGProblem:
+ sig_from = fingerprint
+
+ return sig_from, False
+
+    def _handle_encrypted(self, session_keys):
+        """
+        Handle encrypted messages.
+
+        RFC 3156 is quite strict:
+        * exactly two parts
+        * the first is of type 'application/pgp-encrypted'
+        * the first contains 'Version: 1' (not verified here)
+        * the second is of type 'application/octet-stream'
+        * the second contains the encrypted and possibly signed data
+        """
+        self.is_encrypted = True
+        payload = list(self._part.iter_parts())
+
+        if len(payload) != 2:
+            self.crypt_error = 'expected exactly two messages, got %d' % len(payload)
+            return
+
+        part_control = payload[0]
+        part_body = payload[1]
+
+        ct = part_control.get_content_type()
+        if ct != _APP_PGP_ENC:
+            self.crypt_error = 'expected Content-Type: %s, got: %s' % (_APP_PGP_ENC, ct)
+            return
+
+        want = 'application/octet-stream'
+        ct = part_body.get_content_type()
+        if ct != want:
+            self.crypt_error = 'expected Content-Type: %s, got: %s' % (want, ct)
+            return
+
+        payload = part_body.get_content()
+        try:
+            sigs, d = crypto.decrypt_verify(payload, session_keys)
+        except GPGProblem as e:
+            # signature verification failures end up here too if the combined
+            # method is used. sigs and d are unbound on this path, so record
+            # the error and stop rather than trying to parse the plaintext.
+            self.crypt_error = 'Failed to decrypt message: %s' % str(e)
+            return
+        child = email.message_from_bytes(d, policy = email.policy.SMTP)
+        self.children = [_MimeTree(child, session_keys)]
+
+        if sigs:
+            self.is_signed = True
+            self.sig_valid = True
+            if len(sigs) == 1:
+                self.signer_id, self.sig_trusted = self._sig_check_key(sigs[0].fpr)
class _MessageHeaders:
_msg = None
@@ -420,6 +342,11 @@ class Message:
"""
headers = None
+ """
+ A MimeTree object containing the body of the message.
+ """
+ body = None
+
def __init__(self, dbman, thread, msg, depth):
"""
:param dbman: db manager that is used for further lookups
@@ -461,8 +388,8 @@ class Message:
self._email = self._load_email(session_keys)
self.headers = _MessageHeaders(self._email)
+ self.body = _MimeTree(self._email, session_keys)
- self._attachments = None # will be read upon first use
self._tags = set(msg.get_tags())
sender = self._email.get('From')
@@ -502,7 +429,7 @@ class Message:
b"Message file is no longer accessible:\n%s" % self.filename
try:
with open(self.filename, 'rb') as f:
- mail = _decrypted_message_from_bytes(f.read(), session_keys)
+ mail = email.message_from_bytes(f.read(), policy = email.policy.SMTP)
except IOError:
mail = email.message_from_string(
warning, policy=email.policy.SMTP)
@@ -516,12 +443,6 @@ class Message:
"""returns :class:`email.email.EmailMessage` for this message"""
return self._email
- def get_message_parts(self):
- """yield all body parts of this message"""
- for msg in self._email.walk():
- if not msg.is_multipart():
- yield msg
-
def get_tags(self):
"""returns tags attached to this message as list of strings"""
return sorted(self._tags)
@@ -599,46 +520,37 @@ class Message:
self._dbman.untag('id:' + self.id, tags, myafterwards)
- def get_attachments(self):
+ def iter_attachments(self):
+ """
+ Iterate over all the attachments in this message.
"""
- returns messages attachments
+ def tree_walk(mime_tree):
+ if mime_tree.attachment:
+ yield mime_tree.attachment
- Derived from the leaves of the email mime tree
- that and are not part of :rfc:`2015` syntax for encrypted/signed mails
- and either have :mailheader:`Content-Disposition` `attachment`
- or have :mailheader:`Content-Disposition` `inline` but specify
- a filename (as parameter to `Content-Disposition`).
+ if mime_tree.children is not None:
+ for ch in mime_tree.children:
+ yield from tree_walk(ch)
- :rtype: list of :class:`Attachment`
- """
- if not self._attachments:
- self._attachments = []
- for part in self.get_message_parts():
- cd = part.get('Content-Disposition', '')
- filename = part.get_filename()
- ct = part.get_content_type()
- # replace underspecified mime description by a better guess
- if ct in ['octet/stream', 'application/octet-stream']:
- content = part.get_payload(decode=True)
- ct = helper.guess_mimetype(content)
- if (self._attachments and
- self._attachments[-1].get_content_type() ==
- 'application/pgp-encrypted'):
- self._attachments.pop()
-
- if cd.lower().startswith('attachment'):
- if ct.lower() not in ['application/pgp-signature']:
- self._attachments.append(Attachment(part))
- elif cd.lower().startswith('inline'):
- if (filename is not None and
- ct.lower() != 'application/pgp'):
- self._attachments.append(Attachment(part))
- return self._attachments
+ yield from tree_walk(self.body)
def get_body_text(self):
- """ returns bodystring extracted from this mail """
- # TODO: allow toggle commands to decide which part is considered body
- return extract_body(self._email)
+ """
+ Returns a string view of a Message.
+
+ This consults :ref:`prefer_plaintext <prefer-plaintext>`
+ to determine if a "text/plain" alternative is preferred over a "text/html"
+ part.
+
+ :returns: The combined text of any parts to be used
+ :rtype: str
+ """
+ if settings.get('prefer_plaintext'):
+ alt_preference = 'text/plain'
+ else:
+ alt_preference = 'text/html'
+
+ return self.body.render_str(alt_preference)
def matches(self, querystring):
"""tests if this messages is in the resultset for `querystring`"""