summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDylan Baker <dylan@pnwbakers.com>2018-07-17 10:10:10 -0700
committerGitHub <noreply@github.com>2018-07-17 10:10:10 -0700
commit920043b3f7dc039c9e75aa62134cc662dfa94fda (patch)
tree0e81ae50580b86436f7f42c7f9b45d513679f295
parente4e82d3b81e50fc69fd4a929ff162f335b644ba0 (diff)
parentc51ad1c31b9ea0c9dd0b155b2fa1235eab8ddaa5 (diff)
Merge pull request #1266 from mjg/message-from-bytes
Message from bytes
-rw-r--r--alot/db/message.py2
-rw-r--r--alot/db/utils.py26
-rw-r--r--alot/helper.py32
-rwxr-xr-xsetup.py3
-rw-r--r--tests/db/utils_test.py46
5 files changed, 66 insertions, 43 deletions
diff --git a/alot/db/message.py b/alot/db/message.py
index 10115072..b53ec6c0 100644
--- a/alot/db/message.py
+++ b/alot/db/message.py
@@ -101,7 +101,7 @@ class Message(object):
if not self._email:
try:
with open(path, 'rb') as f:
- self._email = utils.message_from_bytes(f.read())
+ self._email = utils.decrypted_message_from_bytes(f.read())
except IOError:
self._email = email.message_from_string(warning)
return self._email
diff --git a/alot/db/utils.py b/alot/db/utils.py
index d7c56310..37ef4c48 100644
--- a/alot/db/utils.py
+++ b/alot/db/utils.py
@@ -179,7 +179,7 @@ def _handle_encrypted(original, message):
# recovered plain text mail. maybe that's a feature.
malformed = str(e)
else:
- n = message_from_bytes(d)
+ n = decrypted_message_from_bytes(d)
# add the decrypted message to message. note that n contains all
# the attachments, no need to walk over n here.
@@ -214,7 +214,7 @@ def _handle_encrypted(original, message):
original.attach(content)
-def message_from_file(handle):
+def decrypted_message_from_file(handle):
'''Reads a mail from the given file-like object and returns an email
object, very much like email.message_from_file. In addition to
that OpenPGP encrypted data is detected and decrypted. If this
@@ -225,8 +225,18 @@ def message_from_file(handle):
:returns: :class:`email.message.Message` possibly augmented with
decrypted data
'''
- m = email.message_from_file(handle)
+ return decrypted_message_from_message(email.message_from_file(handle))
+
+def decrypted_message_from_message(m):
+ '''Detect and decrypt OpenPGP encrypted data in an email object. If this
+ succeeds, any mime messages found in the recovered plaintext
+ message are added to the returned message object.
+
+ :param m: an email object
+ :returns: :class:`email.message.Message` possibly augmented with
+ decrypted data
+ '''
# make sure no one smuggles a token in (data from m is untrusted)
del m[X_SIGNATURE_VALID_HEADER]
del m[X_SIGNATURE_MESSAGE_HEADER]
@@ -263,7 +273,7 @@ def message_from_file(handle):
return m
-def message_from_string(s):
+def decrypted_message_from_string(s):
'''Reads a mail from the given string. This is the equivalent of
:func:`email.message_from_string` which does nothing but to wrap
the given string in a StringIO object and to call
@@ -273,17 +283,15 @@ def message_from_string(s):
details.
'''
- return message_from_file(io.StringIO(s))
+ return decrypted_message_from_file(io.StringIO(s))
-def message_from_bytes(bytestring):
+def decrypted_message_from_bytes(bytestring):
"""Create a Message from bytes.
- Attempt to guess the encoding of the bytestring.
-
:param bytes bytestring: an email message as raw bytes
"""
- return message_from_file(io.StringIO(helper.try_decode(bytestring)))
+ return decrypted_message_from_message(email.message_from_bytes(bytestring))
def extract_headers(mail, headers=None):
diff --git a/alot/helper.py b/alot/helper.py
index 70d375c8..d0c21a49 100644
--- a/alot/helper.py
+++ b/alot/helper.py
@@ -22,7 +22,6 @@ from email.mime.image import MIMEImage
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
-import chardet
import urwid
import magic
from twisted.internet import reactor
@@ -384,17 +383,34 @@ def guess_mimetype(blob):
def guess_encoding(blob):
- """Use chardet to guess the encoding of a given data blob
+ """
+ uses file magic to determine the encoding of the given data blob.
- :param blob: A blob of bytes
- :type blob: bytes
+ :param blob: file content as read by file.read()
+ :type blob: data
:returns: encoding
:rtype: str
"""
- info = chardet.detect(blob)
- logging.debug('Encoding %s with confidence %f',
- info['encoding'], info['confidence'])
- return info['encoding']
+ # this is a bit of a hack to support different versions of python magic.
+ # Hopefully at some point this will no longer be necessary
+ #
+ # the version with open() is the bindings shipped with the file source from
+ # http://darwinsys.com/file/ - this is what is used by the python-magic
+ # package on Debian/Ubuntu. However it is not available on pypi/via pip.
+ #
+ # the version with from_buffer() is available at
+ # https://github.com/ahupp/python-magic and directly installable via pip.
+ #
+ # for more detail see https://github.com/pazz/alot/pull/588
+ if hasattr(magic, 'open'):
+ m = magic.open(magic.MAGIC_MIME_ENCODING)
+ m.load()
+ return m.buffer(blob)
+ elif hasattr(magic, 'from_buffer'):
+ m = magic.Magic(mime_encoding=True)
+ return m.from_buffer(blob)
+ else:
+ raise Exception('Unknown magic API')
def try_decode(blob):
diff --git a/setup.py b/setup.py
index 538c1a0b..1909fee0 100755
--- a/setup.py
+++ b/setup.py
@@ -50,8 +50,7 @@ setup(
'twisted>=10.2.0',
'python-magic',
'configobj>=4.7.0',
- 'gpg',
- 'chardet',
+ 'gpg'
],
tests_require=[
'mock',
diff --git a/tests/db/utils_test.py b/tests/db/utils_test.py
index 9e411e3d..23885067 100644
--- a/tests/db/utils_test.py
+++ b/tests/db/utils_test.py
@@ -443,13 +443,13 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
m = email.message.Message()
m.add_header(utils.X_SIGNATURE_VALID_HEADER, 'Bad')
- message = utils.message_from_file(io.StringIO(m.as_string()))
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIs(message.get(utils.X_SIGNATURE_VALID_HEADER), None)
def test_erase_alot_header_message(self):
m = email.message.Message()
m.add_header(utils.X_SIGNATURE_MESSAGE_HEADER, 'Bad')
- message = utils.message_from_file(io.StringIO(m.as_string()))
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIs(message.get(utils.X_SIGNATURE_MESSAGE_HEADER), None)
def test_plain_mail(self):
@@ -457,7 +457,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
m['Subject'] = 'test'
m['From'] = 'me'
m['To'] = 'Nobody'
- message = utils.message_from_file(io.StringIO(m.as_string()))
+ message = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertEqual(message.get_payload(), 'This is some text')
def _make_signed(self):
@@ -476,20 +476,20 @@ class TestMessageFromFile(TestCaseClassCleanup):
def test_signed_headers_included(self):
"""Headers are added to the message."""
m = self._make_signed()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
def test_signed_valid(self):
"""Test that the signature is valid."""
m = self._make_signed()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertEqual(m[utils.X_SIGNATURE_VALID_HEADER], 'True')
def test_signed_correct_from(self):
"""Test that the signature is valid."""
m = self._make_signed()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
# Don't test for valid/invalid since that might change
self.assertIn(
'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER])
@@ -497,14 +497,14 @@ class TestMessageFromFile(TestCaseClassCleanup):
def test_signed_wrong_mimetype_second_payload(self):
m = self._make_signed()
m.get_payload(1).set_type('text/plain')
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('expected Content-Type: ',
m[utils.X_SIGNATURE_MESSAGE_HEADER])
def test_signed_wrong_micalg(self):
m = self._make_signed()
m.set_param('micalg', 'foo')
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('expected micalg=pgp-...',
m[utils.X_SIGNATURE_MESSAGE_HEADER])
@@ -526,7 +526,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
m = self._make_signed()
m.set_param('micalg', 'PGP-SHA1')
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('expected micalg=pgp-',
m[utils.X_SIGNATURE_MESSAGE_HEADER])
@@ -541,7 +541,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
m = self._make_signed()
m.attach(email.mime.text.MIMEText('foo'))
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('expected exactly two messages, got 3',
m[utils.X_SIGNATURE_MESSAGE_HEADER])
@@ -572,12 +572,12 @@ class TestMessageFromFile(TestCaseClassCleanup):
# of the mail, rather than replacing the whole encrypted payload with
# it's unencrypted equivalent
m = self._make_encrypted()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertEqual(len(m.get_payload()), 3)
def test_encrypted_unsigned_is_decrypted(self):
m = self._make_encrypted()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
# Check using m.walk, since we're not checking for ordering, just
# existence.
self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
@@ -587,13 +587,13 @@ class TestMessageFromFile(TestCaseClassCleanup):
that there is a signature.
"""
m = self._make_encrypted()
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m)
self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
def test_encrypted_signed_is_decrypted(self):
m = self._make_encrypted(True)
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
def test_encrypted_signed_headers(self):
@@ -601,7 +601,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
there is a signature.
"""
m = self._make_encrypted(True)
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
self.assertIn(
'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER])
@@ -611,14 +611,14 @@ class TestMessageFromFile(TestCaseClassCleanup):
def test_encrypted_wrong_mimetype_first_payload(self):
m = self._make_encrypted()
m.get_payload(0).set_type('text/plain')
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('Malformed OpenPGP message:',
m.get_payload(2).get_payload())
def test_encrypted_wrong_mimetype_second_payload(self):
m = self._make_encrypted()
m.get_payload(1).set_type('text/plain')
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('Malformed OpenPGP message:',
m.get_payload(2).get_payload())
@@ -628,7 +628,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
s = self._make_signed()
m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
@@ -638,7 +638,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
s = self._make_encrypted()
m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m)
self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
@@ -650,7 +650,7 @@ class TestMessageFromFile(TestCaseClassCleanup):
"""
s = self._make_encrypted(True)
m = email.mime.multipart.MIMEMultipart('mixed', None, [s])
- m = utils.message_from_file(io.StringIO(m.as_string()))
+ m = utils.decrypted_message_from_file(io.StringIO(m.as_string()))
self.assertIn('This is some text', [n.get_payload() for n in m.walk()])
self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m)
self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m)
@@ -781,10 +781,10 @@ class TestExtractBody(unittest.TestCase):
class TestMessageFromString(unittest.TestCase):
- """Tests for message_from_string.
+ """Tests for decrypted_message_from_string.
Because the implementation is that this is a wrapper around
- message_from_file, it's not important to have a large swath of tests, just
+ decrypted_message_from_file, it's not important to have a large swath of tests, just
enough to show that things are being passed correctly.
"""
@@ -793,5 +793,5 @@ class TestMessageFromString(unittest.TestCase):
m['Subject'] = 'test'
m['From'] = 'me'
m['To'] = 'Nobody'
- message = utils.message_from_string(m.as_string())
+ message = utils.decrypted_message_from_string(m.as_string())
self.assertEqual(message.get_payload(), 'This is some text')