# Copyright (C) 2011-2012 Patrick Totzke
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
"""
Widgets specific to thread mode
"""
import logging
import re

import urwid

from .globals import TagWidget
from .globals import AttachmentWidget
from ..settings.const import settings


class MessageSummaryWidget(urwid.WidgetWrap):
    """
    one line summary of a :class:`~alot.db.message.Message`.
    """

    # summary string as built by _build_text(); also returned by __str__
    _text = None

    def __init__(self, message, attr, attr_focus):
        """
        :param message: a message
        :type message: alot.db.Message
        :param attr: theming attribute applied to the line when unfocused
        :param attr_focus: theming attribute applied to the line when focused
        """
        self._text = self._build_text(message)
        cols = [urwid.Text(self._text)]

        if settings.get('msg_summary_hides_threadwide_tags'):
            # only show tags that are not shared by every message in the
            # thread
            thread_tags = message.thread.get_tags(intersection=True)
            outstanding_tags = set(message.get_tags()).difference(thread_tags)
            tag_widgets = sorted(TagWidget(t, attr, attr_focus)
                                 for t in outstanding_tags)
        else:
            tag_widgets = sorted(TagWidget(t, attr, attr_focus)
                                 for t in message.get_tags())

        for tag_widget in tag_widgets:
            if not tag_widget.hidden:
                cols.append(('fixed', tag_widget.width(), tag_widget))

        line = urwid.AttrMap(urwid.Columns(cols, dividechars=1),
                             attr, attr_focus)
        super().__init__(line)

    def _build_text(self, msg):
        """
        Build the summary string "<author or address>: <subject> (<date>)".

        The date suffix is omitted when the message has no datestring.
        """
        try:
            subj = msg.headers['Subject']
        except KeyError:
            subj = ''

        # join multiple Subject values and unfold continuation lines
        subj = re.sub(r'\n\s+', r' ', ','.join(subj))

        author, address = msg.get_author()
        rep = '%s: %s' % (author if author != '' else address, subj)

        date = msg.get_datestring()
        if date is not None:
            rep += " (%s)" % date

        return rep

    def __str__(self):
        return self._text

    def selectable(self):
        return True

    def keypress(self, size, key):
        # this widget handles no keys itself; hand everything back
        return key


class _MIMEPartWidget(urwid.WidgetWrap):
    """
    Base class for widgets rendering one node of a MIME tree.

    The fold level of a node is derived from its children: setting it
    forwards the (non-negative) value to every child and stores the largest
    level any child actually accepted.
    """

    # when not None, a list of child _MIMEPartWidget subclasses
    _children = None

    _fold_level = None

    def __init__(self, body_wgt, children=None):
        """
        :param body_wgt: widget to wrap
        :param children: child part widgets, or None for a leaf
        """
        self._children = children
        self._fold_level = 0
        super().__init__(body_wgt)

    @property
    def foldlevel(self):
        return self._fold_level

    @foldlevel.setter
    def foldlevel(self, val):
        # leaves without children have nothing to fold
        if self._children is None:
            return

        val_in = max(0, val)
        val_out = 0
        for ch in self._children:
            ch.foldlevel = val_in
            # children may clamp the value, so read back what they accepted
            val_out = max(val_out, ch.foldlevel)

        self._fold_level = val_out


