"""
This file is part of alot.
Alot is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
Notmuch is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with notmuch. If not, see .
Copyright (C) 2011 Patrick Totzke
"""
import urwid
import os
from urwid.command_map import command_map
from settings import config
from buffer import BufferlistBuffer
from command import commandfactory
from command import interpret_commandline
from widgets import CompleteEdit
from completion import CommandLineCompleter
class MainWidget(urwid.Frame):
def __init__(self, ui, *args, **kwargs):
urwid.Frame.__init__(self, urwid.SolidFill(' '), *args, **kwargs)
self.ui = ui
def keypress(self, size, key):
self.ui.logger.debug('got key: %s' % key)
cmdline = config.get_mapping(self.ui.mode, key)
if cmdline:
cmd = interpret_commandline(cmdline, self.ui.mode)
if cmd:
self.ui.apply_command(cmd)
else:
urwid.Frame.keypress(self, size, key)
class UI:
buffers = []
current_buffer = None
def __init__(self, dbman, log, accountman, initialquery, colourmode):
self.dbman = dbman
self.dbman.ui = self # register ui with dbman
self.logger = log
self.accountman = accountman
if not colourmode:
colourmode = config.getint('general', 'colourmode')
self.logger.info('setup gui in %d colours' % colourmode)
self.mainframe = MainWidget(self)
self.mainloop = urwid.MainLoop(self.mainframe,
config.get_palette(),
handle_mouse=False,
unhandled_input=self.keypress)
self.mainloop.screen.set_terminal_properties(colors=colourmode)
self.show_statusbar = config.getboolean('general', 'show_statusbar')
self.notificationbar = None
self.mode = ''
self.commandprompthistory = []
self.logger.debug('setup bindings')
cmd = commandfactory('search', query=initialquery)
self.apply_command(cmd)
self.mainloop.run()
def keypress(self, key):
self.logger.debug('unhandeled input: %s' % key)
def prompt(self, prefix='>', text=u'', completer=None, tab=0, history=[]):
"""prompt for text input
:param prefix: text to print before the input field
:type prefix: str
:param text: initial content of the input field
:type text: str
:param completer: completion object to use
:type completer: `alot.completion.Completer`
:param tab: number of tabs to press initially
(to select completion results)
:type tab: int
:param history: history to be used for up/down keys
:type history: list of str
"""
self.logger.info('open prompt')
history = list(history) # make a local copy
historypos = None
leftpart = urwid.Text(prefix, align='left')
if completer:
editpart = CompleteEdit(completer, edit_text=text)
for i in range(tab):
editpart.keypress((0,), 'tab')
else:
editpart = urwid.Edit(edit_text=text)
both = urwid.Columns(
[
('fixed', len(prefix), leftpart),
('weight', 1, editpart),
])
prompt_widget = urwid.AttrMap(both, 'prompt', 'prompt')
footer = self.mainframe.get_footer()
self.mainframe.set_footer(prompt_widget)
self.mainframe.set_focus('footer')
self.mainloop.draw_screen()
while True:
keys = None
while not keys:
keys = self.mainloop.screen.get_input()
for key in keys:
self.logger.debug('prompt got key: %s' % key)
if command_map[key] == 'select':
self.mainframe.set_footer(footer)
self.mainframe.set_focus('body')
return editpart.get_edit_text()
elif command_map[key] == 'cancel':
self.mainframe.set_footer(footer)
self.mainframe.set_focus('body')
return None
elif key in ['up', 'down']:
if history:
if historypos == None:
history.append(editpart.get_edit_text())
historypos = len(history) - 1
if key == 'cursor up':
historypos = (historypos - 1) % len(history)
else:
historypos = (historypos + 1) % len(history)
editpart.set_edit_text(history[historypos])
self.mainloop.draw_screen()
else:
size = (20,) # don't know why they want a size here
editpart.keypress(size, key)
self.mainloop.draw_screen()
def commandprompt(self, startstring):
"""prompt for a commandline and interpret/apply it upon enter
:param startstring: initial text in edit part
:type startstring: str
"""
self.logger.info('open command shell')
mode = self.current_buffer.typename
cmdline = self.prompt(prefix=':',
text=startstring,
completer=CommandLineCompleter(self.dbman,
self.accountman,
mode),
history=self.commandprompthistory,
)
if cmdline:
self.interpret_commandline(cmdline)
def interpret_commandline(self, cmdline):
"""interpret and apply a commandstring
:param cmdline: command string to apply
:type cmdline: str
"""
mode = self.current_buffer.typename
self.commandprompthistory.append(cmdline)
cmd = interpret_commandline(cmdline, mode)
if cmd:
self.apply_command(cmd)
else:
self.notify('invalid command')
def buffer_open(self, b):
"""
register and focus new buffer
"""
self.buffers.append(b)
self.buffer_focus(b)
def buffer_close(self, buf):
buffers = self.buffers
if buf not in buffers:
string = 'tried to close unknown buffer: %s. \n\ni have:%s'
self.logger.error(string % (buf, self.buffers))
elif len(buffers) == 1:
self.logger.info('closing the last buffer, exiting')
cmd = commandfactory('exit')
self.apply_command(cmd)
else:
if self.current_buffer == buf:
self.logger.debug('UI: closing current buffer %s' % buf)
index = buffers.index(buf)
buffers.remove(buf)
self.current_buffer = buffers[index % len(buffers)]
else:
string = 'closing buffer %d:%s'
self.logger.debug(string % (buffers.index(buf), buf))
index = buffers.index(buf)
buffers.remove(buf)
def buffer_focus(self, buf):
"""
focus given buffer. must be contained in self.buffers
"""
if buf not in self.buffers:
self.logger.error('tried to focus unknown buffer')
else:
self.current_buffer = buf
self.mode = buf.typename
if isinstance(self.current_buffer, BufferlistBuffer):
self.current_buffer.rebuild()
self.update()
def get_deep_focus(self, startfrom=None):
if not startfrom:
startfrom = self.current_buffer
if 'get_focus' in dir(startfrom):
focus = startfrom.get_focus()
if isinstance(focus, tuple):
focus = focus[0]
if isinstance(focus, urwid.Widget):
return self.get_deep_focus(startfrom=focus)
return startfrom
def get_buffers_of_type(self, t):
"""returns currently open buffers for a given subclass of
`alot.buffer.Buffer`
"""
return filter(lambda x: isinstance(x, t), self.buffers)
def clear_notify(self, messages):
"""clears notification popups. Usually called in order
to ged rid of messages that don't time out
:param messages: The popups to remove. This should be exactly
what notify() returned
"""
footer = self.mainframe.get_footer()
newpile = self.notificationbar.widget_list
for l in messages:
newpile.remove(l)
if newpile:
self.notificationbar = urwid.Pile(newpile)
else:
self.notificationbar = None
self.update()
def choice(self, message, choices={'yes': ['y'], 'no': ['n']}):
"""prompt user to make a choice
:param message: string to display before list of choices
:type message: unicode
:param choices: dict of possible choices
:type choices: str->list of keys
"""
def build_line(msg, prio):
cols = urwid.Columns([urwid.Text(msg)])
return urwid.AttrMap(cols, 'notify_' + prio)
cstrings = ['(%s):%s' % ('/'.join(v), k) for k, v in choices.items()]
line = ', '.join(cstrings)
msgs = [build_line(message + ' ' + line, 'normal')]
footer = self.mainframe.get_footer()
if not self.notificationbar:
self.notificationbar = urwid.Pile(msgs)
else:
newpile = self.notificationbar.widget_list + msgs
self.notificationbar = urwid.Pile(newpile)
self.update()
self.mainloop.draw_screen()
while True:
result = self.mainloop.screen.get_input()
self.logger.info('got: %s ' % result)
if not result:
self.clear_notify(msgs)
self.mainloop.screen.get_input()
return None
for k, v in choices.items():
if result[0] in v:
self.clear_notify(msgs)
return k
def notify(self, message, priority='normal', timeout=0, block=False):
"""notify popup
:param message: message to print
:type message: str
:param priority: priority string, used to format the popup: currently,
'normal' and 'error' are defined. If you use 'X' here,
the attribute 'notify_X' is used to format the popup.
:type priority: str
:param timeout: seconds until message disappears. Defaults to the value
of 'notify_timeout' in the general config section.
A negative value means never time out.
:type timeout: int
:param block: this notification blocks until a keypress is made
:type block: boolean
"""
def build_line(msg, prio):
cols = urwid.Columns([urwid.Text(msg)])
return urwid.AttrMap(cols, 'notify_' + prio)
msgs = [build_line(message, priority)]
if timeout == -1 and block:
msgs.append(build_line('(hit any key to proceed)', 'normal'))
footer = self.mainframe.get_footer()
if not self.notificationbar:
self.notificationbar = urwid.Pile(msgs)
else:
newpile = self.notificationbar.widget_list + msgs
self.notificationbar = urwid.Pile(newpile)
self.update()
def clear(*args):
self.clear_notify(msgs)
self.mainloop.draw_screen()
if block:
keys = self.mainloop.screen.get_input()
clear()
else:
if timeout >= 0:
if timeout == 0:
timeout = config.getint('general', 'notify_timeout')
self.mainloop.set_alarm_in(timeout, clear)
return msgs[0]
def update(self):
"""
redraw interface
"""
#who needs a header?
#head = urwid.Text('notmuch gui')
#h=urwid.AttrMap(head, 'header')
#self.mainframe.set_header(h)
#body
self.mainframe.set_body(self.current_buffer)
#footer
lines = []
if self.notificationbar: # .get_text()[0] != ' ':
lines.append(self.notificationbar)
if self.show_statusbar:
lines.append(self.build_statusbar())
if lines:
self.mainframe.set_footer(urwid.Pile(lines))
else:
self.mainframe.set_footer(None)
def build_statusbar(self):
idx = self.buffers.index(self.current_buffer)
lefttxt = '%d: [%s] %s' % (idx, self.current_buffer.typename,
self.current_buffer)
footerleft = urwid.Text(lefttxt, align='left')
righttxt = 'total messages: %d' % self.dbman.count_messages('*')
pending_writes = len(self.dbman.writequeue)
if pending_writes > 0:
righttxt = ('|' * pending_writes) + ' ' + righttxt
footerright = urwid.Text(righttxt, align='right')
columns = urwid.Columns([
footerleft,
('fixed', len(righttxt), footerright)])
return urwid.AttrMap(columns, 'footer')
def apply_command(self, cmd):
if cmd:
if cmd.prehook:
self.logger.debug('calling pre-hook')
try:
cmd.prehook(self, self.dbman, self.accountman, config)
except:
self.logger.exception('prehook failed')
self.logger.debug('apply command: %s' % cmd)
cmd.apply(self)
if cmd.posthook:
self.logger.debug('calling post-hook')
try:
cmd.posthook(self, self.dbman, self.accountman, config)
except:
self.logger.exception('posthook failed')