summaryrefslogtreecommitdiff
path: root/alot/mail
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2021-01-30 14:12:33 +0100
committerAnton Khirnov <anton@khirnov.net>2021-01-30 14:12:33 +0100
commitcd35ec5f89cff3ba8c7780209efa7e8b0628744d (patch)
treea5f21a0b08ab2ce8a182f6cce7283f15d8f14dea /alot/mail
parent5dfe5a2831adbe3ec129d2ede1d9039739e98b71 (diff)
db/envelope: move to a new module "mail"
It has nothing to do with the database.
Diffstat (limited to 'alot/mail')
-rw-r--r--alot/mail/envelope.py494
1 files changed, 494 insertions, 0 deletions
diff --git a/alot/mail/envelope.py b/alot/mail/envelope.py
new file mode 100644
index 00000000..5a798f19
--- /dev/null
+++ b/alot/mail/envelope.py
@@ -0,0 +1,494 @@
+# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
+# This file is released under the GNU GPL, version 3 or a later revision.
+# For further details see the COPYING file
+import glob
+import logging
+import os
+import re
+import email
+import email.policy
+from email.encoders import encode_7or8bit
+from email.message import MIMEPart
+from email.mime.audio import MIMEAudio
+from email.mime.base import MIMEBase
+from email.mime.image import MIMEImage
+from email.mime.text import MIMEText
+import email.charset as charset
+from urllib.parse import unquote
+
+import gpg
+import magic
+
+from ..db.attachment import Attachment
+from .. import __version__
+from .. import helper
+from .. import crypto
+from ..settings.const import settings
+from ..errors import GPGProblem, GPGCode
+
+charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
+
+def _libmagic_version_at_least(version):
+ """
+ checks if the libmagic library installed is more recent than a given
+ version.
+
+ :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514)
+ with XYY >= 513
+ """
+ if hasattr(magic, 'open'):
+ magic_wrapper = magic._libraries['magic']
+ elif hasattr(magic, 'from_buffer'):
+ magic_wrapper = magic.libmagic
+ else:
+ raise Exception('Unknown magic API')
+
+ if not hasattr(magic_wrapper, 'magic_version'):
+ # The magic_version function has been introduced in libmagic 5.13,
+ # if it's not present, we can't guess right, so let's assume False
+ return False
+
+ # Depending on the libmagic/ctypes version, magic_version is a function or
+ # a callable:
+ if callable(magic_wrapper.magic_version):
+ return magic_wrapper.magic_version() >= version
+
+ return magic_wrapper.magic_version >= version
+
+def _guess_encoding(blob):
+ """
+ uses file magic to determine the encoding of the given data blob.
+
+ :param blob: file content as read by file.read()
+ :type blob: data
+ :returns: encoding
+ :rtype: str
+ """
+ # this is a bit of a hack to support different versions of python magic.
+ # Hopefully at some point this will no longer be necessary
+ #
+ # the version with open() is the bindings shipped with the file source from
+ # http://darwinsys.com/file/ - this is what is used by the python-magic
+ # package on Debian/Ubuntu. However it is not available on pypi/via pip.
+ #
+ # the version with from_buffer() is available at
+ # https://github.com/ahupp/python-magic and directly installable via pip.
+ #
+ # for more detail see https://github.com/pazz/alot/pull/588
+ if hasattr(magic, 'open'):
+ m = magic.open(magic.MAGIC_MIME_ENCODING)
+ m.load()
+ return m.buffer(blob)
+ elif hasattr(magic, 'from_buffer'):
+ m = magic.Magic(mime_encoding=True)
+ return m.from_buffer(blob)
+ else:
+ raise Exception('Unknown magic API')
+
+# TODO: make this work on blobs, not paths
+def _mimewrap(path, filename, ctype):
+ """Take the contents of the given path and wrap them into an email MIME
+ part according to the content type. The content type is auto detected from
+ the actual file contents and the file name if it is not given.
+
+ :param path: the path to the file contents
+ :type path: str
+ :param filename: the file name to use in the generated MIME part
+ :type filename: str or None
+ :param ctype: the content type of the file contents in path
+ :type ctype: str or None
+ :returns: the message MIME part storing the data from path
+ :rtype: subclasses of email.mime.base.MIMEBase
+ """
+
+ with open(path, 'rb') as f:
+ content = f.read()
+ if not ctype:
+ ctype = helper.guess_mimetype(content)
+ # libmagic < 5.12 incorrectly detects excel/powerpoint files as
+ # 'application/msword' (see #179 and #186 in libmagic bugtracker)
+ # This is a workaround, based on file extension, useful as long
+ # as distributions still ship libmagic 5.11.
+ if (ctype == 'application/msword' and
+ not _libmagic_version_at_least(513)):
+ mimetype, _ = mimetypes.guess_type(path)
+ if mimetype:
+ ctype = mimetype
+
+ maintype, subtype = ctype.split('/', 1)
+ if maintype == 'text':
+ part = MIMEText(content.decode(_guess_encoding(content), 'replace'),
+ _subtype=subtype,
+ _charset='utf-8')
+ elif maintype == 'image':
+ part = MIMEImage(content, _subtype=subtype)
+ elif maintype == 'audio':
+ part = MIMEAudio(content, _subtype=subtype)
+ else:
+ part = MIMEBase(maintype, subtype)
+ part.set_payload(content)
+ # Encode the payload using Base64
+ email.encoders.encode_base64(part)
+ # Set the filename parameter
+ if not filename:
+ filename = os.path.basename(path)
+ part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ return part
+
+class Envelope:
+ """
+ a message that is not yet sent and still editable.
+
+ It holds references to unencoded! body text and mail headers among other
+ things. Envelope implements the python container API for easy access of
+ header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and
+ 'e.get_all('To')' would work for an envelope `e`..
+ """
+
+ headers = None
+ """
+ dict containing the mail headers (a list of strings for each header key)
+ """
+ body = None
+ """mail body as unicode string"""
+ tmpfile = None
+ """template text for initial content"""
+ attachments = None
+ """list of :class:`Attachments <alot.db.attachment.Attachment>`"""
+ tags = None
+ """tags to add after successful sendout"""
+ account = None
+ """account to send from"""
+
+ def __init__(
+ self, template=None, bodytext=None, headers=None, attachments=None,
+ sign=False, sign_key=None, encrypt=False, tags=None, replied=None,
+ passed=None, account=None):
+ """
+ :param template: if not None, the envelope will be initialised by
+ :meth:`parsing <parse_template>` this string before
+ setting any other values given to this constructor.
+ :type template: str
+ :param bodytext: text used as body part
+ :type bodytext: str
+ :param headers: unencoded header values
+ :type headers: dict (str -> [unicode])
+ :param attachments: file attachments to include
+ :type attachments: list of :class:`~alot.db.attachment.Attachment`
+ :param tags: tags to add after successful sendout and saving this msg
+ :type tags: set of str
+ :param replied: message being replied to
+ :type replied: :class:`~alot.db.message.Message`
+ :param passed: message being passed on
+ :type replied: :class:`~alot.db.message.Message`
+ :param account: account to send from
+ :type account: :class:`Account`
+ """
+ logging.debug('TEMPLATE: %s', template)
+ if template:
+ self.parse_template(template)
+ logging.debug('PARSED TEMPLATE: %s', template)
+ logging.debug('BODY: %s', self.body)
+ self.body = bodytext or ''
+ # TODO: if this was as collections.defaultdict a number of methods
+ # could be simplified.
+ self.headers = headers or {}
+ self.attachments = list(attachments) if attachments is not None else []
+ self.sign = sign
+ self.sign_key = sign_key
+ self.encrypt = encrypt
+ self.encrypt_keys = {}
+ self.tags = tags or frozenset() # tags to add after successful sendout
+ self.replied = replied # message being replied to
+ self.passed = passed # message being passed on
+ self.sent_time = None
+ self.modified_since_sent = False
+ self.sending = False # semaphore to avoid accidental double sendout
+ self.account = account
+
+ def __str__(self):
+ return "Envelope (%s)\n%s" % (self.headers, self.body)
+
+ def __setitem__(self, name, val):
+ """setter for header values. This allows adding header like so:
+ envelope['Subject'] = 'sm\xf8rebr\xf8d'
+ """
+ if name not in self.headers:
+ self.headers[name] = []
+ self.headers[name].append(val)
+
+ if self.sent_time:
+ self.modified_since_sent = True
+
+ def __getitem__(self, name):
+ """getter for header values.
+ :raises: KeyError if undefined
+ """
+ return self.headers[name][0]
+
+ def __delitem__(self, name):
+ del self.headers[name]
+
+ if self.sent_time:
+ self.modified_since_sent = True
+
+ def __contains__(self, name):
+ return name in self.headers
+
+ def get(self, key, fallback=None):
+ """secure getter for header values that allows specifying a `fallback`
+ return string (defaults to None). This returns the first matching value
+ and doesn't raise KeyErrors"""
+ if key in self.headers:
+ value = self.headers[key][0]
+ else:
+ value = fallback
+ return value
+
+ def get_all(self, key, fallback=None):
+ """returns all header values for given key"""
+ if key in self.headers:
+ value = self.headers[key]
+ else:
+ value = fallback or []
+ return value
+
+ def add(self, key, value):
+ """add header value"""
+ if key not in self.headers:
+ self.headers[key] = []
+ self.headers[key].append(value)
+
+ if self.sent_time:
+ self.modified_since_sent = True
+
+ def attach(self, attachment, filename=None, ctype=None):
+ """
+ attach a file
+
+ :param attachment: File to attach, given as
+ :class:`~alot.db.attachment.Attachment` object or path to a file.
+ :type attachment: :class:`~alot.db.attachment.Attachment` or str
+ :param filename: filename to use in content-disposition.
+ Will be ignored if `path` matches multiple files
+ :param ctype: force content-type to be used for this attachment
+ :type ctype: str
+ """
+
+ if isinstance(attachment, Attachment):
+ self.attachments.append(attachment)
+ elif isinstance(attachment, str):
+ path = os.path.expanduser(attachment)
+ part = _mimewrap(path, filename, ctype)
+ self.attachments.append(Attachment(part))
+ else:
+ raise TypeError('attach accepts an Attachment or str')
+
+ if self.sent_time:
+ self.modified_since_sent = True
+
+ def construct_mail(self):
+ """
+ compiles the information contained in this envelope into a
+ :class:`email.message.EmailMessage`.
+ """
+ # make suire everything is 7-bit clean to avoid
+ # compatibility problems
+ # TODO: consider SMTPUTF8 support?
+ policy = email.policy.SMTP.clone(cte_type = '7bit')
+
+ # we actually use MIMEPart instead of EmailMessage, to
+ # avoid the subparts getting spurious MIME-Version everywhere
+ mail = MIMEPart(policy = policy)
+ mail.set_content(self.body, subtype = 'plain', charset = 'utf-8')
+
+ # add attachments
+ for a in self.attachments:
+ maintype, _, subtype = a.get_content_type().partition('/')
+ fname = a.get_filename()
+ data = a.get_data()
+ mail.add_attachment(data, filename = fname,
+ maintype = maintype, subtype = subtype)
+
+ if self.sign:
+ to_sign = mail
+ plaintext = to_sign.as_bytes()
+ logging.debug('signing plaintext: %s', plaintext)
+
+ try:
+ signatures, signature_blob = crypto.detached_signature_for(
+ plaintext, [self.sign_key])
+ if len(signatures) != 1:
+ raise GPGProblem("Could not sign message (GPGME "
+ "did not return a signature)",
+ code=GPGCode.KEY_CANNOT_SIGN)
+ except gpg.errors.GPGMEError as e:
+ if e.getcode() == gpg.errors.BAD_PASSPHRASE:
+ # If GPG_AGENT_INFO is unset or empty, the user just does
+ # not have gpg-agent running (properly).
+ if os.environ.get('GPG_AGENT_INFO', '').strip() == '':
+ msg = "Got invalid passphrase and GPG_AGENT_INFO\
+ not set. Please set up gpg-agent."
+ raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE)
+ else:
+ raise GPGProblem("Bad passphrase. Is gpg-agent "
+ "running?",
+ code=GPGCode.BAD_PASSPHRASE)
+ raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN)
+
+ signature_mime = MIMEPart(policy = to_sign.policy)
+ signature_mime.set_content(signature_blob, maintype = 'application',
+ subtype = 'pgp-signature')
+ signature_mime.set_param('name', 'signature.asc')
+ signature_mime['Content-Description'] = 'signature'
+
+ # FIXME: this uses private methods, because
+ # python's "new" EmailMessage API does not
+ # allow arbitrary multipart constructs
+ mail = MIMEPart(policy = to_sign.policy)
+ mail._make_multipart('signed', (), None)
+ mail.set_param('protocol', 'application/pgp-signature')
+ mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo))
+ mail.attach(to_sign)
+ mail.attach(signature_mime)
+
+ if self.encrypt:
+ to_encrypt = mail
+ plaintext = to_encrypt.as_bytes()
+ logging.debug('encrypting plaintext: %s', plaintext)
+
+ try:
+ encrypted_blob = crypto.encrypt(
+ plaintext, list(self.encrypt_keys.values()))
+ except gpg.errors.GPGMEError as e:
+ raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT)
+
+ version_str = b'Version: 1'
+ encryption_mime = MIMEPart(policy = to_encrypt.policy)
+ encryption_mime.set_content(version_str, maintype = 'application',
+ subtype = 'pgp-encrypted')
+
+ encrypted_mime = MIMEPart(policy = to_encrypt.policy)
+ encrypted_mime.set_content(encrypted_blob, maintype = 'application',
+ subtype = 'octet-stream')
+
+ mail = MIMEPart(policy = to_encrypt.policy)
+ mail._make_multipart('encrypted', (), None)
+ mail.set_param('protocol', 'application/pgp-encrypted')
+ mail.attach(encryption_mime)
+ mail.attach(encrypted_mime)
+
+ headers = self.headers.copy()
+
+ # add Date header
+ if 'Date' not in headers:
+ headers['Date'] = [email.utils.formatdate(localtime=True)]
+
+ # add Message-ID
+ if 'Message-ID' not in headers:
+ domain = settings.get('message_id_domain')
+ headers['Message-ID'] = [email.utils.make_msgid(domain=domain)]
+
+ if 'User-Agent' in headers:
+ uastring_format = headers['User-Agent'][0]
+ else:
+ uastring_format = settings.get('user_agent').strip()
+ uastring = uastring_format.format(version=__version__)
+ if uastring:
+ headers['User-Agent'] = [uastring]
+
+ # copy headers from envelope to mail
+ for k, vlist in headers.items():
+ for v in vlist:
+ mail.add_header(k, v)
+
+ # as we are using MIMEPart instead of EmailMessage, set the
+ # MIME version manually
+ del mail['MIME-Version']
+ mail['MIME-Version'] = '1.0'
+
+ return mail
+
+ def parse_template(self, raw, reset=False, only_body=False):
+ """parses a template or user edited string to fills this envelope.
+
+ :param raw: the string to parse.
+ :type raw: str
+ :param reset: remove previous envelope content
+ :type reset: bool
+ :param only_body: do not parse headers
+ :type only_body: bool
+ """
+ logging.debug('GoT: """\n%s\n"""', raw)
+
+ if self.sent_time:
+ self.modified_since_sent = True
+
+ if reset:
+ self.headers = {}
+
+ headerEndPos = 0
+ if not only_body:
+ # go through multiline, utf-8 encoded headers
+ # locally, lines are separated by a simple LF, not CRLF
+ # we decode the edited text ourselves here as
+ # email.message_from_file can't deal with raw utf8 header values
+ headerRe = re.compile(r'^(?P<k>.+?):(?P<v>(.|\n[ \t\r\f\v])+)$',
+ re.MULTILINE)
+ for header in headerRe.finditer(raw):
+ if header.start() > headerEndPos + 1:
+ break # switched to body
+
+ key = header.group('k')
+ # simple unfolding as decribed in
+ # https://tools.ietf.org/html/rfc2822#section-2.2.3
+ unfoldedValue = header.group('v').replace('\n', '')
+ self.add(key, unfoldedValue.strip())
+ headerEndPos = header.end()
+
+ # interpret 'Attach' pseudo header
+ if 'Attach' in self:
+ to_attach = []
+ for line in self.get_all('Attach'):
+ gpath = os.path.expanduser(line.strip())
+ to_attach += [g for g in glob.glob(gpath)
+ if os.path.isfile(g)]
+ logging.debug('Attaching: %s', to_attach)
+ for path in to_attach:
+ self.attach(path)
+ del self['Attach']
+
+ self.body = raw[headerEndPos:].strip()
+
+ _MAILTO_PREFIX = 'mailto:'
+ _MAILTO_SAFE_HEADERS = ('Subject', 'Cc', 'Keywords')
+
+ @classmethod
+ def from_mailto(cls, mailto):
+ if not mailto.startswith(cls._MAILTO_PREFIX):
+ raise ValueError('Invalid mailto string: %s' % mailto)
+ mailto = mailto[len(cls._MAILTO_PREFIX):]
+
+ headers = {}
+ body = ''
+
+ to, _, hfields = mailto.partition('?')
+ to = unquote(to)
+ if to:
+ headers['To'] = [to]
+
+ for hfield in hfields.split('&'):
+ key, _, value = hfield.partition('=')
+
+ key = key.capitalize()
+ value = unquote(value)
+ if not value:
+ continue
+
+ if key == 'Body':
+ body = value
+ elif key in cls._MAILTO_SAFE_HEADERS:
+ headers[key] = [value]
+
+ return cls(headers = headers, bodytext = body)