class _CryptPartWidget(_MIMEPartWidget):
    """
    Widget for a signed and/or encrypted MIME part: the rendered payload
    inside a box titled with the crypto status.
    """

    def __init__(self, mime_tree, alternative_pref):
        """
        :param mime_tree: the signed/encrypted MIME tree node
        :param alternative_pref: preferred content type for
                                 multipart/alternative parts
        """
        children = None

        # handle broken crypto parts where we could not get the payload
        if mime_tree.children is None or len(mime_tree.children) != 1:
            text = 'Invalid "%s" MIME part' % mime_tree.content_type
            if mime_tree.crypt_error:
                text += ': ' + mime_tree.crypt_error
            body_wgt = urwid.Text(text, align='center')
        else:
            text_parts = []

            # the combined case must be tested first and must exclude the
            # single-property cases, otherwise it is always overwritten
            if mime_tree.is_encrypted and mime_tree.is_signed:
                desc = 'encrypted+signed'
            elif mime_tree.is_encrypted:
                desc = 'encrypted'
            elif mime_tree.is_signed:
                desc = 'signed'

            text_parts.append(desc + ' MIME part')

            if mime_tree.is_signed and mime_tree.sig_valid:
                if mime_tree.sig_trusted:
                    trust = 'trusted'
                    attr_name = 'crypt_trusted'
                else:
                    trust = 'untrusted'
                    attr_name = 'crypt_untrusted'

                t = 'valid %s signature from "%s"' % (trust,
                                                      mime_tree.signer_id)
                text_parts.append(t)
            else:
                attr_name = 'crypt_unsigned'

            if mime_tree.crypt_error:
                text_parts.append('crypto processing error: ' +
                                  mime_tree.crypt_error)
                attr_name = 'crypt_invalid'

            child = _render_mime_tree(mime_tree.children[0], alternative_pref)
            if child is None:
                # the payload rendered to nothing (e.g. it is an attachment);
                # a None child cannot be boxed or folded
                child = _EmptyMessageWidget()

            attr = settings.get_theming_attribute('thread', attr_name)
            body_wgt = urwid.AttrMap(
                urwid.LineBox(child, title=':'.join(text_parts)), attr)
            children = [child]

        super().__init__(body_wgt, children)


class _AltMixedPart(_MIMEPartWidget):
    """
    Widget stacking the rendered children of a multipart node vertically.
    """

    def __init__(self, children):
        super().__init__(urwid.Pile(children), children)


class _TextBlock(urwid.WidgetWrap):
    """
    A run of text lines sharing one quote level, shown either in full or
    folded down to a few context lines.
    """

    _level = None

    # placeholder that holds either _text_wgt or _folded_wgt
    _switch_wgt = None
    _text_wgt = None
    _folded_wgt = None

    def __init__(self, level, lines, attr, fold_context):
        """
        :param level: quote level of this block
        :param lines: the lines of text in this block
        :param attr: theming attribute for the text
        :param fold_context: pair (context_start, context_end): number of
                             lines kept at the start and at the end of the
                             block when it is folded
        """
        self._level = level

        self._text_wgt = urwid.Text((attr, '\n'.join(lines)))

        context_start, context_end = fold_context
        if level == 0 or len(lines) <= context_start + context_end + 1:
            # block is too small for folding
            self._folded_wgt = self._text_wgt
        else:
            folded_lines = lines[:context_start]
            folded_lines.append('%s [...%d lines folded...]' %
                                ('> ' * level,
                                 len(lines) - context_start - context_end))
            # use an explicit start index: lines[-0:] would be the whole list
            folded_lines.extend(lines[len(lines) - context_end:])
            self._folded_wgt = urwid.Text((attr, '\n'.join(folded_lines)))

        self._switch_wgt = urwid.WidgetPlaceholder(self._text_wgt)

        super().__init__(self._switch_wgt)

    def set_foldlevel(self, level):
        """
        Show the full text if `level` is at least this block's quote level,
        the folded text otherwise.
        """
        wgt = self._text_wgt if level >= self._level else self._folded_wgt
        self._switch_wgt.original_widget = wgt


