"""
This file is part of alot.
Alot is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
Notmuch is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with notmuch. If not, see <http://www.gnu.org/licenses/>.
Copyright (C) 2011 Patrick Totzke
"""
import urwid
from twisted.internet import reactor, defer
from settings import config
from buffers import BufferlistBuffer
import commands
from commands import commandfactory
from alot.commands import CommandParseError
import widgets
from completion import CommandLineCompleter
class InputWrap(urwid.WidgetWrap):
    """
    Root widget wrapper that turns keypresses into commands.

    Every key is first looked up in the configured key mapping of the
    current mode; if a command line is bound to it, the resulting command
    is applied to the UI. Keys that are unbound, or whose command is not
    allowed at the moment, are relayed to the wrapped widget.
    """

    def __init__(self, ui, rootwidget):
        """
        :param ui: the UI this wrapper reports to and applies commands on
        :param rootwidget: widget that is initially wrapped
        """
        urwid.WidgetWrap.__init__(self, rootwidget)
        self.ui = ui
        self.rootwidget = rootwidget
        # while set, only 'select' and 'cancel' keypress commands are
        # interpreted; everything else is relayed to the wrapped widget
        self.select_cancel_only = False

    def set_root(self, w):
        """replace the wrapped widget with `w`"""
        self._w = w

    def get_root(self):
        """return the currently wrapped widget"""
        return self._w

    def allowed_command(self, cmd):
        """
        decide whether `cmd` may be applied right now

        :param cmd: the command that was bound to the pressed key
        :returns: True if no restriction is active, or if `cmd` is a
                  `SendKeypressCommand` for 'select' or 'cancel';
                  False otherwise
        :rtype: bool
        """
        if not self.select_cancel_only:
            return True
        if isinstance(cmd, commands.globals.SendKeypressCommand):
            return cmd.key in ('select', 'cancel')
        return False

    def keypress(self, size, key):
        """
        interpret `key` as a command or relay it to the wrapped widget

        :param size: widget size, passed on unchanged when relaying
        :param key: the pressed key
        :returns: None if a command was applied, otherwise whatever the
                  wrapped widget's `keypress` returns
        """
        self.ui.logger.debug("got key: '%s'" % key)
        # in restricted mode only the global bindings are consulted
        mode = 'global' if self.select_cancel_only else self.ui.mode
        cmdline = config.get_mapping(mode, key)
        if cmdline:
            try:
                cmd = commandfactory(cmdline, mode)
                if self.allowed_command(cmd):
                    self.ui.apply_command(cmd)
                    return None
            except CommandParseError as e:
                self.ui.notify(e.message, priority='error')
        self.ui.logger.debug('relaying key: %s' % key)
        return self._w.keypress(size, key)
