""" This file is part of alot. Alot is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Alot is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with notmuch. If not, see . Copyright (C) 2011 Patrick Totzke """ import os import code import logging import threading import subprocess from cmd import Cmd import StringIO import email from email.parser import Parser from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email import Charset from email.header import Header import buffer import settings from db import DatabaseROError from db import DatabaseLockedError from completion import ContactsCompleter from completion import AccountCompleter import helper from message import decode_to_unicode from message import decode_header from message import encode_header class Command: """base class for commands""" def __init__(self, prehook=None, posthook=None, **ignored): self.prehook = prehook self.posthook = posthook self.undoable = False self.help = self.__doc__ def apply(self, caller): pass class ExitCommand(Command): """shuts the MUA down cleanly""" def apply(self, ui): ui.shutdown() class OpenThreadCommand(Command): """open a new thread-view buffer""" def __init__(self, thread=None, **kwargs): self.thread = thread Command.__init__(self, **kwargs) def apply(self, ui): if not self.thread: self.thread = ui.current_buffer.get_selected_thread() ui.logger.info('open thread view for %s' % self.thread) # in case the thread is yet unread, remove this tag if 'unread' in self.thread.get_tags(): self.thread.remove_tags(['unread'], sync_maildir_flags=True) ui.apply_command(FlushCommand()) self.thread.refresh() sb = buffer.SingleThreadBuffer(ui, self.thread) ui.buffer_open(sb) class SearchCommand(Command): """open a new search buffer""" def __init__(self, query, force_new=False, **kwargs): """ @param query initial querystring @param force_new True forces a new buffer """ self.query = query self.force_new = force_new Command.__init__(self, **kwargs) def apply(self, ui): if not self.force_new: open_searches = ui.get_buffers_of_type(buffer.SearchBuffer) to_be_focused = None for sb in open_searches: if sb.querystring == self.query: to_be_focused = sb if to_be_focused: ui.buffer_focus(to_be_focused) else: ui.buffer_open(buffer.SearchBuffer(ui, self.query)) else: ui.buffer_open(buffer.SearchBuffer(ui, self.query)) class PromptCommand(Command): def __init__(self, startstring=u'', **kwargs): self.startstring = startstring Command.__init__(self, **kwargs) def apply(self, ui): ui.commandprompt(self.startstring) class RefreshCommand(Command): """refreshes the current buffer""" def apply(self, ui): ui.current_buffer.rebuild() ui.update() class ExternalCommand(Command): """ calls external command """ def __init__(self, commandstring, spawn=False, refocus=True, in_thread=False, on_success=None, **kwargs): """ :param commandstring: the command to call :type commandstring: str :param spawn: run command in a new terminal :type spawn: boolean :param refocus: refocus calling buffer after cmd termination :type refocus: boolean :param on_success: code to execute after command successfully exited :type on_success: callable """ self.commandstring = commandstring self.spawn = spawn self.refocus = refocus self.in_thread = in_thread self.on_success = on_success Command.__init__(self, **kwargs) def apply(self, ui): callerbuffer = ui.current_buffer def afterwards(data): if callable(self.on_success) and data == 'success': self.on_success() if self.refocus and callerbuffer in ui.buffers: ui.logger.info('refocussing') ui.buffer_focus(callerbuffer) write_fd = ui.mainloop.watch_pipe(afterwards) def thread_code(*args): cmd = self.commandstring if self.spawn: cmd = '%s %s' % (settings.config.get('general', 'terminal_cmd'), cmd) ui.logger.info('calling external command: %s' % cmd) returncode = subprocess.call(cmd, shell=True) if returncode == 0: os.write(write_fd, 'success') if self.in_thread: thread = threading.Thread(target=thread_code) thread.start() else: ui.mainloop.screen.stop() thread_code() ui.mainloop.screen.start() class EditCommand(ExternalCommand): def __init__(self, path, spawn=None, **kwargs): self.path = path if spawn != None: self.spawn = spawn else: self.spawn = settings.config.getboolean('general', 'spawn_editor') editor_cmd = settings.config.get('general', 'editor_cmd') cmd = editor_cmd + ' ' + self.path ExternalCommand.__init__(self, cmd, spawn=self.spawn, in_thread=self.spawn, **kwargs) class PythonShellCommand(Command): """ opens an interactive shell for introspection """ def apply(self, ui): ui.mainloop.screen.stop() code.interact(local=locals()) ui.mainloop.screen.start() class BufferCloseCommand(Command): """ close a buffer @param buffer the selected buffer """ def __init__(self, buffer=None, focussed=False, **kwargs): self.buffer = buffer self.focussed = focussed Command.__init__(self, **kwargs) def apply(self, ui): if self.focussed: self.buffer = ui.current_buffer.get_selected_buffer() elif not self.buffer: self.buffer = ui.current_buffer ui.buffer_close(self.buffer) ui.buffer_focus(ui.current_buffer) class BufferFocusCommand(Command): """ focus a buffer @param buffer the selected buffer """ def __init__(self, buffer=None, offset=0, **kwargs): self.buffer = buffer self.offset = offset Command.__init__(self, **kwargs) def apply(self, ui): if self.offset: idx = ui.buffers.index(ui.current_buffer) num = len(ui.buffers) self.buffer = ui.buffers[(idx + self.offset) % num] else: if not self.buffer: self.buffer = ui.current_buffer.get_selected_buffer() ui.buffer_focus(self.buffer) class OpenBufferListCommand(Command): """ open a bufferlist """ def __init__(self, filtfun=None, **kwargs): self.filtfun = filtfun Command.__init__(self, **kwargs) def apply(self, ui): blists = ui.get_buffers_of_type(buffer.BufferListBuffer) if blists: ui.buffer_focus(blists[0]) else: ui.buffer_open(buffer.BufferListBuffer(ui, self.filtfun)) class TagListCommand(Command): """ open a taglist """ def __init__(self, filtfun=None, **kwargs): self.filtfun = filtfun Command.__init__(self, **kwargs) def apply(self, ui): tags = ui.dbman.get_all_tags() buf = buffer.TagListBuffer(ui, tags, self.filtfun) ui.buffers.append(buf) buf.rebuild() ui.buffer_focus(buf) class OpenEnvelopeCommand(Command): def __init__(self, email=None, **kwargs): self.email = email Command.__init__(self, **kwargs) def apply(self, ui): ui.buffer_open(envelope.EnvelopeBuffer(ui, email=self.email)) class CommandPromptCommand(Command): """ """ def apply(self, ui): ui.commandprompt() class FlushCommand(Command): """ Flushes writes to the index. Retries until committed """ def apply(self, ui): try: ui.dbman.flush() except DatabaseLockedError: timeout = settings.config.getint('general', 'flush_retry_timeout') def f(*args): self.apply(ui) ui.mainloop.set_alarm_in(timeout, f) ui.notify('index locked, will try again in %d secs' % timeout) ui.update() return class ToggleThreadTagCommand(Command): """ """ def __init__(self, tag, thread=None, **kwargs): assert tag self.thread = thread self.tag = tag Command.__init__(self, **kwargs) def apply(self, ui): if not self.thread: self.thread = ui.current_buffer.get_selected_thread() try: if self.tag in self.thread.get_tags(): self.thread.remove_tags([self.tag]) else: self.thread.add_tags([self.tag]) except DatabaseROError: ui.notify('index in read only mode') return # flush index ui.apply_command(FlushCommand()) # update current buffer # TODO: what if changes not yet flushed? cb = ui.current_buffer if isinstance(cb, buffer.SearchBuffer): # refresh selected threadline threadwidget = cb.get_selected_threadline() threadwidget.rebuild() # rebuild and redraw the line #remove line from searchlist if thread doesn't match the query qs = "(%s) AND thread:%s" % (cb.querystring, self.thread.get_thread_id()) msg_count = ui.dbman.count_messages(qs) if ui.dbman.count_messages(qs) == 0: ui.logger.debug('remove: %s' % self.thread) cb.threadlist.remove(threadwidget) cb.result_count -= self.thread.get_total_messages() ui.update() elif isinstance(cb, buffer.SingleThreadBuffer): pass class ComposeCommand(Command): def __init__(self, mail=None, **kwargs): if not mail: self.mail = MIMEMultipart() self.mail.attach(MIMEText('', 'plain', 'UTF-8')) else: self.mail = mail Command.__init__(self, **kwargs) def apply(self, ui): # TODO: fill with default header (per account) # get From header if not 'From' in self.mail: accounts = ui.accountman.get_accounts() if len(accounts) == 0: ui.notify('no accounts set') return elif len(accounts) == 1: a = accounts[0] else: cmpl = AccountCompleter(ui.accountman) fromaddress = ui.prompt(prefix='From>', completer=cmpl, tab=1) validaddresses = [a.address for a in accounts] + [None] while fromaddress not in validaddresses: ui.notify('no account for this address. ( cancels)') fromaddress = ui.prompt(prefix='From>', completer=cmpl) if not fromaddress: ui.notify('canceled') return a = ui.accountman.get_account_by_address(fromaddress) self.mail['From'] = "%s <%s>" % (a.realname, a.address) #get To header if 'To' not in self.mail: to = ui.prompt(prefix='To>', completer=ContactsCompleter()) self.mail['To'] = encode_header('to', to) if settings.config.getboolean('general', 'ask_subject') and \ not 'Subject' in self.mail: subject = ui.prompt(prefix='Subject>') self.mail['Subject'] = encode_header('subject', subject) ui.apply_command(envelope.EnvelopeEditCommand(mail=self.mail)) class RetagPromptCommand(Command): """start a commandprompt to retag selected threads' tags""" def apply(self, ui): thread = ui.current_buffer.get_selected_thread() initial_tagstring = ','.join(thread.get_tags()) ui.commandprompt('retag ' + initial_tagstring) class RetagCommand(Command): """tag selected thread""" def __init__(self, tagsstring=u'', **kwargs): self.tagsstring = tagsstring Command.__init__(self, **kwargs) def apply(self, ui): thread = ui.current_buffer.get_selected_thread() initial_tagstring = ','.join(thread.get_tags()) tags = filter(lambda x: x, self.tagsstring.split(',')) ui.logger.info("got %s:%s" % (self.tagsstring, tags)) try: thread.set_tags(tags) except DatabaseROError, e: ui.notify('index in read-only mode') return # flush index ui.apply_command(FlushCommand()) # refresh selected threadline sbuffer = ui.current_buffer threadwidget = sbuffer.get_selected_threadline() threadwidget.rebuild() # rebuild and redraw the line class RefineCommand(Command): """refine the query of the currently open searchbuffer""" def __init__(self, query=None, **kwargs): self.querystring = query Command.__init__(self, **kwargs) def apply(self, ui): sbuffer = ui.current_buffer oldquery = sbuffer.querystring if self.querystring not in [None, oldquery]: sbuffer.querystring = self.querystring sbuffer = ui.current_buffer sbuffer.rebuild() ui.update() class RefinePromptCommand(Command): """prompt to change current search buffers query""" def apply(self, ui): sbuffer = ui.current_buffer oldquery = sbuffer.querystring ui.commandprompt('refine ' + oldquery) class ReplyCommand(Command): def __init__(self, groupreply=False, **kwargs): self.groupreply = groupreply Command.__init__(self, **kwargs) def apply(self, ui): msg = ui.current_buffer.get_selected_message() mail = msg.get_email() # set body text mailcontent = '\nOn %s, %s wrote:\n' % (msg.get_datestring(), msg.get_author()[0]) for line in msg.accumulate_body().splitlines(): mailcontent += '>' + line + '\n' Charset.add_charset('utf-8', Charset.QP, Charset.QP, 'utf-8') bodypart = MIMEText(mailcontent.encode('utf-8'), 'plain', 'UTF-8') reply = MIMEMultipart() reply.attach(bodypart) # copy subject subject = mail['Subject'] if not subject.startswith('Re:'): subject = 'Re: ' + subject reply['Subject'] = Header(subject.encode('utf-8'), 'UTF-8').encode() # set From my_addresses = ui.accountman.get_account_addresses() matched_address = '' in_to = [a for a in my_addresses if a in mail['To']] if in_to: matched_address = in_to[0] else: cc = mail.get('Cc', '') + mail.get('Bcc', '') in_cc = [a for a in my_addresses if a in cc] if in_cc: matched_address = in_cc[0] if matched_address: account = ui.accountman.get_account_by_address(matched_address) fromstring = '%s <%s>' % (account.realname, account.address) reply['From'] = encode_header('From', fromstring) # set To #reply['To'] = Header(mail['From'].encode('utf-8'), 'UTF-8').encode() del(reply['To']) if self.groupreply: cleared = self.clear_my_address(my_addresses, mail['To']) if cleared: reply['To'] = mail['From'] + ', ' + cleared else: reply['To'] = mail['From'] # copy cc and bcc for group-replies if 'Cc' in mail: reply['Cc'] = self.clear_my_address(my_addresses, mail['Cc']) if 'Bcc' in mail: reply['Bcc'] = self.clear_my_address(my_addresses, mail['Bcc']) else: reply['To'] = mail['From'] # set In-Reply-To header del(reply['In-Reply-To']) reply['In-Reply-To'] = msg.get_message_id() # set References header old_references = mail['References'] if old_references: old_references = old_references.split() references = old_references[-8:] if len(old_references) > 8: references = old_references[:1] + references references.append(msg.get_message_id()) reply['References'] = ' '.join(references) ui.apply_command(ComposeCommand(mail=reply)) def clear_my_address(self, my_addresses, value): new_value = [] for entry in value.split(','): if not [a for a in my_addresses if a in entry]: new_value.append(entry) return ','.join(new_value) class BounceMailCommand(Command): def apply(self, ui): msg = ui.current_buffer.get_selected_message() mail = msg.get_email() del(mail['To']) ui.apply_command(ComposeCommand(mail=mail)) import envelope