class _TextPart(_MIMEPartWidget):
    """
    Widget for a text body, split into blocks by quote level so that deeply
    quoted blocks can be folded.
    """

    _QUOTE_CHARS = '>|:}#'
    _QUOTE_REGEX = '(([ \t]*[{quote_chars}])+)'.format(
        quote_chars=_QUOTE_CHARS)

    # highest quote level found in the text; upper bound for foldlevel
    _max_level = None
    _block_wgts = None

    def __init__(self, text):
        """
        :param text: the text to display
        :type text: str
        """
        attr_text = settings.get_theming_attribute('thread', 'body')
        attrs_quote = settings.get_quote_theming()
        fold_context = settings.get('thread_fold_context')

        blocks = self._split_quotes(text)
        # empty text yields no blocks; max() of nothing would raise
        max_level = max((b['level'] for b in blocks), default=0)

        block_wgts = []
        for b in blocks:
            level = b['level']
            lines = b['data']

            if level == 0 or len(attrs_quote) < 1:
                attr = attr_text
            else:
                # cycle through the quote attributes by level
                attr = attrs_quote[(level - 1) % len(attrs_quote)]

            wgt = _TextBlock(level, lines, attr, fold_context)
            block_wgts.append(wgt)

        self._block_wgts = block_wgts
        self._max_level = max_level

        super().__init__(urwid.Pile(block_wgts))

    def _split_quotes(self, text):
        """
        Split the text into blocks of quote levels.

        :returns: list of dicts with keys 'level' (int), 'data' (list of
                  lines) and 'data_empty' (True if every line consists of
                  nothing but its quote prefix)
        """
        lines = text.splitlines()

        # first pass: assign a level to each line based on the number of quote
        # characters at the beginning
        blocks = []
        for line in lines:
            level = 0
            m = re.match(self._QUOTE_REGEX, line)
            if m is not None:
                quote_prefix = m.group(0)
                remainder = line[len(quote_prefix):]
                for c in self._QUOTE_CHARS:
                    level += quote_prefix.count(c)
            else:
                remainder = line

            if len(blocks) == 0 or blocks[-1]['level'] != level:
                # start a new block
                blocks.append({'level': level, 'data': [],
                               'data_empty': True})

            blocks[-1]['data'].append(line)
            if len(remainder) > 0:
                blocks[-1]['data_empty'] = False

        # second pass: every quoted block where the remainder (line minus the
        # leading quote characters) is empty takes over the level of an
        # adjacent non-empty quoted block, higher level preferred. The blocks
        # themselves are not concatenated, only relabelled.
        for idx, b in enumerate(blocks):
            if b['level'] > 0 and b['data_empty']:
                b_prev = blocks[idx - 1] if idx > 0 else None
                b_next = blocks[idx + 1] if idx + 1 < len(blocks) else None

                merge_level = None
                if (b_prev is not None and
                        not b_prev['data_empty'] and
                        b_prev['level'] > 0):
                    merge_level = b_prev['level']
                if (b_next is not None and
                        not b_next['data_empty'] and
                        (merge_level is None or
                         merge_level < b_next['level'])):
                    merge_level = b_next['level']

                if merge_level is not None:
                    b['level'] = merge_level

        return blocks

    @property
    def foldlevel(self):
        return self._fold_level

    @foldlevel.setter
    def foldlevel(self, val):
        # clamp to the range of quote levels present in this text
        val = max(0, min(self._max_level, val))
        logging.info('settting foldlevel to %d', val)

        for b in self._block_wgts:
            b.set_foldlevel(val)

        self._fold_level = val


class _EmptyMessageWidget(_MIMEPartWidget):
    """
    Placeholder shown when there is nothing to render.
    """

    def __init__(self):
        body_wgt = urwid.Text('<>>', align='center')
        super().__init__(body_wgt)


def _handle_mixed(mime_tree, alternative_pref):
    """
    Render every child of a multipart node.

    :returns: an _AltMixedPart of the children that produced a widget, or
              None if none did
    """
    children = []
    for child in mime_tree.children:
        ch = _render_mime_tree(child, alternative_pref)
        if ch is not None:
            children.append(ch)

    if len(children) > 0:
        return _AltMixedPart(children)
    return None


