# Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file """ Widgets specific to thread mode """ import logging import re import urwid from .globals import TagWidget from .globals import AttachmentWidget from ..settings.const import settings class MessageSummaryWidget(urwid.WidgetWrap): """ one line summary of a :class:`~alot.db.message.Message`. """ def __init__(self, message, attr, attr_focus): """ :param message: a message :type message: alot.db.Message """ try: subj = message.headers['Subject'] except KeyError: subj = '' subj = re.sub(r'\n\s+', r' ', ','.join(subj)) author, address = message.get_author() text = '%s: %s' % (author if author != '' else address, subj) date = message.get_datestring() if date is not None: text += " (%s)" % date cols = [urwid.Text(text)] if settings.get('msg_summary_hides_threadwide_tags'): thread_tags = message.thread.get_tags(intersection=True) outstanding_tags = set(message.get_tags()).difference(thread_tags) tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in outstanding_tags) else: tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in message.get_tags()) for tag_widget in tag_widgets: if not tag_widget.hidden: cols.append(('fixed', tag_widget.width(), tag_widget)) line = urwid.AttrMap(urwid.Columns(cols, dividechars=1), attr, attr_focus) super().__init__(line) class _MIMEPartWidget(urwid.WidgetWrap): # when not None, a list of child _MIMEPartWidget subclasses _children = None _fold_level = None def __init__(self, body_wgt, children = None): self._children = children self._fold_level = 0 super().__init__(body_wgt) @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): if self._children is None: return val_in = max(0, val) val_out = 0 for ch in self._children: ch.foldlevel = val_in val_out = max(val_out, ch.foldlevel) self._fold_level = val_out def cycle_alt(self): if self._children is None: return for c in self._children: c.cycle_alt() class _CryptPartWidget(_MIMEPartWidget): def __init__(self, mime_tree, alternative_pref): children = None # handle broken crypto parts where we could not get the payload if (mime_tree.children is None or len(mime_tree.children) != 1): text = 'Invalid "%s" MIME part' % mime_tree.content_type if mime_tree.crypt_error: text += ': ' + mime_tree.crypt_error body_wgt = urwid.Text(text, align = 'center') else: text_parts = [] if mime_tree.is_encrypted and mime_tree.is_signed: desc = 'encrypted+signed' if mime_tree.is_encrypted: desc = 'encrypted' elif mime_tree.is_signed: desc = 'signed' text_parts.append(desc + ' MIME part') if mime_tree.is_signed and mime_tree.sig_valid: if mime_tree.sig_trusted: trust = 'trusted' attr_name = 'crypt_trusted' else: trust = 'untrusted' attr_name = 'crypt_untrusted' t = 'valid %s signature from "%s"' % (trust, mime_tree.signer_id) text_parts.append(t) else: attr_name = 'crypt_unsigned' if mime_tree.crypt_error: text_parts.append('crypto processing error: ' + mime_tree.crypt_error) attr_name = 'crypt_invalid' child = _render_mime_tree(mime_tree.children[0], alternative_pref) \ or _EmptyPartWidget() attr = settings.get_theming_attribute('thread', attr_name) body_wgt = urwid.AttrMap(urwid.LineBox(child, title = ':'.join(text_parts)), attr) children = [child] super().__init__(body_wgt, children) class _MultiMixedWidget(_MIMEPartWidget): def __init__(self, children): super().__init__(urwid.Pile(children), children) class _MultiAltWidget(_MIMEPartWidget): _cur_idx = None _child_parts = None _sel_placeholder = None def __init__(self, child_widgets, child_parts, alternative_pref): self._child_parts = child_parts child_idx = None for idx, ch in enumerate(child_parts): if ch.content_type == alternative_pref: child_idx = idx break if child_idx is None: child_idx = 0 self._sel_placeholder = urwid.WidgetPlaceholder(urwid.Text('')) body_wgt = urwid.LineBox(self._sel_placeholder) super().__init__(body_wgt, child_widgets) self._select_alt(child_idx) def _select_alt(self, idx): child_part = self._child_parts[idx] child_wgt = self._children[idx] self._w.set_title('Alternative MIME part %d/%d: %s' % (idx + 1, len(self._children), child_part.content_type)) self._sel_placeholder.original_widget = child_wgt self._cur_idx = idx def cycle_alt(self): self._select_alt((self._cur_idx + 1) % len(self._children)) class _Fold: level = None start = None end = None children = None def __init__(self, level, start, end): self.children = [] self.level = level self.start = start self.end = end def fold(self, lines, level, context): ret = [] if level == self.level: # this level is the lowest folded one # display only the context ctx_start, ctx_end = context length = self.end - self.start if length <= ctx_start + ctx_end + 1: ret.extend(lines[self.start:self.end]) else: ret.extend(lines[self.start:self.start + ctx_start]) attr = ret[-1][0] ret.append((attr, '%s [...%d lines folded...]\n' % ('> ' * level, length - ctx_start - ctx_end))) ret.extend(lines[self.end - ctx_end:self.end]) elif level > self.level: # this level is not folded, display as-is # but we still need to fold the children individually # start by adding lines before the first child, or all # lines when there are no children end = self.children[0].start if self.children else self.end ret.extend(lines[self.start:end]) for i, c in enumerate(self.children): # fold the child recursively ret.extend(c.fold(lines, level, context)) # add lines inbetween this child's end and next child's start # or our end if this is the last child next_start = self.children[i + 1].start if (i + 1 < len(self.children)) \ else self.end ret.extend(lines[c.end:next_start]) return ret def __repr__(self): return 'Fold(%d, %d, %d, %s)' % \ (self.level, self.start, self.end, self.children) class _TextPart(_MIMEPartWidget): _QUOTE_CHARS = '>|:}#' _QUOTE_REGEX = '(([ \t]*[{quote_chars}])+)'.format(quote_chars = _QUOTE_CHARS) _max_level = None _lines = None _fold = None def __init__(self, text): attr_text = settings.get_theming_attribute('thread', 'body') attrs_quote = settings.get_quote_theming() self._fold_context = settings.get('thread_fold_context') self._lines, self._fold, self._max_level = \ self._parse_quotes(text, attr_text, attrs_quote) super().__init__(urwid.Text('')) def _parse_quotes(self, text, attr_text, attrs_quote): """ Parse quotes in email body, generate folds and theming from them. """ # first pass: assign a level to each line based on the number of quote # characters at the beginning lines = text.splitlines(keepends = True) blocks = [] for i, line in enumerate(lines): level = 0 m = re.match(self._QUOTE_REGEX, line) if m is not None: quote_prefix = m.group(0) remainder = line[len(quote_prefix):] for c in self._QUOTE_CHARS: level += quote_prefix.count(c) else: quote_prefix = '' remainder = line if len(blocks) == 0 or blocks[-1]['level'] != level: # start a new block blocks.append({ 'level' : level, 'start' : i, 'end' : i, 'data_empty' : True}) blocks[-1]['end'] += 1 if len(remainder) > 0: blocks[-1]['data_empty'] = False # second pass: every quoted block where the remainder (line minus the # leading quote characters) is empty gets merged into an adjacent quoted # block, higher level preferred for idx, b in enumerate(blocks): if b['level'] > 0 and b['data_empty']: b_prev = blocks[idx - 1] if idx > 0 else None b_next = blocks[idx + 1] if idx + 1 < len(blocks) else None merge_level = None if (b_prev is not None and not b_prev['data_empty'] and b_prev['level'] > 0): merge_level = b_prev['level'] if (b_next is not None and not b_next['data_empty'] and (merge_level is None or merge_level < b_next['level'])): merge_level = b_next['level'] if merge_level is not None: b['level'] = merge_level fold_stack = [] cur_fold = _Fold(0, 0, None) max_level = 0 for b in blocks: max_level = max(max_level, b['level']) if b['level'] > cur_fold.level: fold_stack.append(cur_fold) cur_fold = _Fold(b['level'], b['start'], b['end']) elif b['level'] == cur_fold.level: cur_fold.end = b['end'] else: child = cur_fold cur_fold = fold_stack.pop() cur_fold.end = b['end'] cur_fold.children.append(child) while fold_stack: fold_stack[-1].children.append(cur_fold) cur_fold = fold_stack.pop() assert(cur_fold.level == 0) cur_fold.end = len(lines) def color_lines(fold, lines, colored_lines): if fold.level == 0 or len(attrs_quote) < 1: attr = attr_text else: attr = attrs_quote[(fold.level - 1) % len(attrs_quote)] colored_lines[fold.start:fold.end] = [(attr, line) for line in lines[fold.start:fold.end]] for c in fold.children: color_lines(c, lines, colored_lines) colored_lines = lines[:] color_lines(cur_fold, lines, colored_lines) return colored_lines, cur_fold, max_level @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): val = max(1, min(self._max_level + 1, val)) logging.info('settting foldlevel to %d', val) markup_lines = self._fold.fold(self._lines, val, self._fold_context) if len(markup_lines) == 0: markup_lines = [''] self._w = urwid.Text(markup_lines) self._fold_level = val class _EmptyPartWidget(_MIMEPartWidget): def __init__(self): body_wgt = urwid.Text('<<< No displayable content >>>', align = 'center') super().__init__(body_wgt) def _handle_multipart(mime_tree, alternative_pref): children = [] for child in mime_tree.children: ch = _render_mime_tree(child, alternative_pref) if ch is not None and not isinstance(ch, _EmptyPartWidget): children.append(ch) if len(children) > 0: # there exist some intelligence-challenged systems in the wild producing # multipart/alternative with just a single alternative - treat those as # mixed and so avoid displaying pointless decorations if mime_tree.is_alternative and len(children) > 1: return _MultiAltWidget(children, mime_tree.children, alternative_pref) return _MultiMixedWidget(children) def _render_mime_tree(mime_tree, alternative_pref): # handle encrypted/signed parts if mime_tree.is_signed or mime_tree.is_encrypted: return _CryptPartWidget(mime_tree, alternative_pref) if mime_tree.children is not None: # multipart MIME parts return _handle_multipart(mime_tree, alternative_pref) # no children - this is a leaf node # skip attachment parts if mime_tree.attachment: return None # try rendering the message text = mime_tree.render_str() if text is not None: return _TextPart(text) if len(text) else _EmptyPartWidget() body_wgt = urwid.Text('Undisplayable "%s" MIME part.' % mime_tree.content_type, align = 'center') return _MIMEPartWidget(body_wgt) class HeadersWidget(urwid.WidgetWrap): """ A flow widget displaying message headers. """ _key_attr = None _value_attr = None _gaps_attr = None def __init__(self, key_attr, value_attr, gaps_attr): """ :param headerslist: list of key/value pairs to display :type headerslist: list of (str, str) :param key_attr: theming attribute to use for keys :type key_attr: urwid.AttrSpec :param value_attr: theming attribute to use for values :type value_attr: urwid.AttrSpec :param gaps_attr: theming attribute to wrap lines in :type gaps_attr: urwid.AttrSpec """ self._key_attr = key_attr self._value_attr = value_attr self._gaps_attr = gaps_attr super().__init__(urwid.Pile([])) def set_headers(self, headers): if len(headers) == 0: self._w.contents.clear() return max_key_len = max(map(lambda x: len(x[0]), headers)) widgets = [] for key, value in headers: # TODO even/odd keyw = ('fixed', max_key_len + 1, urwid.Text((self._key_attr, key))) valuew = urwid.Text((self._value_attr, value)) linew = urwid.AttrMap(urwid.Columns([keyw, valuew]), self._gaps_attr) widgets.append(linew) self._w.contents = [(w, ('pack', None)) for w in widgets] def _make_ascii_trans(): trans = { 0x09 : ' ' * 8, # tab 0x0b : '<\\v>\n', # vertical tab 0x0c : '<\\f>\n', # form feed 0x0d : '', # carriage return } for i in range(256): if (i >= 0x20 and i <= 0x7e) or i == 0x0a: # printables and \n trans[i] = chr(i) elif not i in trans: trans[i] = r'<\x%x>' % i return trans _ascii_trans = _make_ascii_trans() def _ascii_to_printable(data): """ Prepare ASCII data for printing. :param data: input data :type data: bytes :returns: Data suitable for printing. It will contain only printable characters (U+0020-U+007E) and a newline (U+000A). :rtype: str """ return data.decode('latin1').translate(_ascii_trans) class _RawMessageWidget(urwid.WidgetWrap): """ A flow widget displaying the "raw" message. """ def __init__(self, msg_bytes): sourcetxt = _ascii_to_printable(msg_bytes) super().__init__(urwid.Text(sourcetxt)) class MessageWidget(urwid.WidgetWrap): """ A flow widget displaying the contents of a single :class:`alot.db.Message`. """ _display_all_headers = None _display_source = None _headers_wgt = None _source_wgt = None _body_wgt = None _attach_wgt = None def __init__(self, message): """ :param message: Message to display :type message: alot.db.Message """ self._message = message if settings.get('prefer_plaintext'): alternative_pref = 'text/plain' else: alternative_pref = 'text/html' self._headers_wgt = self._get_headers() self._source_wgt = _RawMessageWidget(message.as_bytes()) self._attach_wgt = self._get_attachments() self._body_wgt = _render_mime_tree(message.body, alternative_pref) if self._body_wgt is None: self._body_wgt = _EmptyPartWidget() super().__init__(urwid.ListBox(urwid.SimpleListWalker([]))) self.display_all_headers = False self.display_source = False self.foldlevel = settings.get('thread_fold_initial_level') def get_message(self): return self._message def _get_headers(self): key_attr = settings.get_theming_attribute('thread', 'header_key') value_attr = settings.get_theming_attribute('thread', 'header_value') gaps_attr = settings.get_theming_attribute('thread', 'header') return HeadersWidget(key_attr, value_attr, gaps_attr) def _get_attachments(self): alist = [] for a in self._message.iter_attachments(): alist.append(AttachmentWidget(a)) if alist: return urwid.Pile(alist) return None @property def display_all_headers(self): return self._display_all_headers @display_all_headers.setter def display_all_headers(self, val): val = bool(val) if val == self._display_all_headers: return headers = self._message.headers lines = [] if val: # collect all header/value pairs in the order they appear lines = list(headers.items()) else: # only a selection of headers should be displayed. # use order of the `headers` parameter for key in settings.get('displayed_headers'): if key in headers: if key.lower() in ['cc', 'bcc', 'to']: lines.append((key, ', '.join(headers[key]))) else: for value in headers[key]: lines.append((key, value)) elif key.lower() == 'tags': # TODO this should be in a separate widget logging.debug('want tags header') values = [] for t in self._message.get_tags(): tagrep = settings.get_tagstring_representation(t) if t is not tagrep['translated']: t = '%s (%s)' % (tagrep['translated'], t) values.append(t) lines.append((key, ', '.join(values))) self._headers_wgt.set_headers(lines) self._display_all_headers = val @property def display_source(self): return self._display_source @display_source.setter def display_source(self, val): val = bool(val) if val == self._display_source: return widgets = [] if val: widgets.append(self._source_wgt) else: widgets.append(self._headers_wgt) if self._attach_wgt is not None: widgets.append(self._attach_wgt) widgets.append(self._body_wgt) self._w.body[:] = widgets self._display_source = val @property def foldlevel(self): return self._body_wgt.foldlevel @foldlevel.setter def foldlevel(self, val): self._body_wgt.foldlevel = val def cycle_alt(self): self._body_wgt.cycle_alt() def get_selected_attachment(self): """ If an AttachmentWidget is currently focused, return it. Otherwise return None. """ if self._w.focus is self._attach_wgt: return self._attach_wgt.focus.attachment return None class ThreadNode(urwid.WidgetWrap): _ARROW = '➤' _HBAR = '─' _VBAR = '│' _TNODE = '├' _CORNER = '└' _decor_text = None _have_next_sibling = None def __init__(self, msg, thread, pos, indent): parity = 'odd' if pos & 1 else 'even' attr = settings.get_theming_attribute('thread', 'summary', parity) attr_focus = settings.get_theming_attribute('thread', 'summary', 'focus') msg_summary = MessageSummaryWidget(msg, attr, attr_focus) if msg.depth == 0: wgt = msg_summary else: self._decor_text = urwid.Text('') wgt = urwid.Columns([('pack', self._decor_text), msg_summary]) ancestor_chain = [msg] for p in msg.parents(): ancestor_chain.insert(0, p) have_sibling = [] for d, m in enumerate(ancestor_chain): if d == 0: have_sibling.append(m != thread.toplevel_messages[-1]) else: have_sibling.append(m != ancestor_chain[d - 1].replies[-1]) self._have_next_sibling = have_sibling super().__init__(wgt) self.set_indent(indent) def set_indent(self, indent): if self._decor_text is None: return seg_text = [] if indent > 0: depth = len(self._have_next_sibling) - 1 for d in range(depth): if d == depth - 1: corner = self._TNODE if self._have_next_sibling[d + 1] else self._CORNER seg_text.append(corner + self._ARROW) else: bar = self._VBAR if self._have_next_sibling[d + 1] else ' ' seg_text.append(bar + ' ') self._decor_text.set_text(''.join(seg_text))