summaryrefslogtreecommitdiff
path: root/tests/db/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/db/test_utils.py')
-rw-r--r--tests/db/test_utils.py774
1 files changed, 774 insertions, 0 deletions
diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py
new file mode 100644
index 00000000..7d54741f
--- /dev/null
+++ b/tests/db/test_utils.py
@@ -0,0 +1,774 @@
+# encoding: utf-8
+# Copyright (C) 2017 Lucas Hoffmann
+# Copyright © 2017 Dylan Baker
+# This file is released under the GNU GPL, version 3 or a later revision.
+# For further details see the COPYING file
+import base64
+import codecs
+import email
+import email.header
+import email.mime.application
+import email.policy
+import io
+import os
+import os.path
+import shutil
+import tempfile
+import unittest
+
+import gpg
+import mock
+
+from alot import crypto
+from alot.db import utils
+from alot.errors import GPGProblem
+from ..utilities import make_key, make_uid, TestCaseClassCleanup
+
+
+class TestGetParams(unittest.TestCase):
+
+ mailstring = '\n'.join([
+ 'From: me',
+ 'To: you',
+ 'Subject: header field capitalisation',
+ 'Content-type: text/plain; charset=utf-8',
+ 'X-Header: param=one; and=two; or=three',
+ "X-Quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C",
+ 'X-UPPERCASE: PARAM1=ONE; PARAM2=TWO'
+ '\n',
+ 'content'
+ ])
+ mail = email.message_from_string(mailstring)
+
+ def test_returns_content_type_parameters_by_default(self):
+ actual = utils.get_params(self.mail)
+ expected = {'text/plain': '', 'charset': 'utf-8'}
+ self.assertDictEqual(actual, expected)
+
+ def test_can_return_params_of_any_header_field(self):
+ actual = utils.get_params(self.mail, header='x-header')
+ expected = {'param': 'one', 'and': 'two', 'or': 'three'}
+ self.assertDictEqual(actual, expected)
+
+ @unittest.expectedFailure
+ def test_parameters_are_decoded(self):
+ actual = utils.get_params(self.mail, header='x-quoted')
+ expected = {'param': 'Ümlaut', 'second': 'plain%C3%9C'}
+ self.assertDictEqual(actual, expected)
+
+ def test_parameters_names_are_converted_to_lowercase(self):
+ actual = utils.get_params(self.mail, header='x-uppercase')
+ expected = {'param1': 'ONE', 'param2': 'TWO'}
+ self.assertDictEqual(actual, expected)
+
+ def test_returns_empty_dict_if_header_not_present(self):
+ actual = utils.get_params(self.mail, header='x-header-not-present')
+ self.assertDictEqual(actual, dict())
+
+ def test_returns_failobj_if_header_not_present(self):
+ failobj = [('my special failobj for the test', 'needs to be a pair!')]
+ actual = utils.get_params(self.mail, header='x-header-not-present',
+ failobj=failobj)
+ expected = dict(failobj)
+ self.assertEqual(actual, expected)
+
+
+class TestIsSubdirOf(unittest.TestCase):
+
+ def test_both_paths_absolute_matching(self):
+ superpath = '/a/b'
+ subpath = '/a/b/c/d.rst'
+ result = utils.is_subdir_of(subpath, superpath)
+ self.assertTrue(result)
+
+ def test_both_paths_absolute_not_matching(self):
+ superpath = '/a/z'
+ subpath = '/a/b/c/d.rst'
+ result = utils.is_subdir_of(subpath, superpath)
+ self.assertFalse(result)
+
+ def test_both_paths_relative_matching(self):
+ superpath = 'a/b'
+ subpath = 'a/b/c/d.rst'
+ result = utils.is_subdir_of(subpath, superpath)
+ self.assertTrue(result)
+
+ def test_both_paths_relative_not_matching(self):
+ superpath = 'a/z'
+ subpath = 'a/b/c/d.rst'
+ result = utils.is_subdir_of(subpath, superpath)
+ self.assertFalse(result)
+
+ def test_relative_path_and_absolute_path_matching(self):
+ superpath = 'a/b'
+ subpath = os.path.join(os.getcwd(), 'a/b/c/d.rst')
+ result = utils.is_subdir_of(subpath, superpath)
+ self.assertTrue(result)
+
+
+class TestExtractHeader(unittest.TestCase):
+
+ mailstring = '\n'.join([
+ 'From: me',
+ 'To: you',
+ 'Subject: header field capitalisation',
+ 'Content-type: text/plain; charset=utf-8',
+ 'X-Header: param=one; and=two; or=three',
+ "X-Quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C",
+ 'X-UPPERCASE: PARAM1=ONE; PARAM2=TWO'
+ '\n',
+ 'content'
+ ])
+ mail = email.message_from_string(mailstring)
+
+ def test_default_arguments_yield_all_headers(self):
+ actual = utils.extract_headers(self.mail)
+ # collect all lines until the first empty line, hence all header lines
+ expected = []
+ for line in self.mailstring.splitlines():
+ if not line:
+ break
+ expected.append(line)
+ expected = u'\n'.join(expected) + u'\n'
+ self.assertEqual(actual, expected)
+
+ def test_single_headers_can_be_retrieved(self):
+ actual = utils.extract_headers(self.mail, ['from'])
+ expected = u'from: me\n'
+ self.assertEqual(actual, expected)
+
+ def test_multible_headers_can_be_retrieved_in_predevined_order(self):
+ headers = ['x-header', 'to', 'x-uppercase']
+ actual = utils.extract_headers(self.mail, headers)
+ expected = u'x-header: param=one; and=two; or=three\nto: you\n' \
+ u'x-uppercase: PARAM1=ONE; PARAM2=TWO\n'
+ self.assertEqual(actual, expected)
+
+ def test_headers_can_be_retrieved_multible_times(self):
+ headers = ['from', 'from']
+ actual = utils.extract_headers(self.mail, headers)
+ expected = u'from: me\nfrom: me\n'
+ self.assertEqual(actual, expected)
+
+ def test_case_is_prserved_in_header_keys_but_irelevant(self):
+ headers = ['FROM', 'from']
+ actual = utils.extract_headers(self.mail, headers)
+ expected = u'FROM: me\nfrom: me\n'
+ self.assertEqual(actual, expected)
+
+ @unittest.expectedFailure
+ def test_header_values_are_not_decoded(self):
+ actual = utils.extract_headers(self.mail, ['x-quoted'])
+ expected = u"x-quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C\n",
+ self.assertEqual(actual, expected)
+
+
+class TestDecodeHeader(unittest.TestCase):
+
+ @staticmethod
+ def _quote(unicode_string, encoding):
+ """Turn a unicode string into a RFC2047 quoted ascii string
+
+ :param unicode_string: the string to encode
+ :type unicode_string: unicode
+ :param encoding: the encoding to use, 'utf-8', 'iso-8859-1', ...
+ :type encoding: str
+ :returns: the encoded string
+ :rtype: str
+ """
+ string = unicode_string.encode(encoding)
+ output = b'=?' + encoding.encode('ascii') + b'?Q?'
+ for byte in string:
+ output += b'=' + codecs.encode(bytes([byte]), 'hex').upper()
+ return (output + b'?=').decode('ascii')
+
+ @staticmethod
+ def _base64(unicode_string, encoding):
+ """Turn a unicode string into a RFC2047 base64 encoded ascii string
+
+ :param unicode_string: the string to encode
+ :type unicode_string: unicode
+ :param encoding: the encoding to use, 'utf-8', 'iso-8859-1', ...
+ :type encoding: str
+ :returns: the encoded string
+ :rtype: str
+ """
+ string = unicode_string.encode(encoding)
+ b64 = base64.encodebytes(string).strip()
+ result_bytes = b'=?' + encoding.encode('utf-8') + b'?B?' + b64 + b'?='
+ result = result_bytes.decode('ascii')
+ return result
+
+ def _test(self, teststring, expected):
+ actual = utils.decode_header(teststring)
+ self.assertEqual(actual, expected)
+
+ def test_non_ascii_strings_are_returned_as_unicode_directly(self):
+ text = u'Nön ÄSCII string¡'
+ self._test(text, text)
+
+ def test_basic_utf_8_quoted(self):
+ expected = u'ÄÖÜäöü'
+ text = self._quote(expected, 'utf-8')
+ self._test(text, expected)
+
+ def test_basic_iso_8859_1_quoted(self):
+ expected = u'ÄÖÜäöü'
+ text = self._quote(expected, 'iso-8859-1')
+ self._test(text, expected)
+
+ def test_basic_windows_1252_quoted(self):
+ expected = u'ÄÖÜäöü'
+ text = self._quote(expected, 'windows-1252')
+ self._test(text, expected)
+
+ def test_basic_utf_8_base64(self):
+ expected = u'ÄÖÜäöü'
+ text = self._base64(expected, 'utf-8')
+ self._test(text, expected)
+
+ def test_basic_iso_8859_1_base64(self):
+ expected = u'ÄÖÜäöü'
+ text = self._base64(expected, 'iso-8859-1')
+ self._test(text, expected)
+
+ def test_basic_iso_1252_base64(self):
+ expected = u'ÄÖÜäöü'
+ text = self._base64(expected, 'windows-1252')
+ self._test(text, expected)
+
+ def test_quoted_words_can_be_interrupted(self):
+ part = u'ÄÖÜäöü'
+ text = self._base64(part, 'utf-8') + ' and ' + \
+ self._quote(part, 'utf-8')
+ expected = u'ÄÖÜäöü and ÄÖÜäöü'
+ self._test(text, expected)
+
+ def test_different_encodings_can_be_mixed(self):
+ part = u'ÄÖÜäöü'
+ text = 'utf-8: ' + self._base64(part, 'utf-8') + \
+ ' again: ' + self._quote(part, 'utf-8') + \
+ ' latin1: ' + self._base64(part, 'iso-8859-1') + \
+ ' and ' + self._quote(part, 'iso-8859-1')
+ expected = (
+ u'utf-8: ÄÖÜäöü '
+ u'again: ÄÖÜäöü '
+ u'latin1: ÄÖÜäöü and ÄÖÜäöü'
+ )
+ self._test(text, expected)
+
+ def test_tabs_are_expanded_to_align_with_eigth_spaces(self):
+ text = 'tab: \t'
+ expected = u'tab: '
+ self._test(text, expected)
+
+ def test_newlines_are_not_touched_by_default(self):
+ text = 'first\nsecond\n third\n fourth'
+ expected = u'first\nsecond\n third\n fourth'
+ self._test(text, expected)
+
+ def test_continuation_newlines_can_be_normalized(self):
+ text = 'first\nsecond\n third\n\tfourth\n \t fifth'
+ expected = u'first\nsecond third fourth fifth'
+ actual = utils.decode_header(text, normalize=True)
+ self.assertEqual(actual, expected)
+
+
+class TestAddSignatureHeaders(unittest.TestCase):
+
+ class FakeMail(object):
+ def __init__(self):
+ self.headers = []
+
+ def add_header(self, header, value):
+ self.headers.append((header, value))
+
+ def check(self, key, valid, error_msg=u''):
+ mail = self.FakeMail()
+
+ with mock.patch('alot.db.utils.crypto.get_key',
+ mock.Mock(return_value=key)), \
+ mock.patch('alot.db.utils.crypto.check_uid_validity',
+ mock.Mock(return_value=valid)):
+ utils.add_signature_headers(mail, [mock.Mock(fpr='')], error_msg)
+
+ return mail
+
+ def test_length_0(self):
+ mail = self.FakeMail()
+ utils.add_signature_headers(mail, [], u'')
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'False'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Invalid: no signature found'),
+ mail.headers)
+
+ def test_valid(self):
+ key = make_key()
+ mail = self.check(key, True)
+
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'True'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Valid: mocked'), mail.headers)
+
+ def test_untrusted(self):
+ key = make_key()
+ mail = self.check(key, False)
+
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'True'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Untrusted: mocked'),
+ mail.headers)
+
+ def test_unicode_as_bytes(self):
+ mail = self.FakeMail()
+ key = make_key()
+ key.uids = [make_uid('andreá@example.com', uid=u'Andreá')]
+ mail = self.check(key, True)
+
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'True'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Valid: Andreá'),
+ mail.headers)
+
+ def test_error_message_unicode(self):
+ mail = self.check(mock.Mock(), mock.Mock(), u'error message')
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'False'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Invalid: error message'),
+ mail.headers)
+
+ def test_get_key_fails(self):
+ mail = self.FakeMail()
+ with mock.patch('alot.db.utils.crypto.get_key',
+ mock.Mock(side_effect=GPGProblem(u'', 0))):
+ utils.add_signature_headers(mail, [mock.Mock(fpr='')], u'')
+ self.assertIn((utils.X_SIGNATURE_VALID_HEADER, u'False'), mail.headers)
+ self.assertIn(
+ (utils.X_SIGNATURE_MESSAGE_HEADER, u'Untrusted: '),
+ mail.headers)
+
+
+class TestMessageFromFile(TestCaseClassCleanup):
+
+ @classmethod
+ def setUpClass(cls):
+ home = tempfile.mkdtemp()
+ cls.addClassCleanup(shutil.rmtree, home)
+ mock_home = mock.patch.dict(os.environ, {'GNUPGHOME': home})
+ mock_home.start()
+ cls.addClassCleanup(mock_home.stop)
+
+ with gpg.core.Context() as ctx:
+ search_dir = os.path.join(os.path.dirname(__file__),
+ '../static/gpg-keys')
+ for each in os.listdir(search_dir):
+ if os.path.splitext(each)[1] == '.gpg':
+ with open(os.path.join(search_dir, each)) as f:
+ ctx.op_import(f)
+
+ cls.keys = [
+ ctx.get_key("DD19862809A7573A74058FF255937AFBB156245D")]
+
+ def test_erase_alot_header_signature_valid(self):
+ """Alot uses special headers for passing certain kinds of information,
+ it's important that information isn't passed in from the original
+ message as a way to trick the user.
+ """
+ m = email.message.Message()
+ m.add_header(utils.X_SIGNATURE_VALID_HEADER, 'Bad')
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIs(message.get(utils.X_SIGNATURE_VALID_HEADER), None)
+
+ def test_erase_alot_header_message(self):
+ m = email.message.Message()
+ m.add_header(utils.X_SIGNATURE_MESSAGE_HEADER, 'Bad')
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIs(message.get(utils.X_SIGNATURE_MESSAGE_HEADER), None)
+
+ def test_plain_mail(self):
+ m = email.mime.text.MIMEText(u'This is some text', 'plain', 'utf-8')
+ m['Subject'] = 'test'
+ m['From'] = 'me'
+ m['To'] = 'Nobody'
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertEqual(message.get_payload(), 'This is some text')
+
+ def _make_signed(self):
+ """Create a signed message that is multipart/signed."""
+ text = b'This is some text'
+ t = email.mime.text.MIMEText(text, 'plain', 'utf-8')
+ _, sig = crypto.detached_signature_for(
+ t.as_bytes(policy=email.policy.SMTP), self.keys)
+ s = email.mime.application.MIMEApplication(
+ sig, 'pgp-signature', email.encoders.encode_7or8bit)
+ m = email.mime.multipart.MIMEMultipart('signed', None, [t, s])
+ m.set_param('protocol', 'application/pgp-signature')
+ m.set_param('micalg', 'pgp-sha256')
+ return m
+
+ def test_signed_headers_included(self):
+ """Headers are added to the message."""
+ m = self._make_signed()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
+ self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+
+ def test_signed_valid(self):
+ """Test that the signature is valid."""
+ m = self._make_signed()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertEqual(m[utils.X_SIGNATURE_VALID_HEADER], 'True')
+
+ def test_signed_correct_from(self):
+ """Test that the signature is valid."""
+ m = self._make_signed()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ # Don't test for valid/invalid since that might change
+ self.assertIn(
+ 'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ def test_signed_wrong_mimetype_second_payload(self):
+ m = self._make_signed()
+ m.get_payload(1).set_type('text/plain')
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('expected Content-Type: ',
+ m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ def test_signed_wrong_micalg(self):
+ m = self._make_signed()
+ m.set_param('micalg', 'foo')
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('expected micalg=pgp-...',
+ m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ def test_signed_micalg_cap(self):
+ """The micalg parameter should be normalized to lower case.
+
+ From RFC 3156 § 5
+
+ The "micalg" parameter for the "application/pgp-signature" protocol
+ MUST contain exactly one hash-symbol of the format "pgp-<hash-
+ identifier>", where <hash-identifier> identifies the Message
+ Integrity Check (MIC) algorithm used to generate the signature.
+ Hash-symbols are constructed from the text names registered in [1]
+ or according to the mechanism defined in that document by
+ converting the text name to lower case and prefixing it with the
+ four characters "pgp-".
+
+ The spec is pretty clear that this is supposed to be lower cased.
+ """
+ m = self._make_signed()
+ m.set_param('micalg', 'PGP-SHA1')
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('expected micalg=pgp-',
+ m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ def test_signed_more_than_two_messages(self):
+ """Per the spec only 2 payloads may be encapsulated inside the
+ multipart/signed payload, while it might be nice to cover more than 2
+ payloads (Postel's law), it would introduce serious complexity
+ since we would also need to cover those payloads being misordered.
+ Since getting the right number of payloads and getting them in the
+ right order should be fairly easy to implement correctly enforcing that
+ there are only two payloads seems reasonable.
+ """
+ m = self._make_signed()
+ m.attach(email.mime.text.MIMEText('foo'))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('expected exactly two messages, got 3',
+ m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ # TODO: The case of more than two payloads, or the payloads being out of
+ # order. Also for the encrypted case.
+
+ def _make_encrypted(self, signed=False):
+ """Create an encrypted (and optionally signed) message."""
+ if signed:
+ t = self._make_signed()
+ else:
+ text = b'This is some text'
+ t = email.mime.text.MIMEText(text, 'plain', 'utf-8')
+ enc = crypto.encrypt(t.as_bytes(policy=email.policy.SMTP), self.keys)
+ e = email.mime.application.MIMEApplication(
+ enc, 'octet-stream', email.encoders.encode_7or8bit)
+
+ f = email.mime.application.MIMEApplication(
+ b'Version: 1', 'pgp-encrypted', email.encoders.encode_7or8bit)
+
+ m = email.mime.multipart.MIMEMultipart('encrypted', None, [f, e])
+ m.set_param('protocol', 'application/pgp-encrypted')
+
+ return m
+
+ def test_encrypted_length(self):
+ # It seems string that we just attach the unsigned message to the end
+ # of the mail, rather than replacing the whole encrypted payload with
+ # it's unencrypted equivalent
+ m = self._make_encrypted()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertEqual(len(m.get_payload()), 3)
+
+ def test_encrypted_unsigned_is_decrypted(self):
+ m = self._make_encrypted()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ # Check using m.walk, since we're not checking for ordering, just
+ # existence.
+ self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
+
+ def test_encrypted_unsigned_doesnt_add_signed_headers(self):
+ """Since the message isn't signed, it shouldn't have headers saying
+ that there is a signature.
+ """
+ m = self._make_encrypted()
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m)
+ self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+
+ def test_encrypted_signed_is_decrypted(self):
+ m = self._make_encrypted(True)
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
+
+ def test_encrypted_signed_headers(self):
+ """Since the message is signed, it should have headers saying that
+ there is a signature.
+ """
+ m = self._make_encrypted(True)
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+ self.assertIn(
+ 'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER])
+
+ # TODO: tests for the RFC 2440 style combined signed/encrypted blob
+
+ def test_encrypted_wrong_mimetype_first_payload(self):
+ m = self._make_encrypted()
+ m.get_payload(0).set_type('text/plain')
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('Malformed OpenPGP message:',
+ m.get_payload(2).get_payload())
+
+ def test_encrypted_wrong_mimetype_second_payload(self):
+ m = self._make_encrypted()
+ m.get_payload(1).set_type('text/plain')
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('Malformed OpenPGP message:',
+ m.get_payload(2).get_payload())
+
+ def test_signed_in_multipart_mixed(self):
+ """It is valid to encapsulate a multipart/signed payload inside a
+ multipart/mixed payload, verify that works.
+ """
+ s = self._make_signed()
+ m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
+ self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+
+ def test_encrypted_unsigned_in_multipart_mixed(self):
+ """It is valid to encapsulate a multipart/encrypted payload inside a
+ multipart/mixed payload, verify that works.
+ """
+ s = self._make_encrypted()
+ m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
+ self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m)
+ self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+
+ def test_encrypted_signed_in_multipart_mixed(self):
+ """It is valid to encapsulate a multipart/encrypted payload inside a
+ multipart/mixed payload, verify that works when the multipart/encrypted
+ contains a multipart/signed.
+ """
+ s = self._make_encrypted(True)
+ m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
+ self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
+ self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
+ self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
+
+
+class TestExtractBody(unittest.TestCase):
+
+ @staticmethod
+ def _set_basic_headers(mail):
+ mail['Subject'] = 'Test email'
+ mail['To'] = 'foo@example.com'
+ mail['From'] = 'bar@example.com'
+
+ def test_single_text_plain(self):
+ mail = email.mime.text.MIMEText('This is an email')
+ self._set_basic_headers(mail)
+ actual = utils.extract_body(mail)
+
+ expected = 'This is an email'
+
+ self.assertEqual(actual, expected)
+
+ def test_two_text_plain(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText('This is an email'))
+ mail.attach(email.mime.text.MIMEText('This is a second part'))
+
+ actual = utils.extract_body(mail)
+ expected = 'This is an email\n\nThis is a second part'
+
+ self.assertEqual(actual, expected)
+
+ def test_text_plain_and_other(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText('This is an email'))
+ mail.attach(email.mime.application.MIMEApplication(b'1'))
+
+ actual = utils.extract_body(mail)
+ expected = 'This is an email'
+
+ self.assertEqual(actual, expected)
+
+ def test_text_plain_with_attachment_text(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText('This is an email'))
+ attachment = email.mime.text.MIMEText('this shouldnt be displayed')
+ attachment['Content-Disposition'] = 'attachment'
+ mail.attach(attachment)
+
+ actual = utils.extract_body(mail)
+ expected = 'This is an email'
+
+ self.assertEqual(actual, expected)
+
+ def _make_mixed_plain_html(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText('This is an email'))
+ mail.attach(email.mime.text.MIMEText(
+ '<!DOCTYPE html><html><body>This is an html email</body></html>',
+ 'html'))
+ return mail
+
+ @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=True))
+ def test_prefer_plaintext(self):
+ expected = 'This is an email'
+ mail = self._make_mixed_plain_html()
+ actual = utils.extract_body(mail)
+
+ self.assertEqual(actual, expected)
+
+ # Mock the handler to cat, so that no transformations of the html are made
+ # making the result non-deterministic
+ @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=False))
+ @mock.patch('alot.db.utils.settings.mailcap_find_match',
+ mock.Mock(return_value=(None, {'view': 'cat'})))
+ def test_prefer_html(self):
+ expected = (
+ '<!DOCTYPE html><html><body>This is an html email</body></html>')
+ mail = self._make_mixed_plain_html()
+ actual = utils.extract_body(mail)
+
+ self.assertEqual(actual, expected)
+
+ @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=False))
+ @mock.patch('alot.db.utils.settings.mailcap_find_match',
+ mock.Mock(return_value=(None, {'view': 'cat'})))
+ def test_types_provided(self):
+ # This should not return html, even though html is set to preferred
+ # since a types variable is passed
+ expected = 'This is an email'
+ mail = self._make_mixed_plain_html()
+ actual = utils.extract_body(mail, types=['text/plain'])
+
+ self.assertEqual(actual, expected)
+
+ @mock.patch('alot.db.utils.settings.mailcap_find_match',
+ mock.Mock(return_value=(None, {'view': 'cat'})))
+ def test_require_mailcap_stdin(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText(
+ '<!DOCTYPE html><html><body>This is an html email</body></html>',
+ 'html'))
+ actual = utils.extract_body(mail)
+ expected = (
+ '<!DOCTYPE html><html><body>This is an html email</body></html>')
+
+ self.assertEqual(actual, expected)
+
+ @mock.patch('alot.db.utils.settings.mailcap_find_match',
+ mock.Mock(return_value=(None, {'view': 'cat %s'})))
+ def test_require_mailcap_file(self):
+ mail = email.mime.multipart.MIMEMultipart()
+ self._set_basic_headers(mail)
+ mail.attach(email.mime.text.MIMEText(
+ '<!DOCTYPE html><html><body>This is an html email</body></html>',
+ 'html'))
+ actual = utils.extract_body(mail)
+ expected = (
+ '<!DOCTYPE html><html><body>This is an html email</body></html>')
+
+ self.assertEqual(actual, expected)
+
+ @unittest.expectedFailure
+ def test_simple_utf8_file(self):
+ mail = email.message_from_binary_file(
+ open('tests/static/mail/utf8.eml', 'rb'))
+ actual = utils.extract_body(mail)
+ expected = "Liebe Grüße!\n"
+ self.assertEqual(actual, expected)
+
+class TestMessageFromString(unittest.TestCase):
+
+ """Tests for decrypted_message_from_string.
+
+ Because the implementation is that this is a wrapper around
+ decrypted_message_from_file, it's not important to have a large swath of
+ tests, just enough to show that things are being passed correctly.
+ """
+
+ def test(self):
+ m = email.mime.text.MIMEText(u'This is some text', 'plain', 'utf-8')
+ m['Subject'] = 'test'
+ m['From'] = 'me'
+ m['To'] = 'Nobody'
+ message = utils.decrypted_message_from_string(m.as_string())
+ self.assertEqual(message.get_payload(), 'This is some text')
+
+
+class TestRemoveCte(unittest.TestCase):
+
+ def test_char_vs_cte_mismatch(self): # #1291
+ with open('tests/static/mail/broken-utf8.eml') as fp:
+ mail = email.message_from_file(fp)
+ # This should not raise an UnicodeDecodeError.
+ with self.assertLogs(level='DEBUG') as cm: # keep logs
+ utils.remove_cte(mail, as_string=True)
+ # We expect no Exceptions but a complaint in the log
+ logmsg = 'DEBUG:root:Decoding failure: \'utf-8\' codec can\'t decode '\
+ 'byte 0xa1 in position 14: invalid start byte'
+ self.assertIn(logmsg, cm.output)
+
+ def test_malformed_cte_value(self):
+ with open('tests/static/mail/malformed-header-CTE.eml') as fp:
+ mail = email.message_from_file(fp)
+
+ with self.assertLogs(level='INFO') as cm: # keep logs
+ utils.remove_cte(mail, as_string=True)
+
+ # We expect no Exceptions but a complaint in the log
+ logmsg = 'INFO:root:Unknown Content-Transfer-Encoding: "7bit;"'
+ self.assertEqual(cm.output, [logmsg])
+
+ def test_unknown_cte_value(self):
+ with open('tests/static/mail/malformed-header-CTE-2.eml') as fp:
+ mail = email.message_from_file(fp)
+
+ with self.assertLogs(level='DEBUG') as cm: # keep logs
+ utils.remove_cte(mail, as_string=True)
+
+ # We expect no Exceptions but a complaint in the log
+ logmsg = 'DEBUG:root:failed to interpret Content-Transfer-Encoding: '\
+ '"normal"'
+ self.assertIn(logmsg, cm.output)