def _handle_alternative(mime_tree, alternative_pref):
    """
    Render one child of a multipart/alternative node: the first whose
    content type equals `alternative_pref`, else the first child.

    :returns: the rendered widget, or None if there is nothing to render
    """
    # TODO: switching between alternatives
    if not mime_tree.children:
        # no alternatives at all; callers treat None as "nothing to show"
        return None

    child = None
    for ch in mime_tree.children:
        if ch.content_type == alternative_pref:
            child = ch
            break

    if child is None:
        child = mime_tree.children[0]

    return _render_mime_tree(child, alternative_pref)


def _render_mime_tree(mime_tree, alternative_pref):
    """
    Build the widget for a MIME tree node.

    :param mime_tree: the MIME tree node to render
    :param alternative_pref: preferred content type for
                             multipart/alternative parts
    :returns: a _MIMEPartWidget, or None if the node produces no output
              (attachment leaves, multiparts without renderable children)
    """
    # handle encrypted/signed parts
    if mime_tree.is_signed or mime_tree.is_encrypted:
        return _CryptPartWidget(mime_tree, alternative_pref)

    if mime_tree.children is not None:
        # multipart MIME parts
        if mime_tree.is_alternative:
            return _handle_alternative(mime_tree, alternative_pref)

        return _handle_mixed(mime_tree, alternative_pref)

    # no children - this is a leaf node
    # skip attachment parts
    if mime_tree.attachment:
        return None

    # try rendering the message
    text = mime_tree.render_str()
    if text is not None:
        return _TextPart(text)

    body_wgt = urwid.Text('Undisplayable "%s" MIME part.' %
                          mime_tree.content_type, align='center')
    return _MIMEPartWidget(body_wgt)


class HeadersWidget(urwid.WidgetWrap):
    """
    A flow widget displaying message headers.
    """

    _key_attr = None
    _value_attr = None
    _gaps_attr = None

    def __init__(self, key_attr, value_attr, gaps_attr):
        """
        The widget starts out empty; fill it with :meth:`set_headers`.

        :param key_attr: theming attribute to use for keys
        :type key_attr: urwid.AttrSpec
        :param value_attr: theming attribute to use for values
        :type value_attr: urwid.AttrSpec
        :param gaps_attr: theming attribute to wrap lines in
        :type gaps_attr: urwid.AttrSpec
        """
        self._key_attr = key_attr
        self._value_attr = value_attr
        self._gaps_attr = gaps_attr

        super().__init__(urwid.Pile([]))

    def set_headers(self, headers):
        """
        Replace the displayed headers.

        :param headers: list of key/value pairs to display
        :type headers: list of (str, str)
        """
        if len(headers) == 0:
            self._w.contents.clear()
            return

        # all key columns share the width of the longest key
        max_key_len = max(len(key) for key, _ in headers)

        widgets = []
        for key, value in headers:
            # TODO even/odd
            keyw = ('fixed', max_key_len + 1,
                    urwid.Text((self._key_attr, key)))
            valuew = urwid.Text((self._value_attr, value))
            linew = urwid.AttrMap(urwid.Columns([keyw, valuew]),
                                  self._gaps_attr)
            widgets.append(linew)

        self._w.contents = [(w, ('pack', None)) for w in widgets]


def _make_ascii_trans():
    """
    Build the str.translate() table used by _ascii_to_printable(), covering
    code points 0-255.
    """
    trans = {
        0x09: ' ' * 8,     # tab
        0x0b: '<\\v>\n',   # vertical tab
        0x0c: '<\\f>\n',   # form feed
        0x0d: '',          # carriage return
    }

    for i in range(256):
        if (i >= 0x20 and i <= 0x7e) or i == 0x0a:
            # printables and \n
            trans[i] = chr(i)
        elif i not in trans:
            # everything else becomes a visible hex escape
            trans[i] = r'<\x%x>' % i

    return trans


_ascii_trans = _make_ascii_trans()


