# Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file """ Widgets specific to thread mode """ import logging import re import urwid from urwid.canvas import SolidCanvas, TextCanvas from urwid.util import apply_target_encoding try: import pygments.lexers, pygments.formatters, pygments.util have_pygments = True except ImportError: have_pygments = False from .globals import TagWidget from .globals import AttachmentWidget from ..settings.const import settings from ..utils import ansi_term class MessageSummaryWidget(urwid.WidgetWrap): """ one line summary of a :class:`~alot.db.message.Message`. """ def __init__(self, message, attr, attr_focus): """ :param message: a message :type message: alot.db.Message """ author, address = message.get_author() text = author if author else address subj = self._extract_subj(message) if subj: text += ': ' + subj date = message.get_datestring() if date is not None: text += " (%s)" % date text = text.translate(settings.sanitize_header_table) cols = [urwid.Text(text)] if settings.get('msg_summary_hides_threadwide_tags'): thread_tags = message.thread.get_tags(intersection=True) outstanding_tags = set(message.get_tags()).difference(thread_tags) tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in outstanding_tags) else: tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in message.get_tags()) for tag_widget in tag_widgets: if not tag_widget.hidden: cols.append(('fixed', tag_widget.width(), tag_widget)) line = urwid.AttrMap(urwid.Columns(cols, dividechars=1), attr, attr_focus) super().__init__(line) def _extract_subj(self, msg): subj_style = settings.get('thread_message_summary_subject') if subj_style == 'no': return '' try: subj = ','.join(msg.headers['Subject']) except KeyError: return '' # for "smart" display style, check if normalized subject # is the same as parent's if subj_style == 'smart' and msg.parent is not None: try: parent_subj = ','.join(msg.parent.headers['Subject']) except KeyError: pass else: def normalize_subj(subj): # normalize subject for comparison - replace all # whitespace sequences with a single space and strip # leading re:'s subj = re.sub(r'\s+', ' ', subj) return re.sub(r'(re:? *)*', '', subj, flags = re.IGNORECASE) if normalize_subj(subj) == normalize_subj(parent_subj): return '' return re.sub(r'\n\s+', r' ', subj) class _MIMEPartWidget(urwid.WidgetWrap): # when not None, a list of child _MIMEPartWidget subclasses _children = None _fold_level = None def __init__(self, body_wgt, children = None): self._children = children self._fold_level = 0 super().__init__(body_wgt) @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): if self._children is None: return val_in = max(0, val) val_out = 0 for ch in self._children: ch.foldlevel = val_in val_out = max(val_out, ch.foldlevel) self._fold_level = val_out def cycle_alt(self): if self._children is None: return for c in self._children: c.cycle_alt() class _CryptPartWidget(_MIMEPartWidget): def __init__(self, mime_tree, alternative_pref, force_inline_types): children = None # handle broken crypto parts where we could not get the payload if (mime_tree.children is None or len(mime_tree.children) != 1): text = 'Invalid "%s" MIME part' % mime_tree.content_type if mime_tree.crypt_error: text += ': ' + mime_tree.crypt_error body_wgt = urwid.Text(text, align = 'center') else: text_parts = [] if mime_tree.is_encrypted and mime_tree.is_signed: desc = 'encrypted+signed' if mime_tree.is_encrypted: desc = 'encrypted' elif mime_tree.is_signed: desc = 'signed' text_parts.append(desc + ' MIME part') if mime_tree.is_signed and mime_tree.sig_valid: if mime_tree.sig_trusted: trust = 'trusted' attr_name = 'crypt_trusted' else: trust = 'untrusted' attr_name = 'crypt_untrusted' t = 'valid %s signature from "%s"' % (trust, mime_tree.signer_id) text_parts.append(t) else: attr_name = 'crypt_unsigned' if mime_tree.crypt_error: text_parts.append('crypto processing error: ' + mime_tree.crypt_error) attr_name = 'crypt_invalid' child = _render_mime_tree(mime_tree.children[0], alternative_pref, force_inline_types) \ or _EmptyPartWidget() attr = settings.get_theming_attribute('thread', attr_name) body_wgt = urwid.AttrMap(urwid.LineBox(child, title = ':'.join(text_parts)), attr) children = [child] super().__init__(body_wgt, children) class _MultiMixedWidget(_MIMEPartWidget): def __init__(self, children): super().__init__(urwid.Pile(children), children) class _MultiAltWidget(_MIMEPartWidget): _cur_idx = None _child_parts = None _sel_placeholder = None def __init__(self, child_widgets, child_parts, alternative_pref): self._child_parts = child_parts child_idx = None for idx, ch in enumerate(child_parts): if ch.content_type == alternative_pref: child_idx = idx break if child_idx is None: child_idx = 0 self._sel_placeholder = urwid.WidgetPlaceholder(urwid.Text('')) body_wgt = urwid.LineBox(self._sel_placeholder) super().__init__(body_wgt, child_widgets) self._select_alt(child_idx) def _select_alt(self, idx): child_part = self._child_parts[idx] child_wgt = self._children[idx] self._w.set_title('Alternative MIME part %d/%d: %s' % (idx + 1, len(self._children), child_part.content_type)) self._sel_placeholder.original_widget = child_wgt self._cur_idx = idx def cycle_alt(self): self._select_alt((self._cur_idx + 1) % len(self._children)) class _Fold: level = None start = None end = None children = None def __init__(self, level, start, end): self.children = [] self.level = level self.start = start self.end = end def fold(self, lines, level, context): ret = [] if level == self.level: # this level is the lowest folded one # display only the context ctx_start, ctx_end = context length = self.end - self.start if length <= ctx_start + ctx_end + 1: ret.extend(lines[self.start:self.end]) else: ret.extend(lines[self.start:self.start + ctx_start]) attr = ret[-1][0] ret.append((attr, '%s [...%d lines folded...]\n' % ('> ' * level, length - ctx_start - ctx_end))) ret.extend(lines[self.end - ctx_end:self.end]) elif level > self.level: # this level is not folded, display as-is # but we still need to fold the children individually # start by adding lines before the first child, or all # lines when there are no children end = self.children[0].start if self.children else self.end ret.extend(lines[self.start:end]) for i, c in enumerate(self.children): # fold the child recursively ret.extend(c.fold(lines, level, context)) # add lines inbetween this child's end and next child's start # or our end if this is the last child next_start = self.children[i + 1].start if (i + 1 < len(self.children)) \ else self.end ret.extend(lines[c.end:next_start]) return ret def __repr__(self): return 'Fold(%d, %d, %d, %s)' % \ (self.level, self.start, self.end, self.children) class _TextPart(_MIMEPartWidget): _QUOTE_CHARS = '>|:}#' _QUOTE_REGEX = '(([ \t]*[{quote_chars}])+)'.format(quote_chars = _QUOTE_CHARS) _body_placeholder = None _max_level = None _lines = None _fold = None def __init__(self, text, part): attr_text = settings.get_theming_attribute('thread', 'body') attrs_quote = settings.get_quote_theming() self._fold_context = settings.get('thread_fold_context') text = text.translate(settings.sanitize_text_table) # try highlighting with pygments first hilit = self._highlight_pygments(text, part, attr_text) if hilit is None: # fallback - process as a normal text mail body: # detect quoted blocks hilit = self._parse_quotes(text, attr_text, attrs_quote) self._lines, self._fold, self._max_level = hilit self._body_placeholder = urwid.WidgetPlaceholder(urwid.Text('')) # decorate inline-forced attachments with a linebox if part.attachment: title = 'attachment %s' % part.content_type fname = part.attachment.filename if fname: title += ': %s' % fname wgt = urwid.LineBox(self._body_placeholder, title = title) else: wgt = self._body_placeholder super().__init__(wgt) def _parse_quotes(self, text, attr_text, attrs_quote): """ Parse quotes in email body, generate folds and theming from them. """ # first pass: assign a level to each line based on the number of quote # characters at the beginning lines = text.splitlines(keepends = True) blocks = [] for i, line in enumerate(lines): level = 0 m = re.match(self._QUOTE_REGEX, line) if m is not None: quote_prefix = m.group(0) remainder = line[len(quote_prefix):] for c in self._QUOTE_CHARS: level += quote_prefix.count(c) else: quote_prefix = '' remainder = line if len(blocks) == 0 or blocks[-1]['level'] != level: # start a new block blocks.append({ 'level' : level, 'start' : i, 'end' : i, 'data_empty' : True}) blocks[-1]['end'] += 1 if blocks[-1]['data_empty'] and len(remainder.strip()) > 0: blocks[-1]['data_empty'] = False # second pass: every block where the remainder (line minus the # leading quote characters) is empty gets merged into an adjacent quoted # block, higher level preferred for idx, b in enumerate(blocks): if b['data_empty']: b_prev = blocks[idx - 1] if idx > 0 else None b_next = blocks[idx + 1] if idx + 1 < len(blocks) else None merge_level = None if (b_prev is not None and not b_prev['data_empty'] and b_prev['level'] > 0): merge_level = b_prev['level'] if (b_next is not None and not b_next['data_empty'] and (merge_level is None or merge_level < b_next['level'])): merge_level = b_next['level'] if merge_level is not None: b['level'] = merge_level fold_stack = [] cur_fold = _Fold(0, 0, 0) max_level = 0 for b in blocks: max_level = max(max_level, b['level']) if b['level'] > cur_fold.level: fold_stack.append(cur_fold) cur_fold = _Fold(b['level'], b['start'], b['end']) else: while b['level'] < cur_fold.level: child = cur_fold if fold_stack[-1].level < cur_fold.level - 1: cur_fold = _Fold(cur_fold.level - 1, cur_fold.start, cur_fold.end) else: cur_fold = fold_stack.pop() cur_fold.children.append(child) cur_fold.end = max(cur_fold.end, child.end) cur_fold.end = b['end'] while fold_stack: fold_stack[-1].children.append(cur_fold) cur_fold = fold_stack.pop() assert(cur_fold.level == 0) cur_fold.end = len(lines) def color_lines(fold, lines, colored_lines): if fold.level == 0 or len(attrs_quote) < 1: attr = attr_text else: attr = attrs_quote[(fold.level - 1) % len(attrs_quote)] colored_lines[fold.start:fold.end] = [(attr, line) for line in lines[fold.start:fold.end]] for c in fold.children: color_lines(c, lines, colored_lines) colored_lines = lines[:] color_lines(cur_fold, lines, colored_lines) return colored_lines, cur_fold, max_level def _highlight_pygments(self, text, part, attr_text): """Color text using pygments""" if not have_pygments: return None # get formatter for terminal # TODO: support terminal256 and theming try: formatter = pygments.formatters.get_formatter_by_name('terminal') except pygments.util.ClassNotFound: logging.warning('Could not get pygments formatter for terminal') return None # try finding a lexer for this part lexer = None # first, try guessing by filename/content try: if part.filename is not None: lexer = pygments.lexers.guess_lexer_for_filename( part.filename, text) else: lexer = pygments.lexers.guess_lexer(text) except pygments.util.ClassNotFound: logging.debug('No pygments lexer for filename: %s', part.filename) # second, try the MIME type try: lexer = pygments.lexers.get_lexer_for_mimetype(part.content_type) except pygments.util.ClassNotFound: logging.debug('No pygments lexer for MIME type: %s', part.content_type) # handle git-send-email patches, which are sent as text/plain if all(s in text for s in ('\ndiff --git', '\nindex', '\n---', '\n+++')): try: lexer = pygments.lexers.get_lexer_by_name('diff') except pygments.util.ClassNotFound: logging.warning('Could not get a lexer/formatter for diff highlighting') # FIXME: this is a hack to prevent getting a dummy lexer for plaintext # that does nothing and prevents our quote detection code from running # or the html lexer, which is not necessary since we already processed # html with an external renderer if lexer is None or 'text' in lexer.aliases or 'html' in lexer.aliases: logging.info('No pygments lexer for MIME part: %s', part) return None text = pygments.highlight(text, lexer, formatter) lines = ansi_term.parse_escapes_to_urwid(text, attr_text) return lines, _Fold(0, 0, len(lines)), 0 @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): val = max(1, min(self._max_level + 1, val)) logging.info('settting foldlevel to %d', val) markup_lines = self._fold.fold(self._lines, val, self._fold_context) if len(markup_lines) == 0: markup_lines = [''] self._body_placeholder.original_widget = urwid.Text(markup_lines) self._fold_level = val class _EmptyPartWidget(_MIMEPartWidget): def __init__(self): body_wgt = urwid.Text('<<< No displayable content >>>', align = 'center') super().__init__(body_wgt) def _handle_multipart(mime_tree, alternative_pref, force_inline_types): children = [] for child in mime_tree.children: ch = _render_mime_tree(child, alternative_pref, force_inline_types) if ch is not None and not isinstance(ch, _EmptyPartWidget): children.append(ch) if len(children) > 0: # there exist some intelligence-challenged systems in the wild producing # multipart/alternative with just a single alternative - treat those as # mixed and so avoid displaying pointless decorations if mime_tree.is_alternative and len(children) > 1: return _MultiAltWidget(children, mime_tree.children, alternative_pref) return _MultiMixedWidget(children) def _render_mime_tree(mime_tree, alternative_pref, force_inline_types): # handle encrypted/signed parts if mime_tree.is_signed or mime_tree.is_encrypted: return _CryptPartWidget(mime_tree, alternative_pref, force_inline_types) if mime_tree.children is not None: # multipart MIME parts return _handle_multipart(mime_tree, alternative_pref, force_inline_types) ## no children - this is a leaf node # skip attachment parts, unless they are of a user-specified type if mime_tree.attachment: force_inline = False for t in force_inline_types: maintype, _, subtype = t.partition('/') if (maintype == mime_tree.content_maintype and (subtype == mime_tree.content_subtype or subtype == '*')): force_inline = True break if not force_inline: return None # try rendering the message text = mime_tree.render_str() if text is not None: return _TextPart(text, mime_tree) if len(text.strip()) else _EmptyPartWidget() body_wgt = urwid.Text('Undisplayable "%s" MIME part.' % mime_tree.content_type, align = 'center') return _MIMEPartWidget(body_wgt) class HeadersWidget(urwid.WidgetWrap): """ A flow widget displaying message headers. """ _key_attr = None _value_attr = None _gaps_attr = None def __init__(self, key_attr, value_attr, gaps_attr): """ :param headerslist: list of key/value pairs to display :type headerslist: list of (str, str) :param key_attr: theming attribute to use for keys :type key_attr: urwid.AttrSpec :param value_attr: theming attribute to use for values :type value_attr: urwid.AttrSpec :param gaps_attr: theming attribute to wrap lines in :type gaps_attr: urwid.AttrSpec """ self._key_attr = key_attr self._value_attr = value_attr self._gaps_attr = gaps_attr super().__init__(urwid.Pile([])) def set_headers(self, headers): if len(headers) == 0: self._w.contents.clear() return max_key_len = max(map(lambda x: len(x[0]), headers)) widgets = [] for key, value in headers: value = value.translate(settings.sanitize_header_table) # TODO even/odd keyw = ('fixed', max_key_len + 1, urwid.Text((self._key_attr, key))) valuew = urwid.Text((self._value_attr, value)) linew = urwid.AttrMap(urwid.Columns([keyw, valuew]), self._gaps_attr) widgets.append(linew) self._w.contents = [(w, ('pack', None)) for w in widgets] def _make_ascii_trans(): trans = { 0x09 : ' ' * 8, # tab 0x0b : '<\\v>\n', # vertical tab 0x0c : '<\\f>\n', # form feed 0x0d : '', # carriage return } for i in range(256): if (i >= 0x20 and i <= 0x7e) or i == 0x0a: # printables and \n trans[i] = chr(i) elif not i in trans: trans[i] = r'<\x%x>' % i return trans _ascii_trans = _make_ascii_trans() def _ascii_to_printable(data): """ Prepare ASCII data for printing. :param data: input data :type data: bytes :returns: Data suitable for printing. It will contain only printable characters (U+0020-U+007E) and a newline (U+000A). :rtype: str """ return data.decode('latin1').translate(_ascii_trans) class _RawMessageWidget(urwid.WidgetWrap): """ A flow widget displaying the "raw" message. """ def __init__(self, msg_bytes): sourcetxt = _ascii_to_printable(msg_bytes) super().__init__(urwid.Text(sourcetxt)) class MessageWidget(urwid.WidgetWrap): """ A flow widget displaying the contents of a single :class:`alot.db.Message`. """ _display_all_headers = None _display_source = None _headers_wgt = None _source_wgt = None _body_wgt = None _attach_wgt = None def __init__(self, message): """ :param message: Message to display :type message: alot.db.Message """ self._message = message force_inline_types = settings.get('thread_force_inline_mimetypes') if settings.get('prefer_plaintext'): alternative_pref = 'text/plain' else: alternative_pref = 'text/html' self._headers_wgt = self._get_headers() self._source_wgt = _RawMessageWidget(message.as_bytes()) self._attach_wgt = self._get_attachments() self._body_wgt = _render_mime_tree(message.body, alternative_pref, force_inline_types) if self._body_wgt is None: self._body_wgt = _EmptyPartWidget() super().__init__(urwid.ListBox(urwid.SimpleListWalker([]))) self.display_all_headers = False self.display_source = False self.foldlevel = settings.get('thread_fold_initial_level') def get_message(self): return self._message def _get_headers(self): key_attr = settings.get_theming_attribute('thread', 'header_key') value_attr = settings.get_theming_attribute('thread', 'header_value') gaps_attr = settings.get_theming_attribute('thread', 'header') return HeadersWidget(key_attr, value_attr, gaps_attr) def _get_attachments(self): alist = [] for a in self._message.iter_attachments(): alist.append(AttachmentWidget(a)) if alist: return urwid.Pile(alist) return None @property def display_all_headers(self): return self._display_all_headers @display_all_headers.setter def display_all_headers(self, val): val = bool(val) if val == self._display_all_headers: return headers = self._message.headers lines = [] if val: # collect all header/value pairs in the order they appear lines = list(headers.items()) else: # only a selection of headers should be displayed. # use order of the `headers` parameter for key in settings.get('displayed_headers'): if key in headers: if key.lower() in ['cc', 'bcc', 'to']: lines.append((key, ', '.join(headers[key]))) else: for value in headers[key]: lines.append((key, value)) elif key.lower() == 'tags': # TODO this should be in a separate widget logging.debug('want tags header') values = [] for t in self._message.get_tags(): tagrep = settings.get_tagstring_representation(t) if t is not tagrep['translated']: t = '%s (%s)' % (tagrep['translated'], t) values.append(t) lines.append((key, ', '.join(values))) self._headers_wgt.set_headers(lines) self._display_all_headers = val @property def display_source(self): return self._display_source @display_source.setter def display_source(self, val): val = bool(val) if val == self._display_source: return widgets = [] if val: widgets.append(self._source_wgt) else: widgets.append(self._headers_wgt) if self._attach_wgt is not None: widgets.append(self._attach_wgt) widgets.append(self._body_wgt) self._w.body[:] = widgets self._display_source = val @property def foldlevel(self): return self._body_wgt.foldlevel @foldlevel.setter def foldlevel(self, val): self._body_wgt.foldlevel = val def cycle_alt(self): self._body_wgt.cycle_alt() def get_selected_attachment(self): """ If an AttachmentWidget is currently focused, return it. Otherwise return None. """ if self._w.focus is self._attach_wgt: return self._attach_wgt.focus.attachment return None class _ThreadNodeDecoration(urwid.Widget): """ Decoration indicating thread structure in rows corresponding to a single message. This is a box widget that is rendered alongside the message summary widget and is sized by their common container to have a matching height. The decoration consists of a list of blocks, where nth entry decorates the structure at the nth depth level. Each block is rendered with the same 'indent' width. The last block is treated specially, since its first row contains the arrow pointing to the message itself. """ _ARROW = '➤' _HBAR = '─' _VBAR = '│' _TNODE = '├' _CORNER = '└' _sizing = frozenset(['box']) # depth of the correponding message in the thread _depth = None # decorations for all the blocks before the last one _deco_leading = None # decoration for the first row of the last block _deco_last_row0 = None # decoration for the other rows of the last block _deco_last = None def __init__(self, have_next_sibling): self._depth = len(have_next_sibling) - 1 deco_leading = [] for d in range(self._depth - 1): deco_leading.append(self._VBAR if have_next_sibling[d + 1] else '') self._deco_leading = deco_leading if self._depth > 0: if have_next_sibling[self._depth]: self._deco_last_row0 = self._TNODE self._deco_last = self._VBAR else: self._deco_last_row0 = self._CORNER self._deco_last = '' def render(self, size, focus = False): cols, rows = size indent = cols // self._depth if indent < 1: return SolidCanvas(' ', cols, rows) # pad each leading block with spaces according to indent blocks_leading = [] for d in self._deco_leading: pad = max(indent - len(d), 0) blocks_leading.append(d + ' ' * pad) # build the arrow pointing to the message in the first row # of the last block pad_last0 = max(indent - len(self._deco_last_row0), 0) block_last0 = self._deco_last_row0 + self._HBAR * (pad_last0 - 1) + self._ARROW * (pad_last0 > 0) lines = [''.join(blocks_leading + [block_last0])] pad_last = max(indent - len(self._deco_last), 0) block_last = self._deco_last + ' ' * pad_last lines.extend([''.join(blocks_leading + [block_last])] * (rows - 1)) text, cs = list(zip(*(apply_target_encoding(l) for l in lines))) return TextCanvas(text, cs = cs) class ThreadNode(urwid.WidgetWrap): """ A node in the thread tree. It is a flow widget that consists of 2 columns: - (optional) decoration indicating the thread structure - message summary """ _decor = None _msg_depth = None def __init__(self, msg, thread, pos, indent): parity = 'odd' if pos & 1 else 'even' attr = settings.get_theming_attribute('thread', 'summary', parity) attr_focus = settings.get_theming_attribute('thread', 'summary', 'focus') msg_summary = MessageSummaryWidget(msg, attr, attr_focus) if msg.depth == 0: wgt = msg_summary else: ancestor_chain = [msg] for p in msg.parents(): ancestor_chain.insert(0, p) have_sibling = [] for d, m in enumerate(ancestor_chain): if d == 0: have_sibling.append(m != thread.toplevel_messages[-1]) else: have_sibling.append(m != ancestor_chain[d - 1].replies[-1]) self._decor = _ThreadNodeDecoration(have_sibling) wgt = urwid.Columns([(indent * msg.depth, self._decor), msg_summary], box_columns = [0]) super().__init__(wgt) self._msg_depth = msg.depth self.set_indent(indent) def set_indent(self, indent): if self._decor is None: return self._w.contents[0] = (self._decor, ('given', self._msg_depth * indent, True))