diff options
author | Anton Khirnov <anton@khirnov.net> | 2021-01-30 14:12:33 +0100 |
---|---|---|
committer | Anton Khirnov <anton@khirnov.net> | 2021-01-30 14:12:33 +0100 |
commit | cd35ec5f89cff3ba8c7780209efa7e8b0628744d (patch) | |
tree | a5f21a0b08ab2ce8a182f6cce7283f15d8f14dea /alot/db | |
parent | 5dfe5a2831adbe3ec129d2ede1d9039739e98b71 (diff) |
db/envelope: move to a new module "mail"
It has nothing to do with the database.
Diffstat (limited to 'alot/db')
-rw-r--r-- | alot/db/envelope.py | 494 |
1 files changed, 0 insertions, 494 deletions
diff --git a/alot/db/envelope.py b/alot/db/envelope.py deleted file mode 100644 index f9909308..00000000 --- a/alot/db/envelope.py +++ /dev/null @@ -1,494 +0,0 @@ -# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com> -# This file is released under the GNU GPL, version 3 or a later revision. -# For further details see the COPYING file -import glob -import logging -import os -import re -import email -import email.policy -from email.encoders import encode_7or8bit -from email.message import MIMEPart -from email.mime.audio import MIMEAudio -from email.mime.base import MIMEBase -from email.mime.image import MIMEImage -from email.mime.text import MIMEText -import email.charset as charset -from urllib.parse import unquote - -import gpg -import magic - -from .attachment import Attachment -from .. import __version__ -from .. import helper -from .. import crypto -from ..settings.const import settings -from ..errors import GPGProblem, GPGCode - -charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') - -def _libmagic_version_at_least(version): - """ - checks if the libmagic library installed is more recent than a given - version. - - :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514) - with XYY >= 513 - """ - if hasattr(magic, 'open'): - magic_wrapper = magic._libraries['magic'] - elif hasattr(magic, 'from_buffer'): - magic_wrapper = magic.libmagic - else: - raise Exception('Unknown magic API') - - if not hasattr(magic_wrapper, 'magic_version'): - # The magic_version function has been introduced in libmagic 5.13, - # if it's not present, we can't guess right, so let's assume False - return False - - # Depending on the libmagic/ctypes version, magic_version is a function or - # a callable: - if callable(magic_wrapper.magic_version): - return magic_wrapper.magic_version() >= version - - return magic_wrapper.magic_version >= version - -def _guess_encoding(blob): - """ - uses file magic to determine the encoding of the given data blob. - - :param blob: file content as read by file.read() - :type blob: data - :returns: encoding - :rtype: str - """ - # this is a bit of a hack to support different versions of python magic. - # Hopefully at some point this will no longer be necessary - # - # the version with open() is the bindings shipped with the file source from - # http://darwinsys.com/file/ - this is what is used by the python-magic - # package on Debian/Ubuntu. However it is not available on pypi/via pip. - # - # the version with from_buffer() is available at - # https://github.com/ahupp/python-magic and directly installable via pip. - # - # for more detail see https://github.com/pazz/alot/pull/588 - if hasattr(magic, 'open'): - m = magic.open(magic.MAGIC_MIME_ENCODING) - m.load() - return m.buffer(blob) - elif hasattr(magic, 'from_buffer'): - m = magic.Magic(mime_encoding=True) - return m.from_buffer(blob) - else: - raise Exception('Unknown magic API') - -# TODO: make this work on blobs, not paths -def _mimewrap(path, filename, ctype): - """Take the contents of the given path and wrap them into an email MIME - part according to the content type. The content type is auto detected from - the actual file contents and the file name if it is not given. - - :param path: the path to the file contents - :type path: str - :param filename: the file name to use in the generated MIME part - :type filename: str or None - :param ctype: the content type of the file contents in path - :type ctype: str or None - :returns: the message MIME part storing the data from path - :rtype: subclasses of email.mime.base.MIMEBase - """ - - with open(path, 'rb') as f: - content = f.read() - if not ctype: - ctype = helper.guess_mimetype(content) - # libmagic < 5.12 incorrectly detects excel/powerpoint files as - # 'application/msword' (see #179 and #186 in libmagic bugtracker) - # This is a workaround, based on file extension, useful as long - # as distributions still ship libmagic 5.11. - if (ctype == 'application/msword' and - not _libmagic_version_at_least(513)): - mimetype, _ = mimetypes.guess_type(path) - if mimetype: - ctype = mimetype - - maintype, subtype = ctype.split('/', 1) - if maintype == 'text': - part = MIMEText(content.decode(_guess_encoding(content), 'replace'), - _subtype=subtype, - _charset='utf-8') - elif maintype == 'image': - part = MIMEImage(content, _subtype=subtype) - elif maintype == 'audio': - part = MIMEAudio(content, _subtype=subtype) - else: - part = MIMEBase(maintype, subtype) - part.set_payload(content) - # Encode the payload using Base64 - email.encoders.encode_base64(part) - # Set the filename parameter - if not filename: - filename = os.path.basename(path) - part.add_header('Content-Disposition', 'attachment', - filename=filename) - return part - -class Envelope: - """ - a message that is not yet sent and still editable. - - It holds references to unencoded! body text and mail headers among other - things. Envelope implements the python container API for easy access of - header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and - 'e.get_all('To')' would work for an envelope `e`.. - """ - - headers = None - """ - dict containing the mail headers (a list of strings for each header key) - """ - body = None - """mail body as unicode string""" - tmpfile = None - """template text for initial content""" - attachments = None - """list of :class:`Attachments <alot.db.attachment.Attachment>`""" - tags = None - """tags to add after successful sendout""" - account = None - """account to send from""" - - def __init__( - self, template=None, bodytext=None, headers=None, attachments=None, - sign=False, sign_key=None, encrypt=False, tags=None, replied=None, - passed=None, account=None): - """ - :param template: if not None, the envelope will be initialised by - :meth:`parsing <parse_template>` this string before - setting any other values given to this constructor. - :type template: str - :param bodytext: text used as body part - :type bodytext: str - :param headers: unencoded header values - :type headers: dict (str -> [unicode]) - :param attachments: file attachments to include - :type attachments: list of :class:`~alot.db.attachment.Attachment` - :param tags: tags to add after successful sendout and saving this msg - :type tags: set of str - :param replied: message being replied to - :type replied: :class:`~alot.db.message.Message` - :param passed: message being passed on - :type replied: :class:`~alot.db.message.Message` - :param account: account to send from - :type account: :class:`Account` - """ - logging.debug('TEMPLATE: %s', template) - if template: - self.parse_template(template) - logging.debug('PARSED TEMPLATE: %s', template) - logging.debug('BODY: %s', self.body) - self.body = bodytext or '' - # TODO: if this was as collections.defaultdict a number of methods - # could be simplified. - self.headers = headers or {} - self.attachments = list(attachments) if attachments is not None else [] - self.sign = sign - self.sign_key = sign_key - self.encrypt = encrypt - self.encrypt_keys = {} - self.tags = tags or frozenset() # tags to add after successful sendout - self.replied = replied # message being replied to - self.passed = passed # message being passed on - self.sent_time = None - self.modified_since_sent = False - self.sending = False # semaphore to avoid accidental double sendout - self.account = account - - def __str__(self): - return "Envelope (%s)\n%s" % (self.headers, self.body) - - def __setitem__(self, name, val): - """setter for header values. This allows adding header like so: - envelope['Subject'] = 'sm\xf8rebr\xf8d' - """ - if name not in self.headers: - self.headers[name] = [] - self.headers[name].append(val) - - if self.sent_time: - self.modified_since_sent = True - - def __getitem__(self, name): - """getter for header values. - :raises: KeyError if undefined - """ - return self.headers[name][0] - - def __delitem__(self, name): - del self.headers[name] - - if self.sent_time: - self.modified_since_sent = True - - def __contains__(self, name): - return name in self.headers - - def get(self, key, fallback=None): - """secure getter for header values that allows specifying a `fallback` - return string (defaults to None). This returns the first matching value - and doesn't raise KeyErrors""" - if key in self.headers: - value = self.headers[key][0] - else: - value = fallback - return value - - def get_all(self, key, fallback=None): - """returns all header values for given key""" - if key in self.headers: - value = self.headers[key] - else: - value = fallback or [] - return value - - def add(self, key, value): - """add header value""" - if key not in self.headers: - self.headers[key] = [] - self.headers[key].append(value) - - if self.sent_time: - self.modified_since_sent = True - - def attach(self, attachment, filename=None, ctype=None): - """ - attach a file - - :param attachment: File to attach, given as - :class:`~alot.db.attachment.Attachment` object or path to a file. - :type attachment: :class:`~alot.db.attachment.Attachment` or str - :param filename: filename to use in content-disposition. - Will be ignored if `path` matches multiple files - :param ctype: force content-type to be used for this attachment - :type ctype: str - """ - - if isinstance(attachment, Attachment): - self.attachments.append(attachment) - elif isinstance(attachment, str): - path = os.path.expanduser(attachment) - part = _mimewrap(path, filename, ctype) - self.attachments.append(Attachment(part)) - else: - raise TypeError('attach accepts an Attachment or str') - - if self.sent_time: - self.modified_since_sent = True - - def construct_mail(self): - """ - compiles the information contained in this envelope into a - :class:`email.message.EmailMessage`. - """ - # make suire everything is 7-bit clean to avoid - # compatibility problems - # TODO: consider SMTPUTF8 support? - policy = email.policy.SMTP.clone(cte_type = '7bit') - - # we actually use MIMEPart instead of EmailMessage, to - # avoid the subparts getting spurious MIME-Version everywhere - mail = MIMEPart(policy = policy) - mail.set_content(self.body, subtype = 'plain', charset = 'utf-8') - - # add attachments - for a in self.attachments: - maintype, _, subtype = a.get_content_type().partition('/') - fname = a.get_filename() - data = a.get_data() - mail.add_attachment(data, filename = fname, - maintype = maintype, subtype = subtype) - - if self.sign: - to_sign = mail - plaintext = to_sign.as_bytes() - logging.debug('signing plaintext: %s', plaintext) - - try: - signatures, signature_blob = crypto.detached_signature_for( - plaintext, [self.sign_key]) - if len(signatures) != 1: - raise GPGProblem("Could not sign message (GPGME " - "did not return a signature)", - code=GPGCode.KEY_CANNOT_SIGN) - except gpg.errors.GPGMEError as e: - if e.getcode() == gpg.errors.BAD_PASSPHRASE: - # If GPG_AGENT_INFO is unset or empty, the user just does - # not have gpg-agent running (properly). - if os.environ.get('GPG_AGENT_INFO', '').strip() == '': - msg = "Got invalid passphrase and GPG_AGENT_INFO\ - not set. Please set up gpg-agent." - raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE) - else: - raise GPGProblem("Bad passphrase. Is gpg-agent " - "running?", - code=GPGCode.BAD_PASSPHRASE) - raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN) - - signature_mime = MIMEPart(policy = to_sign.policy) - signature_mime.set_content(signature_blob, maintype = 'application', - subtype = 'pgp-signature') - signature_mime.set_param('name', 'signature.asc') - signature_mime['Content-Description'] = 'signature' - - # FIXME: this uses private methods, because - # python's "new" EmailMessage API does not - # allow arbitrary multipart constructs - mail = MIMEPart(policy = to_sign.policy) - mail._make_multipart('signed', (), None) - mail.set_param('protocol', 'application/pgp-signature') - mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo)) - mail.attach(to_sign) - mail.attach(signature_mime) - - if self.encrypt: - to_encrypt = mail - plaintext = to_encrypt.as_bytes() - logging.debug('encrypting plaintext: %s', plaintext) - - try: - encrypted_blob = crypto.encrypt( - plaintext, list(self.encrypt_keys.values())) - except gpg.errors.GPGMEError as e: - raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT) - - version_str = b'Version: 1' - encryption_mime = MIMEPart(policy = to_encrypt.policy) - encryption_mime.set_content(version_str, maintype = 'application', - subtype = 'pgp-encrypted') - - encrypted_mime = MIMEPart(policy = to_encrypt.policy) - encrypted_mime.set_content(encrypted_blob, maintype = 'application', - subtype = 'octet-stream') - - mail = MIMEPart(policy = to_encrypt.policy) - mail._make_multipart('encrypted', (), None) - mail.set_param('protocol', 'application/pgp-encrypted') - mail.attach(encryption_mime) - mail.attach(encrypted_mime) - - headers = self.headers.copy() - - # add Date header - if 'Date' not in headers: - headers['Date'] = [email.utils.formatdate(localtime=True)] - - # add Message-ID - if 'Message-ID' not in headers: - domain = settings.get('message_id_domain') - headers['Message-ID'] = [email.utils.make_msgid(domain=domain)] - - if 'User-Agent' in headers: - uastring_format = headers['User-Agent'][0] - else: - uastring_format = settings.get('user_agent').strip() - uastring = uastring_format.format(version=__version__) - if uastring: - headers['User-Agent'] = [uastring] - - # copy headers from envelope to mail - for k, vlist in headers.items(): - for v in vlist: - mail.add_header(k, v) - - # as we are using MIMEPart instead of EmailMessage, set the - # MIME version manually - del mail['MIME-Version'] - mail['MIME-Version'] = '1.0' - - return mail - - def parse_template(self, raw, reset=False, only_body=False): - """parses a template or user edited string to fills this envelope. - - :param raw: the string to parse. - :type raw: str - :param reset: remove previous envelope content - :type reset: bool - :param only_body: do not parse headers - :type only_body: bool - """ - logging.debug('GoT: """\n%s\n"""', raw) - - if self.sent_time: - self.modified_since_sent = True - - if reset: - self.headers = {} - - headerEndPos = 0 - if not only_body: - # go through multiline, utf-8 encoded headers - # locally, lines are separated by a simple LF, not CRLF - # we decode the edited text ourselves here as - # email.message_from_file can't deal with raw utf8 header values - headerRe = re.compile(r'^(?P<k>.+?):(?P<v>(.|\n[ \t\r\f\v])+)$', - re.MULTILINE) - for header in headerRe.finditer(raw): - if header.start() > headerEndPos + 1: - break # switched to body - - key = header.group('k') - # simple unfolding as decribed in - # https://tools.ietf.org/html/rfc2822#section-2.2.3 - unfoldedValue = header.group('v').replace('\n', '') - self.add(key, unfoldedValue.strip()) - headerEndPos = header.end() - - # interpret 'Attach' pseudo header - if 'Attach' in self: - to_attach = [] - for line in self.get_all('Attach'): - gpath = os.path.expanduser(line.strip()) - to_attach += [g for g in glob.glob(gpath) - if os.path.isfile(g)] - logging.debug('Attaching: %s', to_attach) - for path in to_attach: - self.attach(path) - del self['Attach'] - - self.body = raw[headerEndPos:].strip() - - _MAILTO_PREFIX = 'mailto:' - _MAILTO_SAFE_HEADERS = ('Subject', 'Cc', 'Keywords') - - @classmethod - def from_mailto(cls, mailto): - if not mailto.startswith(cls._MAILTO_PREFIX): - raise ValueError('Invalid mailto string: %s' % mailto) - mailto = mailto[len(cls._MAILTO_PREFIX):] - - headers = {} - body = '' - - to, _, hfields = mailto.partition('?') - to = unquote(to) - if to: - headers['To'] = [to] - - for hfield in hfields.split('&'): - key, _, value = hfield.partition('=') - - key = key.capitalize() - value = unquote(value) - if not value: - continue - - if key == 'Body': - body = value - elif key in cls._MAILTO_SAFE_HEADERS: - headers[key] = [value] - - return cls(headers = headers, bodytext = body) |