def _ascii_to_printable(data):
    """
    Prepare ASCII data for printing.

    :param data: input data
    :type data: bytes
    :returns: Data suitable for printing. It will contain only printable
              characters (U+0020-U+007E) and a newline (U+000A).
    :rtype: str
    """
    # latin1 maps every byte to the code point of the same value, so the
    # 256-entry table covers all possible input
    return data.decode('latin1').translate(_ascii_trans)


class _RawMessageWidget(urwid.WidgetWrap):
    """
    A flow widget displaying the "raw" message.
    """

    def __init__(self, msg_bytes):
        """
        :param msg_bytes: the raw message
        :type msg_bytes: bytes
        """
        sourcetxt = _ascii_to_printable(msg_bytes)
        super().__init__(urwid.Text(sourcetxt))


class MessageWidget(urwid.WidgetWrap):
    """
    A flow widget displaying the contents of a single :class:`alot.db.Message`.

    Collapsing this message corresponds to showing the summary only.
    """

    # None until first assigned, so the initial assignment in __init__
    # always passes the "unchanged" check in the setters
    _display_all_headers = None
    _display_source = None

    _headers_wgt = None
    _source_wgt = None
    _body_wgt = None
    _attach_wgt = None

    def __init__(self, message):
        """
        :param message: Message to display
        :type message: alot.db.Message
        """
        self._message = message

        if settings.get('prefer_plaintext'):
            alternative_pref = 'text/plain'
        else:
            alternative_pref = 'text/html'

        self._headers_wgt = self._get_headers()
        self._source_wgt = _RawMessageWidget(message.as_bytes())
        self._attach_wgt = self._get_attachments()

        self._body_wgt = _render_mime_tree(message.body, alternative_pref)
        if self._body_wgt is None:
            self._body_wgt = _EmptyMessageWidget()

        super().__init__(urwid.ListBox(urwid.SimpleListWalker([])))

        # these assignments go through the property setters, which populate
        # the headers widget and the list box
        self.display_all_headers = False
        self.display_source = False
        self.foldlevel = settings.get('thread_fold_initial_level')

    def get_message(self):
        return self._message

    def _get_headers(self):
        """Create the (initially empty) headers widget with thread theming."""
        key_attr = settings.get_theming_attribute('thread', 'header_key')
        value_attr = settings.get_theming_attribute('thread', 'header_value')
        gaps_attr = settings.get_theming_attribute('thread', 'header')
        return HeadersWidget(key_attr, value_attr, gaps_attr)

    def _get_attachments(self):
        """
        :returns: a Pile with one AttachmentWidget per attachment, or None
                  if the message has no attachments
        """
        alist = [AttachmentWidget(a)
                 for a in self._message.iter_attachments()]
        if alist:
            return urwid.Pile(alist)
        return None

    @property
    def display_all_headers(self):
        return self._display_all_headers

    @display_all_headers.setter
    def display_all_headers(self, val):
        val = bool(val)

        if val == self._display_all_headers:
            return

        headers = self._message.headers
        lines = []

        if val:
            # collect all header/value pairs in the order they appear
            lines = list(headers.items())
        else:
            # only a selection of headers should be displayed.
            # use order of the `displayed_headers` setting
            for key in settings.get('displayed_headers'):
                if key in headers:
                    if key.lower() in ['cc', 'bcc', 'to']:
                        # address lists are shown on a single line
                        lines.append((key, ', '.join(headers[key])))
                    else:
                        for value in headers[key]:
                            lines.append((key, value))
                elif key.lower() == 'tags':
                    # TODO this should be in a separate widget
                    logging.debug('want tags header')
                    values = []
                    for t in self._message.get_tags():
                        tagrep = settings.get_tagstring_representation(t)
                        # compare by value: identity of equal strings is an
                        # implementation detail
                        if t != tagrep['translated']:
                            t = '%s (%s)' % (tagrep['translated'], t)
                        values.append(t)
                    lines.append((key, ', '.join(values)))

        self._headers_wgt.set_headers(lines)

        self._display_all_headers = val

    @property
    def display_source(self):
        return self._display_source

    @display_source.setter
    def display_source(self, val):
        val = bool(val)

        if val == self._display_source:
            return

        widgets = []
        if val:
            widgets.append(self._source_wgt)
        else:
            widgets.append(self._headers_wgt)
            if self._attach_wgt is not None:
                widgets.append(self._attach_wgt)
            widgets.append(self._body_wgt)

        # replace the list box contents in place
        self._w.body[:] = widgets

        self._display_source = val

    @property
    def foldlevel(self):
        return self._body_wgt.foldlevel

    @foldlevel.setter
    def foldlevel(self, val):
        self._body_wgt.foldlevel = val

    def get_selected_attachment(self):
        """
        If the attachments widget is currently focused, return the
        `attachment` of the AttachmentWidget focused inside it. Otherwise
        return None.
        """
        # without the None check, an unfocused list box (focus None) on a
        # message without attachments would dereference None
        if (self._attach_wgt is not None and
                self._w.focus is self._attach_wgt):
            return self._attach_wgt.focus.attachment
        return None


