# Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file import glob import logging import os import re import email import email.policy from email.message import MIMEPart import email.charset as charset from urllib.parse import unquote import gpg from . import headers as HDR from .attachment import Attachment from .. import __version__ from .. import crypto from ..settings.const import settings from ..errors import GPGProblem, GPGCode from ..utils.magic import guess_mimetype charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') class _EnvelopeHeaders: """ A dict-like container for envelope headers. Handles header-name case insensitivity and multiple headers. """ _keys_lower = None _keys = None _vals = None def __init__(self, data = None): self.clear() if data: for key, val in data.items(): self.add(key, val) def _key_to_idx(self, key): key = key.lower() if not key in self._keys_lower: raise KeyError('No such header: ' + key) return self._keys_lower.index(key) def __setitem__(self, key, val): try: idx = self._key_to_idx(key) self._keys[idx] = key self._vals[idx] = val except KeyError: self.add(key, val) def __getitem__(self, key): idx = self._key_to_idx(key) return self._vals[idx][0] def __delitem__(self, key): deleted = False while True: try: idx = self._key_to_idx(key) except KeyError: if deleted: return raise del self._keys_lower[idx] del self._keys[idx] del self._vals[idx] deleted = True def __contains__(self, key): return key.lower() in self._keys_lower def get(self, key, fallback = None): return self[key] if key in self else fallback def get_all(self, key): kl = key.lower() ret = [] for idx in range(len(self._keys)): if kl == self._keys_lower[idx]: ret.append(self._vals[idx]) return ret def add(self, key, val): self._keys_lower.append(key.lower()) self._keys.append(key) self._vals.append(val) def __iter__(self): return iter(self._keys) def keys(self): return self._keys def values(self): return self._vals def items(self): return zip(self.keys(), self.values()) def clear(self): self._keys_lower = [] self._keys = [] self._vals = [] def copy(self): ret = self.__class__() for k, v in self.items(): ret.add(k, v) return ret def __str__(self): return '\n'.join((k + ': ' + v\ for k, v in self.items())) class Envelope: """ a message that is not yet sent and still editable. It holds references to unencoded! body text and mail headers among other things. Envelope implements the python container API for easy access of header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and 'e.get_all('To')' would work for an envelope `e`.. """ headers = None """ dict containing the mail headers (a list of strings for each header key) """ body = None """mail body as unicode string""" tmpfile = None """template text for initial content""" attachments = None """list of :class:`Attachments `""" tags = None """tags to add after successful sendout""" account = None """account to send from""" def __init__( self, template=None, bodytext=None, headers=None, attachments=None, sign=False, sign_key=None, encrypt=False, tags=None, replied=None, passed=None, account=None): """ :param template: if not None, the envelope will be initialised by :meth:`parsing ` this string before setting any other values given to this constructor. :type template: str :param bodytext: text used as body part :type bodytext: str :param headers: unencoded header values :type headers: dict (str -> [unicode]) :param attachments: file attachments to include :type attachments: list of :class:`~alot.mail.attachment.Attachment` :param tags: tags to add after successful sendout and saving this msg :type tags: set of str :param replied: message being replied to :type replied: :class:`~alot.db.message.Message` :param passed: message being passed on :type replied: :class:`~alot.db.message.Message` :param account: account to send from :type account: :class:`Account` """ logging.debug('TEMPLATE: %s', template) if template: self.parse_template(template) logging.debug('PARSED TEMPLATE: %s', template) logging.debug('BODY: %s', self.body) self.body = bodytext or '' self.headers = _EnvelopeHeaders(headers) self.attachments = list(attachments) if attachments is not None else [] self.sign = sign self.sign_key = sign_key self.encrypt = encrypt self.encrypt_keys = {} self.tags = tags or frozenset() # tags to add after successful sendout self.replied = replied # message being replied to self.passed = passed # message being passed on self.sent_time = None self.modified_since_sent = False self.sending = False # semaphore to avoid accidental double sendout self.account = account def __str__(self): return "Envelope (%s)\n%s" % (self.headers, self.body) def __setitem__(self, name, val): self.headers[name] = val if self.sent_time: self.modified_since_sent = True def __getitem__(self, name): """getter for header values. :raises: KeyError if undefined """ return self.headers[name] def __delitem__(self, name): del self.headers[name] if self.sent_time: self.modified_since_sent = True def __contains__(self, name): return name in self.headers def get(self, key, fallback=None): return self.headers.get(key, fallback) def get_all(self, key): return self.headers.get_all(key) def add(self, key, value): self.headers.add(key, value) if self.sent_time: self.modified_since_sent = True def attach_file(self, path, filename = None): """ Attach a file at a given path. :param path path to the file to attach :type path str :param filename filename to write into attachment properties :type filename str or None """ with open(path, 'rb') as f: data = f.read() ctype = guess_mimetype(data) params = [] # accept only valid utf-8 as text attachments if ctype.partition('/')[0] == 'text': try: data.decode('utf-8') except UnicodeDecodeError as e: raise ValueError('Attachment is not valid UTF-8') from e params.append(('charset', 'utf-8')) # Set the filename parameter if not filename: filename = os.path.basename(path) attachment = Attachment(data, ctype, filename, tuple(params)) self.attach(attachment) def attach(self, attachment): """ attach data :param attachment: data to attach :type attachment: :class:`~alot.mail.attachment.Attachment` """ self.attachments.append(attachment) if self.sent_time: self.modified_since_sent = True def construct_mail(self): """ compiles the information contained in this envelope into a :class:`email.message.EmailMessage`. """ # make suire everything is 7-bit clean to avoid # compatibility problems # TODO: consider SMTPUTF8 support? policy = email.policy.SMTP.clone(cte_type = '7bit') # we actually use MIMEPart instead of EmailMessage, to # avoid the subparts getting spurious MIME-Version everywhere mail = MIMEPart(policy = policy) mail.set_content(self.body, subtype = 'plain', charset = 'utf-8') # add attachments for a in self.attachments: mail.add_attachment(a.data, filename = a.filename, maintype = a.content_maintype, subtype = a.content_subtype, params = { k : v for (k, v) in a.params }) if self.sign: to_sign = mail plaintext = to_sign.as_bytes() logging.debug('signing plaintext: %s', plaintext) try: signatures, signature_blob = crypto.detached_signature_for( plaintext, [self.sign_key]) if len(signatures) != 1: raise GPGProblem("Could not sign message (GPGME " "did not return a signature)", code=GPGCode.KEY_CANNOT_SIGN) except gpg.errors.GPGMEError as e: if e.getcode() == gpg.errors.BAD_PASSPHRASE: # If GPG_AGENT_INFO is unset or empty, the user just does # not have gpg-agent running (properly). if os.environ.get('GPG_AGENT_INFO', '').strip() == '': msg = "Got invalid passphrase and GPG_AGENT_INFO\ not set. Please set up gpg-agent." raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE) else: raise GPGProblem("Bad passphrase. Is gpg-agent " "running?", code=GPGCode.BAD_PASSPHRASE) raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN) signature_mime = MIMEPart(policy = to_sign.policy) signature_mime.set_content(signature_blob, maintype = 'application', subtype = 'pgp-signature') signature_mime.set_param('name', 'signature.asc') signature_mime['Content-Description'] = 'signature' # FIXME: this uses private methods, because # python's "new" EmailMessage API does not # allow arbitrary multipart constructs mail = MIMEPart(policy = to_sign.policy) mail._make_multipart('signed', (), None) mail.set_param('protocol', 'application/pgp-signature') mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo)) mail.attach(to_sign) mail.attach(signature_mime) if self.encrypt: to_encrypt = mail plaintext = to_encrypt.as_bytes() logging.debug('encrypting plaintext: %s', plaintext) try: encrypted_blob = crypto.encrypt( plaintext, list(self.encrypt_keys.values())) except gpg.errors.GPGMEError as e: raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT) version_str = b'Version: 1' encryption_mime = MIMEPart(policy = to_encrypt.policy) encryption_mime.set_content(version_str, maintype = 'application', subtype = 'pgp-encrypted') encrypted_mime = MIMEPart(policy = to_encrypt.policy) encrypted_mime.set_content(encrypted_blob, maintype = 'application', subtype = 'octet-stream') mail = MIMEPart(policy = to_encrypt.policy) mail._make_multipart('encrypted', (), None) mail.set_param('protocol', 'application/pgp-encrypted') mail.attach(encryption_mime) mail.attach(encrypted_mime) headers = self.headers.copy() # add Date header if HDR.DATE not in headers: headers[HDR.DATE] = email.utils.formatdate(localtime=True) # add Message-ID if HDR.MESSAGE_ID not in headers: domain = settings.get('message_id_domain') headers[HDR.MESSAGE_ID] = email.utils.make_msgid(domain=domain) if HDR.USER_AGENT in headers: uastring_format = headers[HDR.USER_AGENT] else: uastring_format = settings.get('user_agent').strip() uastring = uastring_format.format(version=__version__) if uastring: headers[HDR.USER_AGENT] = uastring # copy headers from envelope to mail for k, v in headers.items(): mail.add_header(k, v) # as we are using MIMEPart instead of EmailMessage, set the # MIME version manually del mail[HDR.MIME_VERSION] mail[HDR.MIME_VERSION] = '1.0' return mail def parse_template(self, raw, reset=False, only_body=False): """parses a template or user edited string to fills this envelope. :param raw: the string to parse. :type raw: str :param reset: remove previous envelope content :type reset: bool :param only_body: do not parse headers :type only_body: bool """ logging.debug('GoT: """\n%s\n"""', raw) if self.sent_time: self.modified_since_sent = True if reset: self.headers.clear() if only_body: self.body = raw else: # Split the raw string into headers and body. # The string should be in "pseudo-email" format: a sequence of # (possibly folded) headers, followed by an empty line, followed by # the body. Since it may come from the user's text editor, we try to # be lenient in parsing it. # # The differences from the real email format are: # - line breaks do not have to be CRLF as per the RFC, but can be # whatever the user's text editor wrote; # - all text is a plain Unicode string, with no email encoding # applied. # - Attach headers are interpreted as instruction to us to attach # the specified files. # Since the email package cannot parse this (FIXME: might actually # be possible with a custom policy - check this), we do it manually # ourselves. # Use bytes.splitlines(), so that only ASCII CR, LF, or CRLF are # considered. str.splitlines() would also split on various Unicode # linebreaks which we probably? want to preserve. lines = list(map(lambda l: l.decode('utf-8'), raw.encode('utf-8').splitlines())) # list of the last seen [header name, header value], for unfolding prev_header = None for i, l in enumerate(lines): if l and l[0] in ' \t' and prev_header: # continuation of a folded header prev_header[1] += l elif re.match('[!-9;-~]+:', l): # beginning of a new header # as per RFC5322 2.2, header names are ASCII chars 33-126 # except colon if prev_header: self.add(*prev_header) prev_header = l.split(':', maxsplit = 1) else: # anything else is assumed to start the body # skip the empty line separating headers from the body, # if present idx = i if (len(l) > 0 or i == 0) else i + 1 self.body = '\n'.join(lines[idx:]) break if prev_header: self.add(*prev_header) # interpret 'Attach' pseudo header if 'Attach' in self: to_attach = [] for line in self.get_all('Attach'): gpath = os.path.expanduser(line.strip()) to_attach += [g for g in glob.glob(gpath) if os.path.isfile(g)] logging.debug('Attaching: %s', to_attach) for path in to_attach: self.attach_file(path) del self['Attach'] _MAILTO_PREFIX = 'mailto:' _MAILTO_SAFE_HEADERS = (HDR.SUBJECT, HDR.CC, HDR.KEYWORDS) @classmethod def from_mailto(cls, mailto): if not mailto.startswith(cls._MAILTO_PREFIX): raise ValueError('Invalid mailto string: %s' % mailto) mailto = mailto[len(cls._MAILTO_PREFIX):] headers = {} body = '' to, _, hfields = mailto.partition('?') to = unquote(to) if to: headers[HDR.TO] = to for hfield in hfields.split('&'): key, _, value = hfield.partition('=') key = key.capitalize() value = unquote(value) if not value: continue if key == 'Body': body = value elif key in cls._MAILTO_SAFE_HEADERS: headers[key] = value return cls(headers = headers, bodytext = body)