summaryrefslogtreecommitdiff
path: root/alot/db
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2021-01-30 14:12:33 +0100
committerAnton Khirnov <anton@khirnov.net>2021-01-30 14:12:33 +0100
commitcd35ec5f89cff3ba8c7780209efa7e8b0628744d (patch)
treea5f21a0b08ab2ce8a182f6cce7283f15d8f14dea /alot/db
parent5dfe5a2831adbe3ec129d2ede1d9039739e98b71 (diff)
db/envelope: move to a new module "mail"
It has nothing to do with the database.
Diffstat (limited to 'alot/db')
-rw-r--r--alot/db/envelope.py494
1 files changed, 0 insertions, 494 deletions
diff --git a/alot/db/envelope.py b/alot/db/envelope.py
deleted file mode 100644
index f9909308..00000000
--- a/alot/db/envelope.py
+++ /dev/null
@@ -1,494 +0,0 @@
-# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
-# This file is released under the GNU GPL, version 3 or a later revision.
-# For further details see the COPYING file
-import glob
-import logging
-import os
-import re
-import email
-import email.policy
-from email.encoders import encode_7or8bit
-from email.message import MIMEPart
-from email.mime.audio import MIMEAudio
-from email.mime.base import MIMEBase
-from email.mime.image import MIMEImage
-from email.mime.text import MIMEText
-import email.charset as charset
-from urllib.parse import unquote
-
-import gpg
-import magic
-
-from .attachment import Attachment
-from .. import __version__
-from .. import helper
-from .. import crypto
-from ..settings.const import settings
-from ..errors import GPGProblem, GPGCode
-
-charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
-
-def _libmagic_version_at_least(version):
- """
- checks if the libmagic library installed is more recent than a given
- version.
-
- :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514)
- with XYY >= 513
- """
- if hasattr(magic, 'open'):
- magic_wrapper = magic._libraries['magic']
- elif hasattr(magic, 'from_buffer'):
- magic_wrapper = magic.libmagic
- else:
- raise Exception('Unknown magic API')
-
- if not hasattr(magic_wrapper, 'magic_version'):
- # The magic_version function has been introduced in libmagic 5.13,
- # if it's not present, we can't guess right, so let's assume False
- return False
-
- # Depending on the libmagic/ctypes version, magic_version is a function or
- # a callable:
- if callable(magic_wrapper.magic_version):
- return magic_wrapper.magic_version() >= version
-
- return magic_wrapper.magic_version >= version
-
-def _guess_encoding(blob):
- """
- uses file magic to determine the encoding of the given data blob.
-
- :param blob: file content as read by file.read()
- :type blob: data
- :returns: encoding
- :rtype: str
- """
- # this is a bit of a hack to support different versions of python magic.
- # Hopefully at some point this will no longer be necessary
- #
- # the version with open() is the bindings shipped with the file source from
- # http://darwinsys.com/file/ - this is what is used by the python-magic
- # package on Debian/Ubuntu. However it is not available on pypi/via pip.
- #
- # the version with from_buffer() is available at
- # https://github.com/ahupp/python-magic and directly installable via pip.
- #
- # for more detail see https://github.com/pazz/alot/pull/588
- if hasattr(magic, 'open'):
- m = magic.open(magic.MAGIC_MIME_ENCODING)
- m.load()
- return m.buffer(blob)
- elif hasattr(magic, 'from_buffer'):
- m = magic.Magic(mime_encoding=True)
- return m.from_buffer(blob)
- else:
- raise Exception('Unknown magic API')
-
-# TODO: make this work on blobs, not paths
-def _mimewrap(path, filename, ctype):
- """Take the contents of the given path and wrap them into an email MIME
- part according to the content type. The content type is auto detected from
- the actual file contents and the file name if it is not given.
-
- :param path: the path to the file contents
- :type path: str
- :param filename: the file name to use in the generated MIME part
- :type filename: str or None
- :param ctype: the content type of the file contents in path
- :type ctype: str or None
- :returns: the message MIME part storing the data from path
- :rtype: subclasses of email.mime.base.MIMEBase
- """
-
- with open(path, 'rb') as f:
- content = f.read()
- if not ctype:
- ctype = helper.guess_mimetype(content)
- # libmagic < 5.12 incorrectly detects excel/powerpoint files as
- # 'application/msword' (see #179 and #186 in libmagic bugtracker)
- # This is a workaround, based on file extension, useful as long
- # as distributions still ship libmagic 5.11.
- if (ctype == 'application/msword' and
- not _libmagic_version_at_least(513)):
- mimetype, _ = mimetypes.guess_type(path)
- if mimetype:
- ctype = mimetype
-
- maintype, subtype = ctype.split('/', 1)
- if maintype == 'text':
- part = MIMEText(content.decode(_guess_encoding(content), 'replace'),
- _subtype=subtype,
- _charset='utf-8')
- elif maintype == 'image':
- part = MIMEImage(content, _subtype=subtype)
- elif maintype == 'audio':
- part = MIMEAudio(content, _subtype=subtype)
- else:
- part = MIMEBase(maintype, subtype)
- part.set_payload(content)
- # Encode the payload using Base64
- email.encoders.encode_base64(part)
- # Set the filename parameter
- if not filename:
- filename = os.path.basename(path)
- part.add_header('Content-Disposition', 'attachment',
- filename=filename)
- return part
-
-class Envelope:
- """
- a message that is not yet sent and still editable.
-
- It holds references to unencoded! body text and mail headers among other
- things. Envelope implements the python container API for easy access of
- header values. So `e['To']`, `e['To'] = 'foo@bar.baz'` and
- 'e.get_all('To')' would work for an envelope `e`..
- """
-
- headers = None
- """
- dict containing the mail headers (a list of strings for each header key)
- """
- body = None
- """mail body as unicode string"""
- tmpfile = None
- """template text for initial content"""
- attachments = None
- """list of :class:`Attachments <alot.db.attachment.Attachment>`"""
- tags = None
- """tags to add after successful sendout"""
- account = None
- """account to send from"""
-
- def __init__(
- self, template=None, bodytext=None, headers=None, attachments=None,
- sign=False, sign_key=None, encrypt=False, tags=None, replied=None,
- passed=None, account=None):
- """
- :param template: if not None, the envelope will be initialised by
- :meth:`parsing <parse_template>` this string before
- setting any other values given to this constructor.
- :type template: str
- :param bodytext: text used as body part
- :type bodytext: str
- :param headers: unencoded header values
- :type headers: dict (str -> [unicode])
- :param attachments: file attachments to include
- :type attachments: list of :class:`~alot.db.attachment.Attachment`
- :param tags: tags to add after successful sendout and saving this msg
- :type tags: set of str
- :param replied: message being replied to
- :type replied: :class:`~alot.db.message.Message`
- :param passed: message being passed on
- :type replied: :class:`~alot.db.message.Message`
- :param account: account to send from
- :type account: :class:`Account`
- """
- logging.debug('TEMPLATE: %s', template)
- if template:
- self.parse_template(template)
- logging.debug('PARSED TEMPLATE: %s', template)
- logging.debug('BODY: %s', self.body)
- self.body = bodytext or ''
- # TODO: if this was as collections.defaultdict a number of methods
- # could be simplified.
- self.headers = headers or {}
- self.attachments = list(attachments) if attachments is not None else []
- self.sign = sign
- self.sign_key = sign_key
- self.encrypt = encrypt
- self.encrypt_keys = {}
- self.tags = tags or frozenset() # tags to add after successful sendout
- self.replied = replied # message being replied to
- self.passed = passed # message being passed on
- self.sent_time = None
- self.modified_since_sent = False
- self.sending = False # semaphore to avoid accidental double sendout
- self.account = account
-
- def __str__(self):
- return "Envelope (%s)\n%s" % (self.headers, self.body)
-
- def __setitem__(self, name, val):
- """setter for header values. This allows adding header like so:
- envelope['Subject'] = 'sm\xf8rebr\xf8d'
- """
- if name not in self.headers:
- self.headers[name] = []
- self.headers[name].append(val)
-
- if self.sent_time:
- self.modified_since_sent = True
-
- def __getitem__(self, name):
- """getter for header values.
- :raises: KeyError if undefined
- """
- return self.headers[name][0]
-
- def __delitem__(self, name):
- del self.headers[name]
-
- if self.sent_time:
- self.modified_since_sent = True
-
- def __contains__(self, name):
- return name in self.headers
-
- def get(self, key, fallback=None):
- """secure getter for header values that allows specifying a `fallback`
- return string (defaults to None). This returns the first matching value
- and doesn't raise KeyErrors"""
- if key in self.headers:
- value = self.headers[key][0]
- else:
- value = fallback
- return value
-
- def get_all(self, key, fallback=None):
- """returns all header values for given key"""
- if key in self.headers:
- value = self.headers[key]
- else:
- value = fallback or []
- return value
-
- def add(self, key, value):
- """add header value"""
- if key not in self.headers:
- self.headers[key] = []
- self.headers[key].append(value)
-
- if self.sent_time:
- self.modified_since_sent = True
-
- def attach(self, attachment, filename=None, ctype=None):
- """
- attach a file
-
- :param attachment: File to attach, given as
- :class:`~alot.db.attachment.Attachment` object or path to a file.
- :type attachment: :class:`~alot.db.attachment.Attachment` or str
- :param filename: filename to use in content-disposition.
- Will be ignored if `path` matches multiple files
- :param ctype: force content-type to be used for this attachment
- :type ctype: str
- """
-
- if isinstance(attachment, Attachment):
- self.attachments.append(attachment)
- elif isinstance(attachment, str):
- path = os.path.expanduser(attachment)
- part = _mimewrap(path, filename, ctype)
- self.attachments.append(Attachment(part))
- else:
- raise TypeError('attach accepts an Attachment or str')
-
- if self.sent_time:
- self.modified_since_sent = True
-
- def construct_mail(self):
- """
- compiles the information contained in this envelope into a
- :class:`email.message.EmailMessage`.
- """
- # make suire everything is 7-bit clean to avoid
- # compatibility problems
- # TODO: consider SMTPUTF8 support?
- policy = email.policy.SMTP.clone(cte_type = '7bit')
-
- # we actually use MIMEPart instead of EmailMessage, to
- # avoid the subparts getting spurious MIME-Version everywhere
- mail = MIMEPart(policy = policy)
- mail.set_content(self.body, subtype = 'plain', charset = 'utf-8')
-
- # add attachments
- for a in self.attachments:
- maintype, _, subtype = a.get_content_type().partition('/')
- fname = a.get_filename()
- data = a.get_data()
- mail.add_attachment(data, filename = fname,
- maintype = maintype, subtype = subtype)
-
- if self.sign:
- to_sign = mail
- plaintext = to_sign.as_bytes()
- logging.debug('signing plaintext: %s', plaintext)
-
- try:
- signatures, signature_blob = crypto.detached_signature_for(
- plaintext, [self.sign_key])
- if len(signatures) != 1:
- raise GPGProblem("Could not sign message (GPGME "
- "did not return a signature)",
- code=GPGCode.KEY_CANNOT_SIGN)
- except gpg.errors.GPGMEError as e:
- if e.getcode() == gpg.errors.BAD_PASSPHRASE:
- # If GPG_AGENT_INFO is unset or empty, the user just does
- # not have gpg-agent running (properly).
- if os.environ.get('GPG_AGENT_INFO', '').strip() == '':
- msg = "Got invalid passphrase and GPG_AGENT_INFO\
- not set. Please set up gpg-agent."
- raise GPGProblem(msg, code=GPGCode.BAD_PASSPHRASE)
- else:
- raise GPGProblem("Bad passphrase. Is gpg-agent "
- "running?",
- code=GPGCode.BAD_PASSPHRASE)
- raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_SIGN)
-
- signature_mime = MIMEPart(policy = to_sign.policy)
- signature_mime.set_content(signature_blob, maintype = 'application',
- subtype = 'pgp-signature')
- signature_mime.set_param('name', 'signature.asc')
- signature_mime['Content-Description'] = 'signature'
-
- # FIXME: this uses private methods, because
- # python's "new" EmailMessage API does not
- # allow arbitrary multipart constructs
- mail = MIMEPart(policy = to_sign.policy)
- mail._make_multipart('signed', (), None)
- mail.set_param('protocol', 'application/pgp-signature')
- mail.set_param('micalg', crypto.RFC3156_micalg_from_algo(signatures[0].hash_algo))
- mail.attach(to_sign)
- mail.attach(signature_mime)
-
- if self.encrypt:
- to_encrypt = mail
- plaintext = to_encrypt.as_bytes()
- logging.debug('encrypting plaintext: %s', plaintext)
-
- try:
- encrypted_blob = crypto.encrypt(
- plaintext, list(self.encrypt_keys.values()))
- except gpg.errors.GPGMEError as e:
- raise GPGProblem(str(e), code=GPGCode.KEY_CANNOT_ENCRYPT)
-
- version_str = b'Version: 1'
- encryption_mime = MIMEPart(policy = to_encrypt.policy)
- encryption_mime.set_content(version_str, maintype = 'application',
- subtype = 'pgp-encrypted')
-
- encrypted_mime = MIMEPart(policy = to_encrypt.policy)
- encrypted_mime.set_content(encrypted_blob, maintype = 'application',
- subtype = 'octet-stream')
-
- mail = MIMEPart(policy = to_encrypt.policy)
- mail._make_multipart('encrypted', (), None)
- mail.set_param('protocol', 'application/pgp-encrypted')
- mail.attach(encryption_mime)
- mail.attach(encrypted_mime)
-
- headers = self.headers.copy()
-
- # add Date header
- if 'Date' not in headers:
- headers['Date'] = [email.utils.formatdate(localtime=True)]
-
- # add Message-ID
- if 'Message-ID' not in headers:
- domain = settings.get('message_id_domain')
- headers['Message-ID'] = [email.utils.make_msgid(domain=domain)]
-
- if 'User-Agent' in headers:
- uastring_format = headers['User-Agent'][0]
- else:
- uastring_format = settings.get('user_agent').strip()
- uastring = uastring_format.format(version=__version__)
- if uastring:
- headers['User-Agent'] = [uastring]
-
- # copy headers from envelope to mail
- for k, vlist in headers.items():
- for v in vlist:
- mail.add_header(k, v)
-
- # as we are using MIMEPart instead of EmailMessage, set the
- # MIME version manually
- del mail['MIME-Version']
- mail['MIME-Version'] = '1.0'
-
- return mail
-
- def parse_template(self, raw, reset=False, only_body=False):
- """parses a template or user edited string to fills this envelope.
-
- :param raw: the string to parse.
- :type raw: str
- :param reset: remove previous envelope content
- :type reset: bool
- :param only_body: do not parse headers
- :type only_body: bool
- """
- logging.debug('GoT: """\n%s\n"""', raw)
-
- if self.sent_time:
- self.modified_since_sent = True
-
- if reset:
- self.headers = {}
-
- headerEndPos = 0
- if not only_body:
- # go through multiline, utf-8 encoded headers
- # locally, lines are separated by a simple LF, not CRLF
- # we decode the edited text ourselves here as
- # email.message_from_file can't deal with raw utf8 header values
- headerRe = re.compile(r'^(?P<k>.+?):(?P<v>(.|\n[ \t\r\f\v])+)$',
- re.MULTILINE)
- for header in headerRe.finditer(raw):
- if header.start() > headerEndPos + 1:
- break # switched to body
-
- key = header.group('k')
- # simple unfolding as decribed in
- # https://tools.ietf.org/html/rfc2822#section-2.2.3
- unfoldedValue = header.group('v').replace('\n', '')
- self.add(key, unfoldedValue.strip())
- headerEndPos = header.end()
-
- # interpret 'Attach' pseudo header
- if 'Attach' in self:
- to_attach = []
- for line in self.get_all('Attach'):
- gpath = os.path.expanduser(line.strip())
- to_attach += [g for g in glob.glob(gpath)
- if os.path.isfile(g)]
- logging.debug('Attaching: %s', to_attach)
- for path in to_attach:
- self.attach(path)
- del self['Attach']
-
- self.body = raw[headerEndPos:].strip()
-
- _MAILTO_PREFIX = 'mailto:'
- _MAILTO_SAFE_HEADERS = ('Subject', 'Cc', 'Keywords')
-
- @classmethod
- def from_mailto(cls, mailto):
- if not mailto.startswith(cls._MAILTO_PREFIX):
- raise ValueError('Invalid mailto string: %s' % mailto)
- mailto = mailto[len(cls._MAILTO_PREFIX):]
-
- headers = {}
- body = ''
-
- to, _, hfields = mailto.partition('?')
- to = unquote(to)
- if to:
- headers['To'] = [to]
-
- for hfield in hfields.split('&'):
- key, _, value = hfield.partition('=')
-
- key = key.capitalize()
- value = unquote(value)
- if not value:
- continue
-
- if key == 'Body':
- body = value
- elif key in cls._MAILTO_SAFE_HEADERS:
- headers[key] = [value]
-
- return cls(headers = headers, bodytext = body)