# Copyright (C) 2011-2012 Patrick Totzke # This file is released under the GNU GPL, version 3 or a later revision. # For further details see the COPYING file """ Widgets specific to thread mode """ import logging import re import urwid try: import pygments.lexers, pygments.formatters, pygments.util have_pygments = True except ImportError: have_pygments = False from .globals import TagWidget from .globals import AttachmentWidget from ..settings.const import settings from ..utils import ansi_term class MessageSummaryWidget(urwid.WidgetWrap): """ one line summary of a :class:`~alot.db.message.Message`. """ def __init__(self, message, attr, attr_focus): """ :param message: a message :type message: alot.db.Message """ author, address = message.get_author() text = author if author else address subj = self._extract_subj(message) if subj: text += ': ' + subj date = message.get_datestring() if date is not None: text += " (%s)" % date text = text.translate(settings.sanitize_header_table) cols = [urwid.Text(text)] if settings.get('msg_summary_hides_threadwide_tags'): thread_tags = message.thread.get_tags(intersection=True) outstanding_tags = set(message.get_tags()).difference(thread_tags) tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in outstanding_tags) else: tag_widgets = sorted(TagWidget(t, attr, attr_focus) for t in message.get_tags()) for tag_widget in tag_widgets: if not tag_widget.hidden: cols.append(('fixed', tag_widget.width(), tag_widget)) line = urwid.AttrMap(urwid.Columns(cols, dividechars=1), attr, attr_focus) super().__init__(line) def _extract_subj(self, msg): subj_style = settings.get('thread_message_summary_subject') if subj_style == 'no': return '' try: subj = ','.join(msg.headers['Subject']) except KeyError: return '' # for "smart" display style, check if normalized subject # is the same as parent's if subj_style == 'smart' and msg.parent is not None: try: parent_subj = ','.join(msg.parent.headers['Subject']) except KeyError: pass else: def normalize_subj(subj): # normalize subject for comparison - replace all # whitespace sequences with a single space and strip # leading re:'s subj = re.sub(r'\s+', ' ', subj) return re.sub(r'(re:? *)*', '', subj, flags = re.IGNORECASE) if normalize_subj(subj) == normalize_subj(parent_subj): return '' return re.sub(r'\n\s+', r' ', subj) class _MIMEPartWidget(urwid.WidgetWrap): # when not None, a list of child _MIMEPartWidget subclasses _children = None _fold_level = None def __init__(self, body_wgt, children = None): self._children = children self._fold_level = 0 super().__init__(body_wgt) @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): if self._children is None: return val_in = max(0, val) val_out = 0 for ch in self._children: ch.foldlevel = val_in val_out = max(val_out, ch.foldlevel) self._fold_level = val_out def cycle_alt(self): if self._children is None: return for c in self._children: c.cycle_alt() class _CryptPartWidget(_MIMEPartWidget): def __init__(self, mime_tree, alternative_pref, force_inline_types): children = None # handle broken crypto parts where we could not get the payload if (mime_tree.children is None or len(mime_tree.children) != 1): text = 'Invalid "%s" MIME part' % mime_tree.content_type if mime_tree.crypt_error: text += ': ' + mime_tree.crypt_error body_wgt = urwid.Text(text, align = 'center') else: text_parts = [] if mime_tree.is_encrypted and mime_tree.is_signed: desc = 'encrypted+signed' if mime_tree.is_encrypted: desc = 'encrypted' elif mime_tree.is_signed: desc = 'signed' text_parts.append(desc + ' MIME part') if mime_tree.is_signed and mime_tree.sig_valid: if mime_tree.sig_trusted: trust = 'trusted' attr_name = 'crypt_trusted' else: trust = 'untrusted' attr_name = 'crypt_untrusted' t = 'valid %s signature from "%s"' % (trust, mime_tree.signer_id) text_parts.append(t) else: attr_name = 'crypt_unsigned' if mime_tree.crypt_error: text_parts.append('crypto processing error: ' + mime_tree.crypt_error) attr_name = 'crypt_invalid' child = _render_mime_tree(mime_tree.children[0], alternative_pref, force_inline_types) \ or _EmptyPartWidget() attr = settings.get_theming_attribute('thread', attr_name) body_wgt = urwid.AttrMap(urwid.LineBox(child, title = ':'.join(text_parts)), attr) children = [child] super().__init__(body_wgt, children) class _MultiMixedWidget(_MIMEPartWidget): def __init__(self, children): super().__init__(urwid.Pile(children), children) class _MultiAltWidget(_MIMEPartWidget): _cur_idx = None _child_parts = None _sel_placeholder = None def __init__(self, child_widgets, child_parts, alternative_pref): self._child_parts = child_parts child_idx = None for idx, ch in enumerate(child_parts): if ch.content_type == alternative_pref: child_idx = idx break if child_idx is None: child_idx = 0 self._sel_placeholder = urwid.WidgetPlaceholder(urwid.Text('')) body_wgt = urwid.LineBox(self._sel_placeholder) super().__init__(body_wgt, child_widgets) self._select_alt(child_idx) def _select_alt(self, idx): child_part = self._child_parts[idx] child_wgt = self._children[idx] self._w.set_title('Alternative MIME part %d/%d: %s' % (idx + 1, len(self._children), child_part.content_type)) self._sel_placeholder.original_widget = child_wgt self._cur_idx = idx def cycle_alt(self): self._select_alt((self._cur_idx + 1) % len(self._children)) class _Fold: level = None start = None end = None children = None def __init__(self, level, start, end): self.children = [] self.level = level self.start = start self.end = end def fold(self, lines, level, context): ret = [] if level == self.level: # this level is the lowest folded one # display only the context ctx_start, ctx_end = context length = self.end - self.start if length <= ctx_start + ctx_end + 1: ret.extend(lines[self.start:self.end]) else: ret.extend(lines[self.start:self.start + ctx_start]) attr = ret[-1][0] ret.append((attr, '%s [...%d lines folded...]\n' % ('> ' * level, length - ctx_start - ctx_end))) ret.extend(lines[self.end - ctx_end:self.end]) elif level > self.level: # this level is not folded, display as-is # but we still need to fold the children individually # start by adding lines before the first child, or all # lines when there are no children end = self.children[0].start if self.children else self.end ret.extend(lines[self.start:end]) for i, c in enumerate(self.children): # fold the child recursively ret.extend(c.fold(lines, level, context)) # add lines inbetween this child's end and next child's start # or our end if this is the last child next_start = self.children[i + 1].start if (i + 1 < len(self.children)) \ else self.end ret.extend(lines[c.end:next_start]) return ret def __repr__(self): return 'Fold(%d, %d, %d, %s)' % \ (self.level, self.start, self.end, self.children) class _TextPart(_MIMEPartWidget): _QUOTE_CHARS = '>|:}#' _QUOTE_REGEX = '(([ \t]*[{quote_chars}])+)'.format(quote_chars = _QUOTE_CHARS) _body_placeholder = None _max_level = None _lines = None _fold = None def __init__(self, text, part): attr_text = settings.get_theming_attribute('thread', 'body') attrs_quote = settings.get_quote_theming() self._fold_context = settings.get('thread_fold_context') text = text.translate(settings.sanitize_text_table) # try highlighting with pygments first hilit = self._highlight_pygments(text, part, attr_text) if hilit is None: # fallback - process as a normal text mail body: # detect quoted blocks hilit = self._parse_quotes(text, attr_text, attrs_quote) self._lines, self._fold, self._max_level = hilit self._body_placeholder = urwid.WidgetPlaceholder(urwid.Text('')) # decorate inline-forced attachments with a linebox if part.attachment: title = 'attachment %s' % part.content_type fname = part.attachment.filename if fname: title += ': %s' % fname wgt = urwid.LineBox(self._body_placeholder, title = title) else: wgt = self._body_placeholder super().__init__(wgt) def _parse_quotes(self, text, attr_text, attrs_quote): """ Parse quotes in email body, generate folds and theming from them. """ # first pass: assign a level to each line based on the number of quote # characters at the beginning lines = text.splitlines(keepends = True) blocks = [] for i, line in enumerate(lines): level = 0 m = re.match(self._QUOTE_REGEX, line) if m is not None: quote_prefix = m.group(0) remainder = line[len(quote_prefix):] for c in self._QUOTE_CHARS: level += quote_prefix.count(c) else: quote_prefix = '' remainder = line if len(blocks) == 0 or blocks[-1]['level'] != level: # start a new block blocks.append({ 'level' : level, 'start' : i, 'end' : i, 'data_empty' : True}) blocks[-1]['end'] += 1 if blocks[-1]['data_empty'] and len(remainder.strip()) > 0: blocks[-1]['data_empty'] = False # second pass: every block where the remainder (line minus the # leading quote characters) is empty gets merged into an adjacent quoted # block, higher level preferred for idx, b in enumerate(blocks): if b['data_empty']: b_prev = blocks[idx - 1] if idx > 0 else None b_next = blocks[idx + 1] if idx + 1 < len(blocks) else None merge_level = None if (b_prev is not None and not b_prev['data_empty'] and b_prev['level'] > 0): merge_level = b_prev['level'] if (b_next is not None and not b_next['data_empty'] and (merge_level is None or merge_level < b_next['level'])): merge_level = b_next['level'] if merge_level is not None: b['level'] = merge_level fold_stack = [] cur_fold = _Fold(0, 0, 0) max_level = 0 for b in blocks: max_level = max(max_level, b['level']) if b['level'] > cur_fold.level: fold_stack.append(cur_fold) cur_fold = _Fold(b['level'], b['start'], b['end']) else: while b['level'] < cur_fold.level: child = cur_fold if fold_stack[-1].level < cur_fold.level - 1: cur_fold = _Fold(cur_fold.level - 1, cur_fold.start, cur_fold.end) else: cur_fold = fold_stack.pop() cur_fold.children.append(child) cur_fold.end = max(cur_fold.end, child.end) cur_fold.end = b['end'] while fold_stack: fold_stack[-1].children.append(cur_fold) cur_fold = fold_stack.pop() assert(cur_fold.level == 0) cur_fold.end = len(lines) def color_lines(fold, lines, colored_lines): if fold.level == 0 or len(attrs_quote) < 1: attr = attr_text else: attr = attrs_quote[(fold.level - 1) % len(attrs_quote)] colored_lines[fold.start:fold.end] = [(attr, line) for line in lines[fold.start:fold.end]] for c in fold.children: color_lines(c, lines, colored_lines) colored_lines = lines[:] color_lines(cur_fold, lines, colored_lines) return colored_lines, cur_fold, max_level def _highlight_pygments(self, text, part, attr_text): """Color text using pygments""" if not have_pygments: return None # get formatter for terminal # TODO: support terminal256 and theming try: formatter = pygments.formatters.get_formatter_by_name('terminal') except pygments.util.ClassNotFound: logging.warning('Could not get pygments formatter for terminal') return None # try finding a lexer for this part lexer = None # first, try guessing by filename/content try: if part.filename is not None: lexer = pygments.lexers.guess_lexer_for_filename( part.filename, text) else: lexer = pygments.lexers.guess_lexer(text) except pygments.util.ClassNotFound: logging.debug('No pygments lexer for filename: %s', part.filename) # second, try the MIME type try: lexer = pygments.lexers.get_lexer_for_mimetype(part.content_type) except pygments.util.ClassNotFound: logging.debug('No pygments lexer for MIME type: %s', part.content_type) # handle git-send-email patches, which are sent as text/plain if all(s in text for s in ('\ndiff --git', '\nindex', '\n---', '\n+++')): try: lexer = pygments.lexers.get_lexer_by_name('diff') except pygments.util.ClassNotFound: logging.warning('Could not get a lexer/formatter for diff highlighting') # FIXME: this is a hack to prevent getting a dummy lexer for plaintext # that does nothing and prevents our quote detection code from running # or the html lexer, which is not necessary since we already processed # html with an external renderer if lexer is None or 'text' in lexer.aliases or 'html' in lexer.aliases: logging.info('No pygments lexer for MIME part: %s', part) return None text = pygments.highlight(text, lexer, formatter) lines = ansi_term.parse_escapes_to_urwid(text, attr_text) return lines, _Fold(0, 0, len(lines)), 0 @property def foldlevel(self): return self._fold_level @foldlevel.setter def foldlevel(self, val): val = max(1, min(self._max_level + 1, val)) logging.info('settting foldlevel to %d', val) markup_lines = self._fold.fold(self._lines, val, self._fold_context) if len(markup_lines) == 0: markup_lines = [''] self._body_placeholder.original_widget = urwid.Text(markup_lines) self._fold_level = val class _EmptyPartWidget(_MIMEPartWidget): def __init__(self): body_wgt = urwid.Text('<<< No displayable content >>>', align = 'center') super().__init__(body_wgt) def _handle_multipart(mime_tree, alternative_pref, force_inline_types): children = [] for child in mime_tree.children: ch = _render_mime_tree(child, alternative_pref, force_inline_types) if ch is not None and not isinstance(ch, _EmptyPartWidget): children.append(ch) if len(children) > 0: # there exist some intelligence-challenged systems in the wild producing # multipart/alternative with just a single alternative - treat those as # mixed and so avoid displaying pointless decorations if mime_tree.is_alternative and len(children) > 1: return _MultiAltWidget(children, mime_tree.children, alternative_pref) return _MultiMixedWidget(children) def _render_mime_tree(mime_tree, alternative_pref, force_inline_types): # handle encrypted/signed parts if mime_tree.is_signed or mime_tree.is_encrypted: return _CryptPartWidget(mime_tree, alternative_pref, force_inline_types) if mime_tree.children is not None: # multipart MIME parts return _handle_multipart(mime_tree, alternative_pref, force_inline_types) ## no children - this is a leaf node # skip attachment parts, unless they are of a user-specified type if mime_tree.attachment: force_inline = False for t in force_inline_types: maintype, _, subtype = t.partition('/') if (maintype == mime_tree.content_maintype and (subtype == mime_tree.content_subtype or subtype == '*')): force_inline = True break if not force_inline: return None # try rendering the message text = mime_tree.render_str() if text is not None: return _TextPart(text, mime_tree) if len(text.strip()) else _EmptyPartWidget() body_wgt = urwid.Text('Undisplayable "%s" MIME part.' % mime_tree.content_type, align = 'center') return _MIMEPartWidget(body_wgt) class HeadersWidget(urwid.WidgetWrap): """ A flow widget displaying message headers. """ _key_attr = None _value_attr = None _gaps_attr = None def __init__(self, key_attr, value_attr, gaps_attr): """ :param headerslist: list of key/value pairs to display :type headerslist: list of (str, str) :param key_attr: theming attribute to use for keys :type key_attr: urwid.AttrSpec :param value_attr: theming attribute to use for values :type value_attr: urwid.AttrSpec :param gaps_attr: theming attribute to wrap lines in :type gaps_attr: urwid.AttrSpec """ self._key_attr = key_attr self._value_attr = value_attr self._gaps_attr = gaps_attr super().__init__(urwid.Pile([])) def set_headers(self, headers): if len(headers) == 0: self._w.contents.clear() return max_key_len = max(map(lambda x: len(x[0]), headers)) widgets = [] for key, value in headers: value = value.translate(settings.sanitize_header_table) # TODO even/odd keyw = ('fixed', max_key_len + 1, urwid.Text((self._key_attr, key))) valuew = urwid.Text((self._value_attr, value)) linew = urwid.AttrMap(urwid.Columns([keyw, valuew]), self._gaps_attr) widgets.append(linew) self._w.contents = [(w, ('pack', None)) for w in widgets] def _make_ascii_trans(): trans = { 0x09 : ' ' * 8, # tab 0x0b : '<\\v>\n', # vertical tab 0x0c : '<\\f>\n', # form feed 0x0d : '', # carriage return } for i in range(256): if (i >= 0x20 and i <= 0x7e) or i == 0x0a: # printables and \n trans[i] = chr(i) elif not i in trans: trans[i] = r'<\x%x>' % i return trans _ascii_trans = _make_ascii_trans() def _ascii_to_printable(data): """ Prepare ASCII data for printing. :param data: input data :type data: bytes :returns: Data suitable for printing. It will contain only printable characters (U+0020-U+007E) and a newline (U+000A). :rtype: str """ return data.decode('latin1').translate(_ascii_trans) class _RawMessageWidget(urwid.WidgetWrap): """ A flow widget displaying the "raw" message. """ def __init__(self, msg_bytes): sourcetxt = _ascii_to_printable(msg_bytes) super().__init__(urwid.Text(sourcetxt)) class MessageWidget(urwid.WidgetWrap): """ A flow widget displaying the contents of a single :class:`alot.db.Message`. """ _display_all_headers = None _display_source = None _headers_wgt = None _source_wgt = None _body_wgt = None _attach_wgt = None def __init__(self, message): """ :param message: Message to display :type message: alot.db.Message """ self._message = message force_inline_types = settings.get('thread_force_inline_mimetypes') if settings.get('prefer_plaintext'): alternative_pref = 'text/plain' else: alternative_pref = 'text/html' self._headers_wgt = self._get_headers() self._source_wgt = _RawMessageWidget(message.as_bytes()) self._attach_wgt = self._get_attachments() self._body_wgt = _render_mime_tree(message.body, alternative_pref, force_inline_types) if self._body_wgt is None: self._body_wgt = _EmptyPartWidget() super().__init__(urwid.ListBox(urwid.SimpleListWalker([]))) self.display_all_headers = False self.display_source = False self.foldlevel = settings.get('thread_fold_initial_level') def get_message(self): return self._message def _get_headers(self): key_attr = settings.get_theming_attribute('thread', 'header_key') value_attr = settings.get_theming_attribute('thread', 'header_value') gaps_attr = settings.get_theming_attribute('thread', 'header') return HeadersWidget(key_attr, value_attr, gaps_attr) def _get_attachments(self): alist = [] for a in self._message.iter_attachments(): alist.append(AttachmentWidget(a)) if alist: return urwid.Pile(alist) return None @property def display_all_headers(self): return self._display_all_headers @display_all_headers.setter def display_all_headers(self, val): val = bool(val) if val == self._display_all_headers: return headers = self._message.headers lines = [] if val: # collect all header/value pairs in the order they appear lines = list(headers.items()) else: # only a selection of headers should be displayed. # use order of the `headers` parameter for key in settings.get('displayed_headers'): if key in headers: if key.lower() in ['cc', 'bcc', 'to']: lines.append((key, ', '.join(headers[key]))) else: for value in headers[key]: lines.append((key, value)) elif key.lower() == 'tags': # TODO this should be in a separate widget logging.debug('want tags header') values = [] for t in self._message.get_tags(): tagrep = settings.get_tagstring_representation(t) if t is not tagrep['translated']: t = '%s (%s)' % (tagrep['translated'], t) values.append(t) lines.append((key, ', '.join(values))) self._headers_wgt.set_headers(lines) self._display_all_headers = val @property def display_source(self): return self._display_source @display_source.setter def display_source(self, val): val = bool(val) if val == self._display_source: return widgets = [] if val: widgets.append(self._source_wgt) else: widgets.append(self._headers_wgt) if self._attach_wgt is not None: widgets.append(self._attach_wgt) widgets.append(self._body_wgt) self._w.body[:] = widgets self._display_source = val @property def foldlevel(self): return self._body_wgt.foldlevel @foldlevel.setter def foldlevel(self, val): self._body_wgt.foldlevel = val def cycle_alt(self): self._body_wgt.cycle_alt() def get_selected_attachment(self): """ If an AttachmentWidget is currently focused, return it. Otherwise return None. """ if self._w.focus is self._attach_wgt: return self._attach_wgt.focus.attachment return None class ThreadNode(urwid.WidgetWrap): _ARROW = '➤' _HBAR = '─' _VBAR = '│' _TNODE = '├' _CORNER = '└' _decor_text = None _have_next_sibling = None def __init__(self, msg, thread, pos, indent): parity = 'odd' if pos & 1 else 'even' attr = settings.get_theming_attribute('thread', 'summary', parity) attr_focus = settings.get_theming_attribute('thread', 'summary', 'focus') msg_summary = MessageSummaryWidget(msg, attr, attr_focus) if msg.depth == 0: wgt = msg_summary else: self._decor_text = urwid.Text('') wgt = urwid.Columns([('pack', self._decor_text), msg_summary]) ancestor_chain = [msg] for p in msg.parents(): ancestor_chain.insert(0, p) have_sibling = [] for d, m in enumerate(ancestor_chain): if d == 0: have_sibling.append(m != thread.toplevel_messages[-1]) else: have_sibling.append(m != ancestor_chain[d - 1].replies[-1]) self._have_next_sibling = have_sibling super().__init__(wgt) self.set_indent(indent) def set_indent(self, indent): if self._decor_text is None: return seg_text = [] if indent > 0: depth = len(self._have_next_sibling) - 1 for d in range(depth): if d == depth - 1: corner = self._TNODE if self._have_next_sibling[d + 1] else self._CORNER seg_text.append(corner + self._ARROW) else: bar = self._VBAR if self._have_next_sibling[d + 1] else ' ' seg_text.append(bar + ' ') self._decor_text.set_text(''.join(seg_text))