class ThreadNode(urwid.WidgetWrap):
    """
    A message summary line, prefixed with tree decoration for messages that
    are not at depth 0.
    """

    _ARROW = '➤'
    _HBAR = '─'
    _VBAR = '│'
    _TNODE = '├'
    _CORNER = '└'

    # Text widget holding the tree decoration; None for depth-0 messages
    _decor_text = None

    # one bool per message on the path from the toplevel ancestor down to
    # this message: whether that message has a following sibling
    _have_next_sibling = None

    def __init__(self, msg, thread, pos, indent):
        """
        :param msg: the message to summarize
        :param thread: the thread `msg` belongs to
        :param pos: position of the message; its parity selects the
                    odd/even summary theme
        :param indent: initial indentation, see :meth:`set_indent`
        """
        if pos & 1:
            attr = settings.get_theming_attribute('thread', 'summary', 'odd')
        else:
            attr = settings.get_theming_attribute('thread', 'summary', 'even')
        attr_focus = settings.get_theming_attribute('thread', 'summary',
                                                    'focus')

        msg_summary = MessageSummaryWidget(msg, attr, attr_focus)

        if msg.depth == 0:
            wgt = msg_summary
        else:
            self._decor_text = urwid.Text('')
            wgt = urwid.Columns([('pack', self._decor_text), msg_summary])

        # path from the toplevel ancestor to msg; assumes msg.parents()
        # yields the nearest parent first -- TODO confirm
        ancestor_chain = [msg]
        for p in msg.parents():
            ancestor_chain.insert(0, p)

        have_sibling = []
        for d, m in enumerate(ancestor_chain):
            if d == 0:
                have_sibling.append(m != thread.toplevel_messages[-1])
            else:
                have_sibling.append(m != ancestor_chain[d - 1].replies[-1])
        self._have_next_sibling = have_sibling

        super().__init__(wgt)

        self.set_indent(indent)

    def set_indent(self, indent):
        """
        Redraw the tree decoration. With `indent` <= 0 the decoration is
        cleared; depth-0 messages have no decoration and are left untouched.
        """
        if self._decor_text is None:
            return

        seg_text = []
        if indent > 0:
            depth = len(self._have_next_sibling) - 1
            for d in range(depth):
                if d == depth - 1:
                    # last segment connects to this message
                    corner = (self._TNODE if self._have_next_sibling[d + 1]
                              else self._CORNER)
                    seg_text.append(corner + self._ARROW)
                else:
                    # earlier segments continue an ancestor's branch, if any
                    bar = (self._VBAR if self._have_next_sibling[d + 1]
                           else ' ')
                    seg_text.append(bar + ' ')

        self._decor_text.set_text(''.join(seg_text))