From cd35ec5f89cff3ba8c7780209efa7e8b0628744d Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Sat, 30 Jan 2021 14:12:33 +0100 Subject: db/envelope: move to a new module "mail" It has nothing to do with the database. --- alot/commands/envelope.py | 2 +- alot/commands/globals.py | 4 +- alot/commands/thread.py | 2 +- alot/db/envelope.py | 494 ---------------------------------------- alot/mail/envelope.py | 494 ++++++++++++++++++++++++++++++++++++++++ tests/commands/test_envelope.py | 2 +- tests/db/test_envelope.py | 113 --------- tests/mail/test_envelope.py | 113 +++++++++ 8 files changed, 612 insertions(+), 612 deletions(-) delete mode 100644 alot/db/envelope.py create mode 100644 alot/mail/envelope.py delete mode 100644 tests/db/test_envelope.py create mode 100644 tests/mail/test_envelope.py diff --git a/alot/commands/envelope.py b/alot/commands/envelope.py index 47e7463e..ad6edc3f 100644 --- a/alot/commands/envelope.py +++ b/alot/commands/envelope.py @@ -313,7 +313,7 @@ class EditCommand(Command): def __init__(self, envelope=None, spawn=None, refocus=True, **kwargs): """ :param envelope: email to edit - :type envelope: :class:`~alot.db.envelope.Envelope` + :type envelope: :class:`~alot.mail.envelope.Envelope` :param spawn: force spawning of editor in a new terminal :type spawn: bool :param refocus: m diff --git a/alot/commands/globals.py b/alot/commands/globals.py index b95d37b9..65e8973b 100644 --- a/alot/commands/globals.py +++ b/alot/commands/globals.py @@ -29,7 +29,7 @@ from ..completion.contacts import ContactsCompleter from ..completion.accounts import AccountCompleter from ..completion.tags import TagsCompleter from ..widgets.utils import DialogBox -from ..db.envelope import Envelope +from ..mail.envelope import Envelope from ..settings.const import settings from ..settings.errors import ConfigError, NoMatchingAccount from ..utils import argparse as cargparse @@ -673,7 +673,7 @@ class ComposeCommand(Command): **kwargs): """ :param envelope: use existing envelope - :type envelope: :class:`~alot.db.envelope.Envelope` + :type envelope: :class:`~alot.mail.envelope.Envelope` :param headers: forced header values :type headers: dict (str->str) :param template: name of template to parse into the envelope after diff --git a/alot/commands/thread.py b/alot/commands/thread.py index ff70289b..0a13b868 100644 --- a/alot/commands/thread.py +++ b/alot/commands/thread.py @@ -27,7 +27,7 @@ from .common import RetagPromptCommand from .envelope import SendCommand from ..completion.contacts import ContactsCompleter from ..completion.path import PathCompleter -from ..db.envelope import Envelope +from ..mail.envelope import Envelope from ..db.attachment import Attachment from ..db.errors import DatabaseROError from ..settings.const import settings diff --git a/alot/db/envelope.py b/alot/db/envelope.py deleted file mode 100644 index f9909308..00000000 --- a/alot/db/envelope.py +++ /dev/null @@ -1,494 +0,0 @@ -# Copyright (C) 2011-2012 Patrick Totzke -# This file is released under the GNU GPL, version 3 or a later revision. -# For further details see the COPYING file -import glob -import logging -import os -import re -import email -import email.policy -from email.encoders import encode_7or8bit -from email.message import MIMEPart -from email.mime.audio import MIMEAudio -from email.mime.base import MIMEBase -from email.mime.image import MIMEImage -from email.mime.text import MIMEText -import email.charset as charset -from urllib.parse import unquote - -import gpg -import magic - -from .attachment import Attachment -from .. import __version__ -from .. import helper -from .. import crypto -from ..settings.const import settings -from ..errors import GPGProblem, GPGCode - -charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') - -def _libmagic_version_at_least(version): - """ - checks if the libmagic library installed is more recent than a given - version. - - :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514) - with XYY >= 513 - """ - if hasattr(magic, 'open'): - magic_wrapper = magic._libraries['magic'] - elif hasattr(magic, 'from_buffer'): - magic_wrapper = magic.libmagic - else: - raise Exception('Unknown magic API') - - if not hasattr(magic_wrapper, 'magic_version'): - # The magic_version function has been introduced in libmagic 5.13, - # if it's not present, we can't guess right, so let's assume False - return False - - # Depending on the libmagic/ctypes version, magic_version is a function or - # a callable: - if callable(magic_wrapper.magic_version): - return magic_wrapper.magic_version() >= version - - return magic_wrapper.magic_version >= version - -def _guess_encoding(blob): - """ - uses file magic to determine the encoding of the given data blob. - - :param blob: file content as read by file.read() - :type blob: data - :returns: encoding - :rtype: str - """ - # this is a bit of a hack to support different versions of python magic. - # Hopefully at some point this will no longer be necessary - # - # the version with open() is the bindings shipped with the file source from - # http://darwinsys.com/file/ - this is what is used by the python-magic - # package on Debian/Ubuntu. However it is not available on pypi/via pip. - # - # the version with from_buffer() is available at - # https://github.com/ahupp/python-magic and directly installable via pip. - # - # for more detail see https://github.com/pazz/alot/pull/588 - if hasattr(magic, 'open'): - m = magic.open(magic.MAGIC_MIME_ENCODING) - m.load() - return m.buffer(blob) - elif hasattr(magic, 'from_buffer'): - m = magic.Magic(mime_encoding=True) - return m.from_buffer(blob) - else: - raise Exception('Unknown magic API') - -# TODO: make this work on blobs, not paths -def _mimewrap(path, filename, ctype): - """Take the contents of the given path and wrap them into an email MIME - part according to the content type. The content type is auto detected from - the actual file contents and the file name if it is not given. - - :param path: the path to the file contents - :type path: str - :param filename: the file name to use in the generated MIME part - :type filename: str or None - :param ctype: the content type of the file contents in path - :type ctype: str or None - :returns: the message MIME part storing the data from path - :rtype: subclasses of email.mime.base.MIMEBase - """ - - with open(path, 'rb') as f: - content = f.read() - if not ctype: - ctype = helper.guess_mimetype(content) - # libmagic < 5.12 incorrectly detects excel/powerpoint files as - # 'application/msword' (see #179 and #186 in libmagic bugtracker) - # This is a workaround, based on file extension, useful as long - # as distributions still ship libmagic 5.11. - if (ctype == 'application/msword' and - not _libmagic_version_at_least(513)): - mimetype, _ = mimetypes.guess_type(path) - if mimetype: - ctype = mimetype - - maintype, subtype = ctype.split('/', 1) - if maintype == 'text': - part = MIMEText(content.decode(_guess_encoding(content), 'replace'), - _subtype=subtype, - _charset='utf-8') - elif maintype == 'image': - part = MIMEImage(content, _subtype=subtype) - elif maintype == 'audio': - part = MIMEAudio(content, _subtype=subtype) - else: - part = MIMEBase(maintype, subtype) - part.set_payload(content) - # Encode the payload using Base64 - email.encoders.encode_base64(part) - # Set the filename parameter - if not filename: - filename = os.path.basename(path) - part.add_header('Content-Disposition', 'attachment', - filename=filename) - return part - -class Envelope: - """ - a message that is not yet sent and still editable. - - It holds references to unencoded! body text and mail headers among other - things. Envelope implements the python container API for easy access of - header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and - 'e.get_all('To')' would work for an envelope `e`.. - """ - - headers = None - """ - dict containing the mail headers (a list of strings for each header key) - """ - body = None - """mail body as unicode string""" - tmpfile = None - """template text for initial content""" - attachments = None - """list of :class:`Attachments `""" - tags = None - """tags to add after successful sendout""" - account = None - """account to send from""" - - def __init__( - self, template=None, bodytext=None, headers=None, attachments=None, - sign=False, sign_key=None, encrypt=False, tags=None, replied=None, - passed=None, account=None): - """ - :param template: if not None, the envelope will be initialised by - :meth:`parsing ` this string before - setting any other values given to this constructor. - :type template: str - :param bodytext: text used as body part - :type bodytext: str - :param headers: unencoded header values - :type headers: dict (str -> [unicode]) - :param attachments: file attachments to include - :type attachments: list of :class:`~alot.db.attachment.Attachment` - :param tags: tags to add after successful sendout and saving this msg - :type tags: set of str - :param replied: message being replied to - :type replied: :class:`~alot.db.message.Message` - :param passed: message being passed on - :type replied: :class:`~alot.db.message.Message` - :param account: account to send from - :type account: :class:`Account` - """ - logging.debug('TEMPLATE: %s', template) - if template: - self.parse_template(template) - logging.debug('PARSED TEMPLATE: %s', template) - logging.debug('BODY: %s', self.body) - self.body = bodytext or '' - # TODO: if this was as collections.defaultdict a number of methods - # could be simplified. - self.headers = headers or {} - self.attachments = list(attachments) if attachments is not None else [] - self.sign = sign - self.sign_key = sign_key - self.encrypt = encrypt - self.encrypt_keys = {} - self.tags = tags or frozenset() # tags to add after successful sendout - self.replied = replied # message being replied to - self.passed = passed # message being passed on - self.sent_time = None - self.modified_since_sent = False - self.sending = False # semaphore to avoid accidental double sendout - self.account = account - - def __str__(self): - return "Envelope (%s)\n%s" % (self.headers, self.body) - - def __setitem__(self, name, val): - """setter for header values. This allows adding header like so: - envelope['Subject'] = 'sm\xf8rebr\xf8d' - """ - if name not in self.headers: - self.headers[name] = [] - self.headers[name].append(val) - - if self.sent_time: - self.modified_since_sent = True - - def __getitem__(self, name): - """getter for header values. - :raises: KeyError if undefined - """ - return self.headers[name][0] - - def __delitem__(self, name): - del self.headers[name] - - if self.sent_time: - self.modified_since_sent = True - - def __contains__(self, name): - return name in self.headers - - def get(self, key, fallback=None): - """secure getter for header values that allows specifying a `fallback` - return string (defaults to None). This returns the first matching value - and doesn't raise KeyErrors""" - if key in self.headers: - value = self.headers[key][0] - else: - value = fallback - return value - - def get_all(self, key, fallback=None): - """returns all header values for given key""" - if key in self.headers: - value = self.headers[key] - else: - value = fallback or [] - return value - - def add(self, key, value): - """add header value""" - if key not in self.headers: - self.headers[key] = [] - self.headers[key].append(value) - - if self.sent_time: - self.modified_since_sent = True - - def attach(self, attachment, filename=None, ctype=None): - """ - attach a file - - :param attachment: File to attach, given as - :class:`~alot.db.attachment.Attachment` object or path to a file. - :type attachment: :class:`~alot.db.attachment.Attachment` or str - :param filename: filename to use in content-disposition. - Will be ignored if `path` matches multiple files - :param ctype: force content-type to be used for this attachment - :type ctype: str - """ - - if isinstance(attachment, Attachment): - self.attachments.append(attachment) - elif isinstance(attachment, str): - path = os.path.expanduser(attachment) - part = _mimewrap(path, filename, ctype) - self.attachments.append(Attachment(part)) - else: - raise TypeError('attach accepts an Attachment or str') - - if self.sent_time: - self.modified_since_sent = True - - def construct_mail(self): - """ - compiles the information contained in this envelope into a - :class:`email.message.EmailMessage`. - """ - # make suire everything is 7-bit clean to avoid - # compatibility problems - # TODO: consider SMTPUTF8 support? - policy = email.policy.SMTP.clone(cte_type = '7bit') - - # we actually use MIMEPart instead of EmailMessage, to - # avoid the subparts getting spurious MIME-Version everywhere - mail = MIMEPart(policy = policy) - mail.set_content(self.body, subtype = 'plain', charset = 'utf-8') - - # add attachments - for a in self.attachments: - maintype, _, subtype = a.get_content_type().partition('/') - fname = a.get_filename() - data = a.get_data() - mail.add_attachment(data, filename = fname, - maintype = maintype, subtype = subtype) - - if self.sign: - to_sign = mail - plaintext = to_sign.as_bytes() - logging.debug('signing plaintext: %s', plaintext) - - try: - signatures, signature_blob = crypto.detached_signature_for( - plaintext, [self.sign_key]) - if len(signatures) != 1: - raise GPGProblem("Could not sign message (GPGME " - "did not return a signature)", - code=GPGCode.KEY_CANNOT_SIGN) - except gpg.errors.GPGMEError as e: - if e.getcode() == gpg.errors.BAD_PASSPHRASE: - # If GPG_AGENT_INFO is unset or empty, the user just does - # not have gpg-agent running (properly). - if os.environ.get('GPG_AGENT_INFO', '').strip() == '': - msg = "Got invalid passphrase and GPG_AGENT_INFO\ - not set. Please set up gpg-agent." - raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE) - else: - raise GPGProblem("Bad passphrase. Is gpg-agent " - "running?", - code=GPGCode.BAD_PASSPHRASE) - raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN) - - signature_mime = MIMEPart(policy = to_sign.policy) - signature_mime.set_content(signature_blob, maintype = 'application', - subtype = 'pgp-signature') - signature_mime.set_param('name', 'signature.asc') - signature_mime['Content-Description'] = 'signature' - - # FIXME: this uses private methods, because - # python's "new" EmailMessage API does not - # allow arbitrary multipart constructs - mail = MIMEPart(policy = to_sign.policy) - mail._make_multipart('signed', (), None) - mail.set_param('protocol', 'application/pgp-signature') - mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo)) - mail.attach(to_sign) - mail.attach(signature_mime) - - if self.encrypt: - to_encrypt = mail - plaintext = to_encrypt.as_bytes() - logging.debug('encrypting plaintext: %s', plaintext) - - try: - encrypted_blob = crypto.encrypt( - plaintext, list(self.encrypt_keys.values())) - except gpg.errors.GPGMEError as e: - raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT) - - version_str = b'Version: 1' - encryption_mime = MIMEPart(policy = to_encrypt.policy) - encryption_mime.set_content(version_str, maintype = 'application', - subtype = 'pgp-encrypted') - - encrypted_mime = MIMEPart(policy = to_encrypt.policy) - encrypted_mime.set_content(encrypted_blob, maintype = 'application', - subtype = 'octet-stream') - - mail = MIMEPart(policy = to_encrypt.policy) - mail._make_multipart('encrypted', (), None) - mail.set_param('protocol', 'application/pgp-encrypted') - mail.attach(encryption_mime) - mail.attach(encrypted_mime) - - headers = self.headers.copy() - - # add Date header - if 'Date' not in headers: - headers['Date'] = [email.utils.formatdate(localtime=True)] - - # add Message-ID - if 'Message-ID' not in headers: - domain = settings.get('message_id_domain') - headers['Message-ID'] = [email.utils.make_msgid(domain=domain)] - - if 'User-Agent' in headers: - uastring_format = headers['User-Agent'][0] - else: - uastring_format = settings.get('user_agent').strip() - uastring = uastring_format.format(version=__version__) - if uastring: - headers['User-Agent'] = [uastring] - - # copy headers from envelope to mail - for k, vlist in headers.items(): - for v in vlist: - mail.add_header(k, v) - - # as we are using MIMEPart instead of EmailMessage, set the - # MIME version manually - del mail['MIME-Version'] - mail['MIME-Version'] = '1.0' - - return mail - - def parse_template(self, raw, reset=False, only_body=False): - """parses a template or user edited string to fills this envelope. - - :param raw: the string to parse. - :type raw: str - :param reset: remove previous envelope content - :type reset: bool - :param only_body: do not parse headers - :type only_body: bool - """ - logging.debug('GoT: """\n%s\n"""', raw) - - if self.sent_time: - self.modified_since_sent = True - - if reset: - self.headers = {} - - headerEndPos = 0 - if not only_body: - # go through multiline, utf-8 encoded headers - # locally, lines are separated by a simple LF, not CRLF - # we decode the edited text ourselves here as - # email.message_from_file can't deal with raw utf8 header values - headerRe = re.compile(r'^(?P.+?):(?P(.|\n[ \t\r\f\v])+)$', - re.MULTILINE) - for header in headerRe.finditer(raw): - if header.start() > headerEndPos + 1: - break # switched to body - - key = header.group('k') - # simple unfolding as decribed in - # https://tools.ietf.org/html/rfc2822#section-2.2.3 - unfoldedValue = header.group('v').replace('\n', '') - self.add(key, unfoldedValue.strip()) - headerEndPos = header.end() - - # interpret 'Attach' pseudo header - if 'Attach' in self: - to_attach = [] - for line in self.get_all('Attach'): - gpath = os.path.expanduser(line.strip()) - to_attach += [g for g in glob.glob(gpath) - if os.path.isfile(g)] - logging.debug('Attaching: %s', to_attach) - for path in to_attach: - self.attach(path) - del self['Attach'] - - self.body = raw[headerEndPos:].strip() - - _MAILTO_PREFIX = 'mailto:' - _MAILTO_SAFE_HEADERS = ('Subject', 'Cc', 'Keywords') - - @classmethod - def from_mailto(cls, mailto): - if not mailto.startswith(cls._MAILTO_PREFIX): - raise ValueError('Invalid mailto string: %s' % mailto) - mailto = mailto[len(cls._MAILTO_PREFIX):] - - headers = {} - body = '' - - to, _, hfields = mailto.partition('?') - to = unquote(to) - if to: - headers['To'] = [to] - - for hfield in hfields.split('&'): - key, _, value = hfield.partition('=') - - key = key.capitalize() - value = unquote(value) - if not value: - continue - - if key == 'Body': - body = value - elif key in cls._MAILTO_SAFE_HEADERS: - headers[key] = [value] - - return cls(headers = headers, bodytext = body) diff --git a/alot/mail/envelope.py b/alot/mail/envelope.py new file mode 100644 index 00000000..5a798f19 --- /dev/null +++ b/alot/mail/envelope.py @@ -0,0 +1,494 @@ +# Copyright (C) 2011-2012 Patrick Totzke +# This file is released under the GNU GPL, version 3 or a later revision. +# For further details see the COPYING file +import glob +import logging +import os +import re +import email +import email.policy +from email.encoders import encode_7or8bit +from email.message import MIMEPart +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.text import MIMEText +import email.charset as charset +from urllib.parse import unquote + +import gpg +import magic + +from ..db.attachment import Attachment +from .. import __version__ +from .. import helper +from .. import crypto +from ..settings.const import settings +from ..errors import GPGProblem, GPGCode + +charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') + +def _libmagic_version_at_least(version): + """ + checks if the libmagic library installed is more recent than a given + version. + + :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514) + with XYY >= 513 + """ + if hasattr(magic, 'open'): + magic_wrapper = magic._libraries['magic'] + elif hasattr(magic, 'from_buffer'): + magic_wrapper = magic.libmagic + else: + raise Exception('Unknown magic API') + + if not hasattr(magic_wrapper, 'magic_version'): + # The magic_version function has been introduced in libmagic 5.13, + # if it's not present, we can't guess right, so let's assume False + return False + + # Depending on the libmagic/ctypes version, magic_version is a function or + # a callable: + if callable(magic_wrapper.magic_version): + return magic_wrapper.magic_version() >= version + + return magic_wrapper.magic_version >= version + +def _guess_encoding(blob): + """ + uses file magic to determine the encoding of the given data blob. + + :param blob: file content as read by file.read() + :type blob: data + :returns: encoding + :rtype: str + """ + # this is a bit of a hack to support different versions of python magic. + # Hopefully at some point this will no longer be necessary + # + # the version with open() is the bindings shipped with the file source from + # http://darwinsys.com/file/ - this is what is used by the python-magic + # package on Debian/Ubuntu. However it is not available on pypi/via pip. + # + # the version with from_buffer() is available at + # https://github.com/ahupp/python-magic and directly installable via pip. + # + # for more detail see https://github.com/pazz/alot/pull/588 + if hasattr(magic, 'open'): + m = magic.open(magic.MAGIC_MIME_ENCODING) + m.load() + return m.buffer(blob) + elif hasattr(magic, 'from_buffer'): + m = magic.Magic(mime_encoding=True) + return m.from_buffer(blob) + else: + raise Exception('Unknown magic API') + +# TODO: make this work on blobs, not paths +def _mimewrap(path, filename, ctype): + """Take the contents of the given path and wrap them into an email MIME + part according to the content type. The content type is auto detected from + the actual file contents and the file name if it is not given. + + :param path: the path to the file contents + :type path: str + :param filename: the file name to use in the generated MIME part + :type filename: str or None + :param ctype: the content type of the file contents in path + :type ctype: str or None + :returns: the message MIME part storing the data from path + :rtype: subclasses of email.mime.base.MIMEBase + """ + + with open(path, 'rb') as f: + content = f.read() + if not ctype: + ctype = helper.guess_mimetype(content) + # libmagic < 5.12 incorrectly detects excel/powerpoint files as + # 'application/msword' (see #179 and #186 in libmagic bugtracker) + # This is a workaround, based on file extension, useful as long + # as distributions still ship libmagic 5.11. + if (ctype == 'application/msword' and + not _libmagic_version_at_least(513)): + mimetype, _ = mimetypes.guess_type(path) + if mimetype: + ctype = mimetype + + maintype, subtype = ctype.split('/', 1) + if maintype == 'text': + part = MIMEText(content.decode(_guess_encoding(content), 'replace'), + _subtype=subtype, + _charset='utf-8') + elif maintype == 'image': + part = MIMEImage(content, _subtype=subtype) + elif maintype == 'audio': + part = MIMEAudio(content, _subtype=subtype) + else: + part = MIMEBase(maintype, subtype) + part.set_payload(content) + # Encode the payload using Base64 + email.encoders.encode_base64(part) + # Set the filename parameter + if not filename: + filename = os.path.basename(path) + part.add_header('Content-Disposition', 'attachment', + filename=filename) + return part + +class Envelope: + """ + a message that is not yet sent and still editable. + + It holds references to unencoded! body text and mail headers among other + things. Envelope implements the python container API for easy access of + header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and + 'e.get_all('To')' would work for an envelope `e`.. + """ + + headers = None + """ + dict containing the mail headers (a list of strings for each header key) + """ + body = None + """mail body as unicode string""" + tmpfile = None + """template text for initial content""" + attachments = None + """list of :class:`Attachments `""" + tags = None + """tags to add after successful sendout""" + account = None + """account to send from""" + + def __init__( + self, template=None, bodytext=None, headers=None, attachments=None, + sign=False, sign_key=None, encrypt=False, tags=None, replied=None, + passed=None, account=None): + """ + :param template: if not None, the envelope will be initialised by + :meth:`parsing ` this string before + setting any other values given to this constructor. + :type template: str + :param bodytext: text used as body part + :type bodytext: str + :param headers: unencoded header values + :type headers: dict (str -> [unicode]) + :param attachments: file attachments to include + :type attachments: list of :class:`~alot.db.attachment.Attachment` + :param tags: tags to add after successful sendout and saving this msg + :type tags: set of str + :param replied: message being replied to + :type replied: :class:`~alot.db.message.Message` + :param passed: message being passed on + :type replied: :class:`~alot.db.message.Message` + :param account: account to send from + :type account: :class:`Account` + """ + logging.debug('TEMPLATE: %s', template) + if template: + self.parse_template(template) + logging.debug('PARSED TEMPLATE: %s', template) + logging.debug('BODY: %s', self.body) + self.body = bodytext or '' + # TODO: if this was as collections.defaultdict a number of methods + # could be simplified. + self.headers = headers or {} + self.attachments = list(attachments) if attachments is not None else [] + self.sign = sign + self.sign_key = sign_key + self.encrypt = encrypt + self.encrypt_keys = {} + self.tags = tags or frozenset() # tags to add after successful sendout + self.replied = replied # message being replied to + self.passed = passed # message being passed on + self.sent_time = None + self.modified_since_sent = False + self.sending = False # semaphore to avoid accidental double sendout + self.account = account + + def __str__(self): + return "Envelope (%s)\n%s" % (self.headers, self.body) + + def __setitem__(self, name, val): + """setter for header values. This allows adding header like so: + envelope['Subject'] = 'sm\xf8rebr\xf8d' + """ + if name not in self.headers: + self.headers[name] = [] + self.headers[name].append(val) + + if self.sent_time: + self.modified_since_sent = True + + def __getitem__(self, name): + """getter for header values. + :raises: KeyError if undefined + """ + return self.headers[name][0] + + def __delitem__(self, name): + del self.headers[name] + + if self.sent_time: + self.modified_since_sent = True + + def __contains__(self, name): + return name in self.headers + + def get(self, key, fallback=None): + """secure getter for header values that allows specifying a `fallback` + return string (defaults to None). This returns the first matching value + and doesn't raise KeyErrors""" + if key in self.headers: + value = self.headers[key][0] + else: + value = fallback + return value + + def get_all(self, key, fallback=None): + """returns all header values for given key""" + if key in self.headers: + value = self.headers[key] + else: + value = fallback or [] + return value + + def add(self, key, value): + """add header value""" + if key not in self.headers: + self.headers[key] = [] + self.headers[key].append(value) + + if self.sent_time: + self.modified_since_sent = True + + def attach(self, attachment, filename=None, ctype=None): + """ + attach a file + + :param attachment: File to attach, given as + :class:`~alot.db.attachment.Attachment` object or path to a file. + :type attachment: :class:`~alot.db.attachment.Attachment` or str + :param filename: filename to use in content-disposition. + Will be ignored if `path` matches multiple files + :param ctype: force content-type to be used for this attachment + :type ctype: str + """ + + if isinstance(attachment, Attachment): + self.attachments.append(attachment) + elif isinstance(attachment, str): + path = os.path.expanduser(attachment) + part = _mimewrap(path, filename, ctype) + self.attachments.append(Attachment(part)) + else: + raise TypeError('attach accepts an Attachment or str') + + if self.sent_time: + self.modified_since_sent = True + + def construct_mail(self): + """ + compiles the information contained in this envelope into a + :class:`email.message.EmailMessage`. + """ + # make suire everything is 7-bit clean to avoid + # compatibility problems + # TODO: consider SMTPUTF8 support? + policy = email.policy.SMTP.clone(cte_type = '7bit') + + # we actually use MIMEPart instead of EmailMessage, to + # avoid the subparts getting spurious MIME-Version everywhere + mail = MIMEPart(policy = policy) + mail.set_content(self.body, subtype = 'plain', charset = 'utf-8') + + # add attachments + for a in self.attachments: + maintype, _, subtype = a.get_content_type().partition('/') + fname = a.get_filename() + data = a.get_data() + mail.add_attachment(data, filename = fname, + maintype = maintype, subtype = subtype) + + if self.sign: + to_sign = mail + plaintext = to_sign.as_bytes() + logging.debug('signing plaintext: %s', plaintext) + + try: + signatures, signature_blob = crypto.detached_signature_for( + plaintext, [self.sign_key]) + if len(signatures) != 1: + raise GPGProblem("Could not sign message (GPGME " + "did not return a signature)", + code=GPGCode.KEY_CANNOT_SIGN) + except gpg.errors.GPGMEError as e: + if e.getcode() == gpg.errors.BAD_PASSPHRASE: + # If GPG_AGENT_INFO is unset or empty, the user just does + # not have gpg-agent running (properly). + if os.environ.get('GPG_AGENT_INFO', '').strip() == '': + msg = "Got invalid passphrase and GPG_AGENT_INFO\ + not set. Please set up gpg-agent." + raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE) + else: + raise GPGProblem("Bad passphrase. Is gpg-agent " + "running?", + code=GPGCode.BAD_PASSPHRASE) + raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN) + + signature_mime = MIMEPart(policy = to_sign.policy) + signature_mime.set_content(signature_blob, maintype = 'application', + subtype = 'pgp-signature') + signature_mime.set_param('name', 'signature.asc') + signature_mime['Content-Description'] = 'signature' + + # FIXME: this uses private methods, because + # python's "new" EmailMessage API does not + # allow arbitrary multipart constructs + mail = MIMEPart(policy = to_sign.policy) + mail._make_multipart('signed', (), None) + mail.set_param('protocol', 'application/pgp-signature') + mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo)) + mail.attach(to_sign) + mail.attach(signature_mime) + + if self.encrypt: + to_encrypt = mail + plaintext = to_encrypt.as_bytes() + logging.debug('encrypting plaintext: %s', plaintext) + + try: + encrypted_blob = crypto.encrypt( + plaintext, list(self.encrypt_keys.values())) + except gpg.errors.GPGMEError as e: + raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT) + + version_str = b'Version: 1' + encryption_mime = MIMEPart(policy = to_encrypt.policy) + encryption_mime.set_content(version_str, maintype = 'application', + subtype = 'pgp-encrypted') + + encrypted_mime = MIMEPart(policy = to_encrypt.policy) + encrypted_mime.set_content(encrypted_blob, maintype = 'application', + subtype = 'octet-stream') + + mail = MIMEPart(policy = to_encrypt.policy) + mail._make_multipart('encrypted', (), None) + mail.set_param('protocol', 'application/pgp-encrypted') + mail.attach(encryption_mime) + mail.attach(encrypted_mime) + + headers = self.headers.copy() + + # add Date header + if 'Date' not in headers: + headers['Date'] = [email.utils.formatdate(localtime=True)] + + # add Message-ID + if 'Message-ID' not in headers: + domain = settings.get('message_id_domain') + headers['Message-ID'] = [email.utils.make_msgid(domain=domain)] + + if 'User-Agent' in headers: + uastring_format = headers['User-Agent'][0] + else: + uastring_format = settings.get('user_agent').strip() + uastring = uastring_format.format(version=__version__) + if uastring: + headers['User-Agent'] = [uastring] + + # copy headers from envelope to mail + for k, vlist in headers.items(): + for v in vlist: + mail.add_header(k, v) + + # as we are using MIMEPart instead of EmailMessage, set the + # MIME version manually + del mail['MIME-Version'] + mail['MIME-Version'] = '1.0' + + return mail + + def parse_template(self, raw, reset=False, only_body=False): + """parses a template or user edited string to fills this envelope. + + :param raw: the string to parse. + :type raw: str + :param reset: remove previous envelope content + :type reset: bool + :param only_body: do not parse headers + :type only_body: bool + """ + logging.debug('GoT: """\n%s\n"""', raw) + + if self.sent_time: + self.modified_since_sent = True + + if reset: + self.headers = {} + + headerEndPos = 0 + if not only_body: + # go through multiline, utf-8 encoded headers + # locally, lines are separated by a simple LF, not CRLF + # we decode the edited text ourselves here as + # email.message_from_file can't deal with raw utf8 header values + headerRe = re.compile(r'^(?P.+?):(?P(.|\n[ \t\r\f\v])+)$', + re.MULTILINE) + for header in headerRe.finditer(raw): + if header.start() > headerEndPos + 1: + break # switched to body + + key = header.group('k') + # simple unfolding as decribed in + # https://tools.ietf.org/html/rfc2822#section-2.2.3 + unfoldedValue = header.group('v').replace('\n', '') + self.add(key, unfoldedValue.strip()) + headerEndPos = header.end() + + # interpret 'Attach' pseudo header + if 'Attach' in self: + to_attach = [] + for line in self.get_all('Attach'): + gpath = os.path.expanduser(line.strip()) + to_attach += [g for g in glob.glob(gpath) + if os.path.isfile(g)] + logging.debug('Attaching: %s', to_attach) + for path in to_attach: + self.attach(path) + del self['Attach'] + + self.body = raw[headerEndPos:].strip() + + _MAILTO_PREFIX = 'mailto:' + _MAILTO_SAFE_HEADERS = ('Subject', 'Cc', 'Keywords') + + @classmethod + def from_mailto(cls, mailto): + if not mailto.startswith(cls._MAILTO_PREFIX): + raise ValueError('Invalid mailto string: %s' % mailto) + mailto = mailto[len(cls._MAILTO_PREFIX):] + + headers = {} + body = '' + + to, _, hfields = mailto.partition('?') + to = unquote(to) + if to: + headers['To'] = [to] + + for hfield in hfields.split('&'): + key, _, value = hfield.partition('=') + + key = key.capitalize() + value = unquote(value) + if not value: + continue + + if key == 'Body': + body = value + elif key in cls._MAILTO_SAFE_HEADERS: + headers[key] = [value] + + return cls(headers = headers, bodytext = body) diff --git a/tests/commands/test_envelope.py b/tests/commands/test_envelope.py index d14cbf1f..9531cb57 100644 --- a/tests/commands/test_envelope.py +++ b/tests/commands/test_envelope.py @@ -24,7 +24,7 @@ import unittest from unittest import mock from alot.commands import envelope -from alot.db.envelope import Envelope +from alot.mail.envelope import Envelope from alot.errors import GPGProblem from alot.settings.errors import NoMatchingAccount from alot.settings.manager import SettingsManager diff --git a/tests/db/test_envelope.py b/tests/db/test_envelope.py deleted file mode 100644 index 0d46ba43..00000000 --- a/tests/db/test_envelope.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright © 2017 Lucas Hoffmann -# Copyright © 2018 Dylan Baker -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import email -import email.policy -import os -import tempfile -import unittest -from unittest import mock - -from alot.db import envelope - -SETTINGS = { - 'user_agent': 'agent', -} - -class TestEnvelope(unittest.TestCase): - - def _compare_content(self, first, second): - c1 = first.get_content().replace('\r\n', '\n') - c2 = second.get_content().replace('\r\n', '\n') - self.assertEqual(c1, c2) - - def assertEmailEqual(self, first, second): - with self.subTest('body'): - self.assertEqual(first.is_multipart(), second.is_multipart()) - if not first.is_multipart(): - self._compare_content(first, second) - else: - for f, s in zip(first.walk(), second.walk()): - if f.is_multipart() or s.is_multipart(): - self.assertEqual(first.is_multipart(), - second.is_multipart()) - else: - self._compare_content(f, s) - with self.subTest('headers'): - self.assertListEqual(first.values(), second.values()) - - def test_setitem_stores_text_unchanged(self): - "Just ensure that the value is set and unchanged" - e = envelope.Envelope() - e['Subject'] = 'sm\xf8rebr\xf8d' - self.assertEqual(e['Subject'], 'sm\xf8rebr\xf8d') - - def _test_mail(self, envelope): - mail = envelope.construct_mail() - raw = mail.as_bytes() - actual = email.message_from_bytes(raw, policy = mail.policy) - self.assertEmailEqual(mail, actual) - - @mock.patch('alot.db.envelope.settings', SETTINGS) - def test_construct_mail_simple(self): - """Very simple envelope with a To, From, Subject, and body.""" - headers = { - 'From': 'foo@example.com', - 'To': 'bar@example.com', - 'Subject': 'Test email', - } - e = envelope.Envelope(headers={k: [v] for k, v in headers.items()}, - bodytext='Test') - self._test_mail(e) - - @mock.patch('alot.db.envelope.settings', SETTINGS) - def test_construct_mail_with_attachment(self): - """Very simple envelope with a To, From, Subject, body and attachment. - """ - headers = { - 'From': 'foo@example.com', - 'To': 'bar@example.com', - 'Subject': 'Test email', - } - e = envelope.Envelope(headers={k: [v] for k, v in headers.items()}, - bodytext='Test') - with tempfile.NamedTemporaryFile(mode='wt', delete=False) as f: - f.write('blah') - self.addCleanup(os.unlink, f.name) - e.attach(f.name) - - self._test_mail(e) - - @mock.patch('alot.db.envelope.settings', SETTINGS) - def test_parse_template(self): - """Tests multi-line header and body parsing""" - raw = ( - 'From: foo@example.com\n' - 'To: bar@example.com,\n' - ' baz@example.com\n' - 'Subject: Fwd: Test email\n' - '\n' - 'Some body content: which is not a header.\n' - ) - envlp = envelope.Envelope() - envlp.parse_template(raw) - self.assertDictEqual(envlp.headers, { - 'From': ['foo@example.com'], - 'To': ['bar@example.com, baz@example.com'], - 'Subject': ['Fwd: Test email'] - }) - self.assertEqual(envlp.body, - 'Some body content: which is not a header.') diff --git a/tests/mail/test_envelope.py b/tests/mail/test_envelope.py new file mode 100644 index 00000000..e3034b48 --- /dev/null +++ b/tests/mail/test_envelope.py @@ -0,0 +1,113 @@ +# Copyright © 2017 Lucas Hoffmann +# Copyright © 2018 Dylan Baker +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import email +import email.policy +import os +import tempfile +import unittest +from unittest import mock + +from alot.mail import envelope + +SETTINGS = { + 'user_agent': 'agent', +} + +class TestEnvelope(unittest.TestCase): + + def _compare_content(self, first, second): + c1 = first.get_content().replace('\r\n', '\n') + c2 = second.get_content().replace('\r\n', '\n') + self.assertEqual(c1, c2) + + def assertEmailEqual(self, first, second): + with self.subTest('body'): + self.assertEqual(first.is_multipart(), second.is_multipart()) + if not first.is_multipart(): + self._compare_content(first, second) + else: + for f, s in zip(first.walk(), second.walk()): + if f.is_multipart() or s.is_multipart(): + self.assertEqual(first.is_multipart(), + second.is_multipart()) + else: + self._compare_content(f, s) + with self.subTest('headers'): + self.assertListEqual(first.values(), second.values()) + + def test_setitem_stores_text_unchanged(self): + "Just ensure that the value is set and unchanged" + e = envelope.Envelope() + e['Subject'] = 'sm\xf8rebr\xf8d' + self.assertEqual(e['Subject'], 'sm\xf8rebr\xf8d') + + def _test_mail(self, envelope): + mail = envelope.construct_mail() + raw = mail.as_bytes() + actual = email.message_from_bytes(raw, policy = mail.policy) + self.assertEmailEqual(mail, actual) + + @mock.patch('alot.mail.envelope.settings', SETTINGS) + def test_construct_mail_simple(self): + """Very simple envelope with a To, From, Subject, and body.""" + headers = { + 'From': 'foo@example.com', + 'To': 'bar@example.com', + 'Subject': 'Test email', + } + e = envelope.Envelope(headers={k: [v] for k, v in headers.items()}, + bodytext='Test') + self._test_mail(e) + + @mock.patch('alot.mail.envelope.settings', SETTINGS) + def test_construct_mail_with_attachment(self): + """Very simple envelope with a To, From, Subject, body and attachment. + """ + headers = { + 'From': 'foo@example.com', + 'To': 'bar@example.com', + 'Subject': 'Test email', + } + e = envelope.Envelope(headers={k: [v] for k, v in headers.items()}, + bodytext='Test') + with tempfile.NamedTemporaryFile(mode='wt', delete=False) as f: + f.write('blah') + self.addCleanup(os.unlink, f.name) + e.attach(f.name) + + self._test_mail(e) + + @mock.patch('alot.mail.envelope.settings', SETTINGS) + def test_parse_template(self): + """Tests multi-line header and body parsing""" + raw = ( + 'From: foo@example.com\n' + 'To: bar@example.com,\n' + ' baz@example.com\n' + 'Subject: Fwd: Test email\n' + '\n' + 'Some body content: which is not a header.\n' + ) + envlp = envelope.Envelope() + envlp.parse_template(raw) + self.assertDictEqual(envlp.headers, { + 'From': ['foo@example.com'], + 'To': ['bar@example.com, baz@example.com'], + 'Subject': ['Fwd: Test email'] + }) + self.assertEqual(envlp.body, + 'Some body content: which is not a header.') -- cgit v1.2.3