class UI(object):
    """
    The user interface: owns the urwid main loop, the list of open
    buffers, the notification area and the status bar, and applies
    commands.
    """
    # class-level fallbacks; __init__ binds fresh per-instance values
    buffers = []
    current_buffer = None

    def __init__(self, dbman, log, accountman, initialcmd, colourmode):
        """
        set up the widgets, apply the initial command and run the main loop

        :param dbman: database manager; this UI registers itself as its
                      `ui` attribute
        :param log: logger used for all diagnostic output
        :param accountman: account manager handed to completers and hooks
        :param initialcmd: command applied before the main loop is run
        :param colourmode: number of colours to use; if false-ish, the
                           'colourmode' value of the general config
                           section is used instead
        """
        # per-instance list, so buffers are not shared through the class
        self.buffers = []
        self.current_buffer = None

        self.dbman = dbman
        self.dbman.ui = self  # register ui with dbman
        self.logger = log
        self.accountman = accountman

        if not colourmode:
            colourmode = config.getint('general', 'colourmode')
        self.logger.info('setup gui in %d colours' % colourmode)
        self.mainframe = urwid.Frame(urwid.SolidFill())
        self.inputwrap = InputWrap(self, self.mainframe)
        self.mainloop = urwid.MainLoop(self.inputwrap,
                                       config.get_palette(),
                                       handle_mouse=False,
                                       event_loop=urwid.TwistedEventLoop(),
                                       unhandled_input=self.unhandeled_input)
        self.mainloop.screen.set_terminal_properties(colors=colourmode)

        self.show_statusbar = config.getboolean('general', 'show_statusbar')
        self.notificationbar = None
        self.mode = 'global'
        self.commandprompthistory = []

        self.logger.debug('fire first command')
        self.apply_command(initialcmd)
        self.mainloop.run()

    def unhandeled_input(self, key):
        """log a key that no widget has handled"""
        self.logger.debug('unhandeled input: %s' % key)

    def keypress(self, key):
        """feed `key` to the input wrapper, using a fixed size of (150, 20)"""
        self.inputwrap.keypress((150, 20), key)

    def show_as_root_until_keypress(self, w, key, relay_rest=True,
                                    afterwards=None):
        """
        display `w` as root widget until `key` is pressed

        :param w: widget to display
        :param key: key that ends the display
        :param relay_rest: passed on to the key-catching wrapper; if
                           False, the input wrapper is additionally
                           restricted to select/cancel commands
        :type relay_rest: bool
        :param afterwards: called without arguments once `key` was caught
                           (ignored unless callable)
        """
        def oe():
            # restore the main frame and lift the input restriction
            self.inputwrap.set_root(self.mainframe)
            self.inputwrap.select_cancel_only = False
            if callable(afterwards):
                self.logger.debug('called')
                afterwards()

        self.logger.debug('relay: %s' % relay_rest)
        helpwrap = widgets.CatchKeyWidgetWrap(w, key, on_catch=oe,
                                              relay_rest=relay_rest)
        self.inputwrap.set_root(helpwrap)
        self.inputwrap.select_cancel_only = not relay_rest

    def prompt(self, prefix='>', text=u'', completer=None, tab=0,
               history=None):
        """prompt for text input

        :param prefix: text to print before the input field
        :type prefix: str
        :param text: initial content of the input field
        :type text: str
        :param completer: completion object to use
        :type completer: `alot.completion.Completer`
        :param tab: number of tabs to press initially
                    (to select completion results)
        :type tab: int
        :param history: history to be used for up/down keys;
                        None (the default) means an empty history
        :type history: list of str
        :returns: a `twisted.defer.Deferred`
        """
        if history is None:
            # fresh list per call instead of a shared default argument
            history = []

        d = defer.Deferred()  # create return deferred
        oldroot = self.inputwrap.get_root()

        def select_or_cancel(text):
            # restore main screen, then fire the deferred with the input
            self.inputwrap.set_root(oldroot)
            self.inputwrap.select_cancel_only = False
            d.callback(text)

        # set up widgets
        leftpart = urwid.Text(prefix, align='left')
        editpart = widgets.CompleteEdit(completer, on_exit=select_or_cancel,
                                        edit_text=text, history=history)

        for _ in range(tab):  # hit some tabs
            editpart.keypress((0,), 'tab')

        # build promptwidget
        both = urwid.Columns(
            [
                ('fixed', len(prefix), leftpart),
                ('weight', 1, editpart),
            ])
        both = urwid.AttrMap(both, 'prompt', 'prompt')

        # put promptwidget as overlay on main widget
        overlay = urwid.Overlay(both, oldroot,
                                ('fixed left', 0),
                                ('fixed right', 0),
                                ('fixed bottom', 1),
                                None)
        self.inputwrap.set_root(overlay)
        self.inputwrap.select_cancel_only = True
        return d  # return deferred

    def exit(self):
        """stop the twisted reactor and leave the urwid main loop

        :raises: `urwid.ExitMainLoop`, always
        """
        reactor.stop()
        raise urwid.ExitMainLoop()

    @defer.inlineCallbacks
    def commandprompt(self, startstring):
        """prompt for a commandline and interpret/apply it upon enter

        :param startstring: initial text in edit part
        :type startstring: str
        """
        self.logger.info('open command shell')
        mode = self.current_buffer.typename
        cmdline = yield self.prompt(prefix=':',
                                    text=startstring,
                                    completer=CommandLineCompleter(
                                        self.dbman,
                                        self.accountman,
                                        mode),
                                    history=self.commandprompthistory,
                                    )
        self.logger.debug('CMDLINE: %s' % cmdline)
        self.interpret_commandline(cmdline)

    def interpret_commandline(self, cmdline):
        """interpret and apply a commandstring

        An empty or None `cmdline` is ignored. Non-empty command lines
        are appended to the command prompt history before being parsed;
        parse errors are shown as error notifications.

        :param cmdline: command string to apply
        :type cmdline: str
        """
        if not cmdline:
            return
        mode = self.current_buffer.typename
        self.commandprompthistory.append(cmdline)
        try:
            cmd = commandfactory(cmdline, mode)
            self.apply_command(cmd)
        except CommandParseError as e:
            self.notify(e.message, priority='error')

    def buffer_open(self, b):
        """
        register and focus new buffer
        """
        self.buffers.append(b)
        self.buffer_focus(b)

    def buffer_close(self, buf):
        """
        close the given buffer

        Unknown buffers are only logged as an error. Closing the last
        remaining buffer applies the 'exit' command instead. If the
        closed buffer has the focus, the focus moves to the buffer found
        'bufferclose_focus_offset' (general config section) positions
        from the closed one, wrapping around.
        """
        buffers = self.buffers
        if buf not in buffers:
            string = 'tried to close unknown buffer: %s. \n\ni have:%s'
            self.logger.error(string % (buf, self.buffers))
        elif len(buffers) == 1:
            self.logger.info('closing the last buffer, exiting')
            cmd = commandfactory('exit')
            self.apply_command(cmd)
        else:
            if self.current_buffer == buf:
                self.logger.debug('UI: closing current buffer %s' % buf)
                index = buffers.index(buf)
                buffers.remove(buf)
                offset = config.getint('general', 'bufferclose_focus_offset')
                nextbuffer = buffers[(index + offset) % len(buffers)]
                self.buffer_focus(nextbuffer)
            else:
                string = 'closing buffer %d:%s'
                self.logger.debug(string % (buffers.index(buf), buf))
                buffers.remove(buf)
            buf.cleanup()

    def buffer_focus(self, buf):
        """
        focus given buffer. must be contained in self.buffers
        """
        if buf not in self.buffers:
            self.logger.error('tried to focus unknown buffer')
            return
        if self.current_buffer != buf:
            self.current_buffer = buf
            self.inputwrap.set_root(self.mainframe)
        self.mode = buf.typename
        # a buffer list is rebuilt each time it gets the focus
        if isinstance(self.current_buffer, BufferlistBuffer):
            self.current_buffer.rebuild()
        self.update()

    def get_deep_focus(self, startfrom=None):
        """
        follow `get_focus()` recursively and return the innermost widget

        :param startfrom: widget to start at; defaults to the current
                          buffer
        :returns: the first object reached that has no `get_focus`, or
                  whose focus is not a `urwid.Widget`
        """
        if not startfrom:
            startfrom = self.current_buffer
        if 'get_focus' in dir(startfrom):
            focus = startfrom.get_focus()
            # when get_focus() returns a tuple, its first item is used
            if isinstance(focus, tuple):
                focus = focus[0]
            if isinstance(focus, urwid.Widget):
                return self.get_deep_focus(startfrom=focus)
        return startfrom

    def get_buffers_of_type(self, t):
        """returns currently open buffers for a given subclass of
        `alot.buffer.Buffer`

        :rtype: list
        """
        return [b for b in self.buffers if isinstance(b, t)]

    def clear_notify(self, messages):
        """clears notification popups. Usually called in order
        to get rid of messages that don't time out

        Popups that are no longer displayed are silently skipped, so a
        message may safely be cleared more than once (e.g. manually and
        again by its timeout alarm).

        :param messages: The popups to remove. This should be a list of
                         widgets as returned by notify()
        """
        if self.notificationbar is None:
            # nothing is displayed, so there is nothing to remove
            return
        newpile = self.notificationbar.widget_list
        for l in messages:
            if l in newpile:
                newpile.remove(l)
        if newpile:
            self.notificationbar = urwid.Pile(newpile)
        else:
            self.notificationbar = None
        self.update()

    def choice(self, message, choices=None,
               select=None, cancel=None, msg_position='above'):
        """prompt user to make a choice

        :param message: string to display before list of choices
        :type message: unicode
        :param choices: dict of possible choices; None (the default)
                        means ``{'y': 'yes', 'n': 'no'}``
        :type choices: keymap->choice (both str)
        :param select: choice to return if enter/return is hit.
                       Ignored if set to None.
        :type select: str
        :param cancel: choice to return if escape is hit.
                       Ignored if set to None.
        :type cancel: str
        :param msg_position: where to put the message relative to the
                             choices: 'left' or 'above'
        :type msg_position: str
        :returns: a `twisted.defer.Deferred`
        """
        if choices is None:
            # fresh dict per call instead of a shared default argument
            choices = {'y': 'yes', 'n': 'no'}
        valid = list(choices.values()) + [None]
        assert select in valid
        assert cancel in valid
        assert msg_position in ['left', 'above']

        d = defer.Deferred()  # create return deferred
        oldroot = self.inputwrap.get_root()

        def select_or_cancel(text):
            # restore main screen, then fire the deferred with the choice
            self.inputwrap.set_root(oldroot)
            self.inputwrap.select_cancel_only = False
            d.callback(text)

        # set up widgets
        msgpart = urwid.Text(message)
        choicespart = widgets.ChoiceWidget(choices, callback=select_or_cancel,
                                           select=select, cancel=cancel)

        # build widget
        if msg_position == 'left':
            both = urwid.Columns(
                [
                    ('fixed', len(message), msgpart),
                    ('weight', 1, choicespart),
                ], dividechars=1)
        else:  # above
            both = urwid.Pile([msgpart, choicespart])
        both = urwid.AttrMap(both, 'prompt', 'prompt')

        # put promptwidget as overlay on main widget
        overlay = urwid.Overlay(both, oldroot,
                                ('fixed left', 0),
                                ('fixed right', 0),
                                ('fixed bottom', 1),
                                None)
        self.inputwrap.set_root(overlay)
        self.inputwrap.select_cancel_only = True
        return d  # return deferred

    def notify(self, message, priority='normal', timeout=0, block=False):
        """notify popup

        :param message: message to print
        :type message: str
        :param priority: priority string, used to format the popup: currently,
                         'normal' and 'error' are defined. If you use 'X' here,
                         the attribute 'notify_X' is used to format the popup.
        :type priority: str
        :param timeout: seconds until message disappears. Defaults to the value
                        of 'notify_timeout' in the general config section.
                        A negative value means never time out.
                        Not used if `block` is set.
        :type timeout: int
        :param block: this notification blocks until a keypress is made
        :type block: boolean
        :returns: the popup widget; wrap it in a list to hand it to
                  clear_notify()
        """
        def build_line(msg, prio):
            cols = urwid.Columns([urwid.Text(msg)])
            return urwid.AttrMap(cols, 'notify_' + prio)

        msgs = [build_line(message, priority)]

        # append to the existing notifications, if there are any
        if not self.notificationbar:
            self.notificationbar = urwid.Pile(msgs)
        else:
            newpile = self.notificationbar.widget_list + msgs
            self.notificationbar = urwid.Pile(newpile)
        self.update()

        def clear(*args):
            # accepts and ignores whatever the alarm or key-catch passes
            self.clear_notify(msgs)

        if block:
            # put "cancel to continue" widget as overlay on main widget
            txt = urwid.Text('(cancel continues)')
            overlay = urwid.Overlay(txt, self.mainframe,
                                    ('fixed left', 0),
                                    ('fixed right', 0),
                                    ('fixed bottom', 0),
                                    None)
            self.show_as_root_until_keypress(overlay, 'cancel',
                                             relay_rest=False,
                                             afterwards=clear)
        elif timeout >= 0:
            if timeout == 0:
                timeout = config.getint('general', 'notify_timeout')
            self.mainloop.set_alarm_in(timeout, clear)
        return msgs[0]

    def update(self):
        """
        redraw interface
        """
        # no header is set on the main frame

        # body
        if self.current_buffer:
            self.mainframe.set_body(self.current_buffer)
        else:
            # this happens iff update gets called during
            # initial command before a first buffer is displayed.
            # in compose, a prompt is cancelled
            self.exit()

        # footer: notifications first, then the status bar
        lines = []
        if self.notificationbar:
            lines.append(self.notificationbar)
        if self.show_statusbar:
            lines.append(self.build_statusbar())

        if lines:
            self.mainframe.set_footer(urwid.Pile(lines))
        else:
            self.mainframe.set_footer(None)

    def build_statusbar(self):
        """
        build the status bar widget

        The left part shows index, type name and string representation
        of the current buffer. The right part shows the total message
        count, preceded by one '|' per entry in the database manager's
        write queue.

        :returns: the status bar, formatted with the 'footer' attribute
        """
        idx = self.buffers.index(self.current_buffer)
        lefttxt = '%d: [%s] %s' % (idx, self.current_buffer.typename,
                                   self.current_buffer)
        footerleft = urwid.Text(lefttxt, align='left')

        righttxt = 'total messages: %d' % self.dbman.count_messages('*')
        pending_writes = len(self.dbman.writequeue)
        if pending_writes > 0:
            righttxt = ('|' * pending_writes) + ' ' + righttxt
        footerright = urwid.Text(righttxt, align='right')

        columns = urwid.Columns([
            footerleft,
            ('fixed', len(righttxt), footerright)])
        return urwid.AttrMap(columns, 'footer')

    def apply_command(self, cmd):
        """
        apply `cmd`, surrounded by its pre- and post-hook if it has them

        Exceptions raised by a hook are logged and do not keep the
        command (or the rest of this method) from running. Exceptions
        raised by the command itself propagate.

        :param cmd: the command to apply; false-ish values are ignored
        """
        if not cmd:
            return

        if cmd.prehook:
            self.logger.debug('calling pre-hook')
            try:
                cmd.prehook(ui=self, dbm=self.dbman, aman=self.accountman,
                            log=self.logger, config=config)
            except Exception:
                # a failing hook must not prevent the command itself
                self.logger.exception('prehook failed')

        self.logger.debug('apply command: %s' % cmd)
        cmd.apply(self)

        if cmd.posthook:
            self.logger.debug('calling post-hook')
            try:
                cmd.posthook(ui=self, dbm=self.dbman, aman=self.accountman,
                             log=self.logger, config=config)
            except Exception:
                self.logger.exception('posthook failed')