#
# This file is part of Notmuch.
#
# Notmuch is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Notmuch is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Notmuch. If not, see <https://www.gnu.org/licenses/>.
#
# python-notmuch wrapper for the notmuch vim client
import codecs
import datetime
import email, email.header, email.charset, email.utils, email.message
import email.mime.text, email.mime.audio, email.mime.image, email.mime.multipart, email.mime.application
import mailbox
import mailcap
import mimetypes
import notmuch
import os, os.path
import Queue
import shlex
import smtplib
import subprocess
import tempfile
import threading
import vim
#### classes ####
class NMBuffer(object):
    """
    An object mapping line ranges in current buffer to notmuch structures.

    Instances are registered in the module-level nm_buffers dictionary,
    keyed by the number of the vim buffer that was current when they
    were created.
    """

    # a string identifying the buffer
    id = None

    # a list of NMBufferElement subclasses representing objects in current buffer
    objects = None

    def __new__(cls, *args, **kwargs):
        """
        Return the object already registered for the current vim buffer,
        or create and register a new one if there is none.
        """
        bufnr = vim.eval('bufnr("%")')
        if bufnr in nm_buffers:
            return nm_buffers[bufnr]
        ret = object.__new__(cls)
        nm_buffers[bufnr] = ret
        return ret

    def __init__(self):
        self.objects = []

    def get_object(self, line, offset):
        """
        Get an object that's offset objects away from given line or None.

        E.g. offset = 0 gets the object on the line, offset = 1 gets
        next, offset = -1 previous.  The result is clamped to the first
        / last object.  None is returned when there are no objects or
        when the line does not belong to any object (and is not covered
        by the before-first / after-last special cases below).
        """
        if not self.objects:
            return None

        count = len(self.objects)

        # line is above the first object: count forward from the start
        if offset > 0 and line < self.objects[0].start:
            return self.objects[min(count, offset) - 1]

        # line is below the last object: count backward from the end;
        # offset is negative here, so its magnitude must be used
        if offset < 0 and line > self.objects[-1].end:
            return self.objects[-min(count, -offset)]

        for i, obj in enumerate(self.objects):
            if obj.start <= line <= obj.end:
                return self.objects[max(min(count - 1, i + offset), 0)]
        return None

    def tag(self, tags, querystr = None):
        """
        Apply tag changes to all messages matched by this buffer's id,
        optionally narrowed down by querystr.

        @param tags     A list of strings of the form '+tag' or '-tag'.
        @param querystr An additional query combined with the buffer id.
        """
        if querystr:
            querystr = '( %s ) and ( %s )'%(self.id, querystr)
        else:
            querystr = self.id

        db = notmuch.Database(mode = notmuch.Database.MODE.READ_WRITE)
        map_query(db, querystr, lambda m, l: self._tag_message(m, tags))

    def _tag_message(self, message, tags):
        """
        Add ('+tag') or remove ('-tag') each of the given tags on message.
        Entries without a '+' or '-' prefix (including empty strings)
        are ignored.
        """
        for tag in tags:
            if tag.startswith('+'):
                message.add_tag(tag[1:])
            elif tag.startswith('-'):
                message.remove_tag(tag[1:])
class SavedSearches(NMBuffer):
    """
    This buffer displays a list of saved searches ('folders').
    """

    def __init__(self, folders):
        """
        @param folders A list of (folder name, query string) tuples.
        """
        super(SavedSearches, self).__init__()

        for folder in folders:
            # line numbers are filled in by refresh()
            self.objects.append(Folder(0, 0, folder[1], folder[0]))

        self.refresh()

    def refresh(self):
        """
        Append one line per folder to the current vim buffer, showing
        the total and unread message counts, the folder name and its
        query, and record the line each folder ended up on.
        """
        b = vim.current.buffer
        db = notmuch.Database()

        for obj in self.objects:
            q = db.create_query(obj.id)
            q1 = db.create_query('( %s ) and tag:unread'%(obj.id))
            unread = '({0})'.format(q1.count_messages())
            b.append('{0:>7} {1:7} {2: <30s} ({3})'.format(q.count_messages(), unread,
                                                           obj.name, obj.id))
            obj.start = obj.end = len(b) - 1

    def __repr__(self):
        return '<SavedSearches>'

    def tag(self, tags = None, querystr = None):
        """
        Tagging is not supported in the folders view.

        The parameters mirror NMBuffer.tag() so that a caller using the
        generic interface gets the explanatory error below.

        @raise TypeError always.
        """
        raise TypeError('Attempted to tag in folders view.')
class Search(NMBuffer):
    """
    This buffer displays results of a db search -- a list of found threads.

    The query runs in a background thread which pushes formatted lines
    into a queue; process_results() (triggered by vim autocommands)
    moves them into the vim buffer.
    """

    # a fifo of pending results that should be inserted into vim buffer
    # written to by _refresh_thread, read by process_results
    _results = None

    # the thread running _refresh_thread, or None if no search is pending
    _search_thread = None

    # set to 1 to ask a running _refresh_thread to stop early
    _finish = 0

    def __init__(self, querystr = '', parent = None):
        """
        @param querystr The query to run; combined with the parent's id
                        when a parent is given.
        @param parent   An optional buffer whose id is used as the base
                        query.
        """
        super(Search, self).__init__()

        assert(querystr or parent)

        # FIXME simplify
        if parent:
            self.id = parent.id

        if querystr:
            if self.id:
                self.id = '( %s ) and ( %s )'%(self.id, querystr)
            else:
                self.id = querystr

        self.refresh()

    def refresh(self):
        """
        (Re)start the search: stop a search thread that is still
        running, reset the collected objects and result queue, install
        the autocommands that poll for results and start a new thread.
        """
        if self._search_thread:
            self._finish = 1
            self._search_thread.join()
            self._finish = 0

        self.objects = []
        self._results = Queue.Queue()

        # an autocmd needs a pattern between the event and the command;
        # <buffer> makes it local to the current buffer
        vim.command('augroup nm_vimpy')
        vim.command('autocmd CursorMoved <buffer> python nm_vim.get_current_buffer().process_results()')
        vim.command('autocmd CursorHold <buffer> python nm_vim.get_current_buffer().process_results()')
        vim.command('augroup END')

        self._search_thread = threading.Thread(target = self._refresh_thread)
        self._search_thread.start()

    def _refresh_thread(self):
        """
        Thread body: run the query and queue a (line, thread id) tuple
        for every thread found, until done or until _finish is set.
        """
        db = notmuch.Database()
        q = db.create_query(self.id)
        q.set_sort(q.SORT.NEWEST_FIRST) # FIXME allow different search orders

        for t in q.search_threads():
            if self._finish:
                break

            datestr = get_relative_date(t.get_newest_date())
            authors = t.get_authors()

            # notmuch returns different thread subjects depending on
            # sort order
            # we assume that 'thread subject' == 'subject of the first mail in the thread'
            subj = None
            for m in t.get_toplevel_messages():
                subj = m.get_header('subject')
                break
            if not subj:
                subj = t.get_subject()

            tags = ' '.join(t.get_tags())
            line = (u'%-12s %3s/%3s %-20.20s | %s (%s)'%(datestr, t.get_matched_messages(), t.get_total_messages(),
                                                         authors, subj, tags)).encode('utf-8')
            self._results.put((line, t.get_thread_id()))

    def process_results(self):
        """
        Append all currently queued results to the current vim buffer.

        Once the search thread has terminated and the queue has been
        drained, the polling autocommands are removed and the queue and
        thread references are dropped.
        """
        if self._results is None:
            # nothing pending (search already finished and cleaned up)
            return

        # Sample the thread state BEFORE draining: everything a dead
        # thread has produced is already queued, so nothing can be lost.
        finished = bool(self._search_thread) and not self._search_thread.is_alive()

        b = vim.current.buffer
        vim.command('setlocal modifiable')
        try:
            while True:
                start = len(b)
                line, tid = self._results.get(block = False)
                b.append(line)
                self.objects.append(NMBufferElement(start, len(b) - 1, tid))
        except Queue.Empty:
            # queue drained, this is the normal way out of the loop
            pass
        finally:
            vim.command('setlocal nomodifiable')

        if finished:
            vim.command('autocmd! nm_vimpy')
            self._results = None
            self._search_thread = None

    def __repr__(self):
        return '<Search: %s>'%(self.id,)
class ShowThread(NMBuffer):
    """
    This buffer represents a thread view.
    """

    # a list of temporary files for viewing attachments
    # they will be automagically closed and deleted on this object's demise
    _tmpfiles = None

    # a list of headers to show
    _headers = None

    def __init__(self, querystr):
        """
        @param querystr The query selecting the messages to display;
                        it becomes the id of this buffer.
        """
        self.id = querystr
        self._tmpfiles = []
        self.refresh()

    def refresh(self):
        """
        Re-read the list of headers to show from vim and print every
        message matching the buffer id into the current vim buffer.
        """
        self._headers = vim.eval('g:nm_vimpy_show_headers')
        self.objects = []
        db = notmuch.Database()
        map_query(db, self.id, self._print_message)

    def _get_attachment(self, line):
        """
        Return the MIME part of the attachment whose marker is on the
        given line, or None (after printing a notice) if there is none.
        """
        message = self.get_object(line, 0)
        if not message:
            print('No message on this line.')
            return None

        for a in message.attachments:
            if line == a.start:
                return a.data
        print('No attachment on this line')
        return None

    def view_attachment(self, line):
        """
        View attachment corresponding to given line.

        The payload is written to a temporary file and the viewer found
        through mailcap for its content type (if any) is started on it.
        """
        data = self._get_attachment(line)
        if not data:
            return

        f = tempfile.NamedTemporaryFile()
        f.write(data.get_payload(decode = True))
        f.flush()
        os.fsync(f.fileno())

        caps = mailcap.getcaps()
        ret = mailcap.findmatch(caps, data.get_content_type(), filename = f.name)
        if ret[0]:
            with open(os.devnull, 'w') as null:
                subprocess.Popen(shlex.split(ret[0]), stderr = null, stdout = null)

        # keep the file object referenced so the file outlives this call
        self._tmpfiles.append(f)

    def save_attachment(self, line, filename):
        """
        Save attachment corresponding to given line to the specified file.

        If filename names a directory, the attachment is stored inside
        it under the name recorded in the message ('attachment' when
        the message does not provide one).
        """
        data = self._get_attachment(line)
        if not data:
            return

        # expand '~' first so that the directory test sees the real path
        filename = os.path.expanduser(filename)
        if os.path.isdir(filename):
            filename = os.path.join(filename, data.get_filename() or 'attachment')

        # the decoded payload is raw bytes, so write in binary mode
        with open(filename, 'wb') as f:
            f.write(data.get_payload(decode = True))

    def _read_text_payload(self, part):
        """
        Try converting the payload of the MIME part into utf-8.
        """
        p = part.get_payload(decode = True)
        ch = part.get_content_charset('utf-8')

        try:
            codec = codecs.lookup(ch)
        except LookupError:
            print('Unknown encoding: %s'%ch)
            return p

        if codec.name != 'utf-8':
            try:
                p = p.decode(codec.name).encode('utf-8')
            except UnicodeDecodeError:
                return 'Error decoding the message.'
        return p

    def _print_part(self, part):
        """
        Walk through the part recursively and print it
        and its subparts to current buffer.
        """
        if part.is_multipart():
            subparts = part.get_payload()
            if part.get_content_subtype() == 'alternative':
                # try to find the plaintext version, if that fails just try to print the first
                for subpart in subparts:
                    if subpart.get_content_type() == 'text/plain':
                        return self._print_part(subpart)
                if subparts:
                    self._print_part(subparts[0])
            else:
                for subpart in subparts:
                    self._print_part(subpart)
        else:
            b = vim.current.buffer
            if part.get('Content-Disposition', '').lower().startswith('attachment'):
                # register the attachment on the message being printed
                self.objects[-1].attachments.append(Attachment(len(b), len(b), part))
                b.append(('[ Attachment: %s (%s)]'%(part.get_filename(), part.get_content_type())).split('\n'))

            if part.get_content_maintype() == 'text':
                p = self._read_text_payload(part)
                b.append(p.split('\n'))

    def _print_message(self, message, level):
        """
        Print one message (title, selected headers, body) to the
        current buffer and register it in self.objects.

        @param message The notmuch message to print.
        @param level   The reply depth, shown in the start marker.
        """
        b = vim.current.buffer
        msg = Message(len(b), 0, message.get_message_id())
        self.objects.append(msg)
        msg.tags = list(message.get_tags())

        with open(message.get_filename()) as fp:
            email_msg = email.message_from_file(fp)

        # print the title
        b.append('%d/'%level + 20*'-' + 'message start' + 20*'-' + '\\')

        name, addr = email.utils.parseaddr(message.get_header('from'))
        author = name if name else addr
        titlestr = '%s: %s (%s) (%s)'%(author, message.get_header('subject'), get_relative_date(message.get_date()),
                                       ' '.join(message.get_tags()))
        b.append(titlestr.encode('utf-8'))

        # print the headers
        for header in self._headers:
            if header in email_msg:
                b.append(('%s: %s'%(header, decode_header(email_msg[header]))).split('\n'))
        b.append('')

        self._print_part(email_msg)

        b.append('\\' + 20*'-' + 'message end' + 20*'-' + '/')
        msg.end = len(b) - 1

    def __repr__(self):
        return '<ShowThread: %s>'%(self.id,)

    def get_message_for_tag(self, tag):
        """
        Return the first message for given tag.
        """
        for msg in self.objects:
            if tag in msg.tags:
                return msg
        return None
class Compose(NMBuffer):
    """
    This buffer holds a message being composed.

    The buffer layout expected by send() is: optional attachment lines
    (starting with _attachment_prefix), optional 'Notmuch-Help:' lines,
    the headers, an empty line and the message body.
    """

    _attachment_prefix = 'Notmuch-Attachment: '

    def __init__(self):
        super(Compose, self).__init__()

        # python wants to use base64 for some reason, force quoted-printable
        email.charset.add_charset('utf-8', email.charset.QP, email.charset.QP, 'utf-8')

    def _encode_header(self, text):
        """
        Return text unchanged if it is pure ASCII, otherwise return it
        encoded as a utf-8 RFC 2047 header value.
        """
        try:
            text.decode('ascii')
            return text
        except UnicodeDecodeError:
            return email.header.Header(text, 'utf-8').encode()

    def attach(self, filename):
        """
        Insert an attachment line for filename at the top of the current
        buffer.  Files whose type cannot be guessed, or that carry an
        encoding (e.g. compressed files), are sent as
        application/octet-stream.
        """
        mimetype, encoding = mimetypes.guess_type(filename)
        if encoding or not mimetype:
            mimetype = 'application/octet-stream'

        vim.current.buffer.append('%s%s:%s'%(self._attachment_prefix, filename, mimetype), 0)

    def send(self):
        """
        Send the message in current buffer.

        @raise ValueError if a header name is not ASCII, or if no sender
                          or no recipient is specified.
        """
        b = vim.current.buffer
        i = 0

        # parse attachments
        attachments = []
        while i < len(b) and b[i].startswith(self._attachment_prefix):
            filename, sep, mimetype = b[i][len(self._attachment_prefix):].rpartition(':')
            attachments.append((os.path.expanduser(filename), mimetype))
            i += 1

        # skip the inline help
        while i < len(b) and b[i].startswith('Notmuch-Help:'):
            i += 1

        # add the headers
        headers = {}
        recipients = []
        from_addr = None

        while i < len(b):
            if not b[i]:
                break

            key, sep, val = b[i].partition(':')
            i += 1

            try:
                key.decode('ascii')
            except UnicodeDecodeError:
                raise ValueError('Header name must be ASCII only.')

            if not val.strip():
                # skip empty headers
                continue

            if key.lower() in ('to', 'cc', 'bcc'):
                names, addrs = zip(*email.utils.getaddresses([val]))
                names = map(self._encode_header, names)

                recipients += addrs
                if key.lower() == 'bcc':
                    # bcc recipients get the mail but are not listed in it
                    continue

                val = ','.join(map(email.utils.formataddr, zip(names, addrs)))
            else:
                if key.lower() == 'from':
                    from_addr = email.utils.parseaddr(val)[1]
                val = self._encode_header(val)

            headers[key] = val

        body = email.mime.text.MIMEText('\n'.join(b[i:]), 'plain', 'utf-8')

        # add the body
        if not attachments:
            msg = body
        else:
            msg = email.mime.multipart.MIMEMultipart()
            msg.attach(body)
            for attachment in attachments:
                maintype, subtype = attachment[1].split('/', 1)

                if maintype == 'text':
                    with open(attachment[0]) as f:
                        part = email.mime.text.MIMEText(f.read(), subtype, 'utf-8')
                else:
                    if maintype == 'image':
                        obj = email.mime.image.MIMEImage
                    elif maintype == 'audio':
                        obj = email.mime.audio.MIMEAudio
                    else:
                        obj = email.mime.application.MIMEApplication

                    # non-text data must be read unmodified
                    with open(attachment[0], 'rb') as f:
                        part = obj(f.read(), subtype)

                part.add_header('Content-Disposition', 'attachment',
                                filename = os.path.basename(attachment[0]))
                msg.attach(part)

        msg['User-Agent'] = vim.eval('g:nm_vimpy_user_agent')
        msg['Message-ID'] = email.utils.make_msgid()
        msg['Date'] = email.utils.formatdate(localtime = True)

        for key in headers:
            msg[key] = headers[key]

        # sanity checks
        if not from_addr:
            # XXX notmuch-python should export the user email address
            raise ValueError('No sender address specified.')
        if not recipients:
            raise ValueError('No recipient specified.')

        # send
        fcc = vim.eval('g:nm_vimpy_fcc_maildir')
        if fcc:
            dbroot = notmuch.Database().get_path()
            mdir = mailbox.Maildir(os.path.join(dbroot, fcc))
            mdir.add(msg)
            mdir.close()

        # TODO configurable host
        s = smtplib.SMTP('localhost')
        try:
            ret = s.sendmail(from_addr, recipients, msg.as_string())
            for key in ret:
                print('Error sending mail to %s: %s'%(key, ret[key]))
        finally:
            # always close the connection, even if sending failed
            s.quit()
class RawMessage(NMBuffer):
    """
    This buffer displays the raw file of a single message.
    """

    def __init__(self, msgid):
        """
        Look up the message with the given id and, if it is found,
        append the lines of its file to the current vim buffer.
        """
        db = notmuch.Database()
        found = db.find_message(msgid)
        if not found:
            return

        with open(found.get_filename()) as src:
            vim.current.buffer.append(src.readlines())
class NMBufferElement(object):
    """
    A structure (e.g. folder, thread or message) that occupies
    a range of lines in an NMBuffer.
    """

    # first and last line of the element, 0-based
    start = None
    end = None

    # a string identifying the element
    id = None

    def __init__(self, start, end, id):
        self.start, self.end = start, end
        self.id = id
class Folder(NMBufferElement):
    """
    A saved search, a.k.a. 'folder': a buffer element that additionally
    carries a display name.
    """

    # folder name
    name = None

    def __init__(self, start, end, id, name):
        self.name = name
        super(Folder, self).__init__(start, end, id)
class Message(NMBufferElement):
    """
    A buffer element with a list of attachments and a tags attribute.
    """

    # list of attachments, created empty for every instance
    attachments = None

    # not set by the constructor; left for the code creating the object
    tags = None

    def __init__(self, start, end, id):
        self.attachments = []
        super(Message, self).__init__(start, end, id)
class Attachment(NMBufferElement):
    """
    A buffer element wrapping some attachment data; its id is always
    the empty string.
    """

    # the object passed as 'data' to the constructor
    data = None

    def __init__(self, start, end, data):
        self.data = data
        super(Attachment, self).__init__(start, end, '')
#### global variables ####
# This dictionary stores the Python objects corresponding to notmuch-managed
# buffers, each indexed by the buffer number as returned by
# vim.eval('bufnr("%")').  It is filled by NMBuffer.__new__ and queried /
# emptied by get_current_buffer() and delete_current_buffer().
nm_buffers = {}
#### utility functions ####
def get_current_buffer():
    """
    Return the NMBuffer object registered for the current vim buffer,
    or None when there is none.
    """
    bufnr = vim.eval('bufnr("%")')
    return nm_buffers.get(bufnr)
def delete_current_buffer():
    """
    Forget the NMBuffer registered for the current vim buffer.

    Raises KeyError if no object is registered for it.
    """
    bufnr = vim.eval('bufnr("%")')
    nm_buffers.pop(bufnr)
def get_relative_date(timestamp, now = None):
    """
    Format a nice representation of 'timestamp' relative to the current time.

    Examples include:
        5 min. ago   (For times less than 60 minutes ago)
        Today 12:30  (For times >60 minutes but still today)
        Yest. 12:30
        Mon. 12:30   (Before yesterday but fewer than 7 days ago)
        October 12   (Between 7 and 180 days ago (about 6 months))
        2008-06-30   (More than 180 days ago)

    Shamelessly lifted from notmuch-time
    TODO: this should probably be in python-notmuch

    @param timestamp Seconds since the epoch.
    @param now       The datetime to measure against; defaults to the
                     current local time.
    @return The formatted string, 'the future' for timestamps later
            than now, or 'when?' if the timestamp cannot be converted.
    """
    try:
        then = datetime.datetime.fromtimestamp(timestamp)
    except (ValueError, OverflowError, OSError):
        # timestamp is outside of the range the platform can represent
        return 'when?'

    if now is None:
        now = datetime.datetime.now()

    if then > now:
        return 'the future'

    delta = now - then

    if delta.days > 180:
        return then.strftime('%Y-%m-%d') # 2008-06-30

    total_seconds = delta.seconds + delta.days * 24 * 3600
    if total_seconds < 3600:
        return "%d min. ago"%(total_seconds // 60)

    if delta.days < 7:
        # compare calendar dates, not day-of-month numbers, so that
        # "yesterday" is also recognized across a month boundary
        days_apart = (now.date() - then.date()).days
        if days_apart == 0:
            return then.strftime('Today %H:%M') # Today 12:30
        if days_apart == 1:
            return then.strftime('Yest. %H:%M') # Yest. 12:30
        return then.strftime('%a. %H:%M') # Mon. 12:30

    return then.strftime('%B %d') # October 12
def map_query(db, query, run):
    """
    Call run(message, level) for every message found for the given
    query.

    Messages are visited thread by thread; within a thread each message
    is followed by its replies, depth first.  level is 0 for toplevel
    messages and grows by one for each level of replies.
    """
    def visit(msg, depth):
        run(msg, depth)
        children = msg.get_replies()
        if children is None:
            return
        for child in children:
            visit(child, depth + 1)

    q = db.create_query(query)
    for thread in q.search_threads():
        for toplevel in thread.get_toplevel_messages():
            visit(toplevel, 0)
def decode_header(header):
    """
    Decode a RFC 2822 header into a utf-8 string.

    Parts declared with a charset are re-encoded as utf-8; bytes that
    are invalid in the declared charset are substituted instead of
    aborting.  Parts with an unknown charset are passed through
    unchanged after printing a notice.  The parts are joined with
    single spaces.
    """
    ret = []
    for text, charset in email.header.decode_header(header):
        if charset:
            try:
                # 'replace' keeps one malformed header from raising
                # UnicodeDecodeError
                ret.append(text.decode(charset, 'replace').encode('utf-8'))
                continue
            except LookupError:
                print('Unknown encoding: %s'%charset)
        ret.append(text)
    return ' '.join(ret)
#### functions for exporting stuff to viml ####
def vim_get_tags():
    """
    Export a string listing all tags in the database, one per line into
    a vim variable named 'taglist'.
    """
    db = notmuch.Database()
    tags = '\n'.join(db.get_all_tags())
    # inside a vim single-quoted string a quote is written as two quotes
    tags = tags.replace("'", "''")
    cmd = (u'let taglist = \'%s\''%tags).encode('utf-8')
    vim.command(cmd)
def vim_get_object(line, offset):
    """
    Export start/end lines and id of the object that's offset objects
    away from given line into a dict named 'obj' in viml.

    E.g. offset = 0 gets the object on the line, offset = 1 gets
    next, offset = -1 previous.
    This method is a noop if current line doesn't correspond to anything
    or if the current buffer has no associated notmuch object.
    """
    # vim lines are 1-based
    line -= 1
    assert line >= 0

    buf = get_current_buffer()
    if buf is None:
        return

    obj = buf.get_object(line, offset)
    if obj:
        # ids may be query strings containing quotes; escape them for
        # the vim double-quoted string below
        ident = ('%s'%obj.id).replace('\\', '\\\\').replace('"', '\\"')
        vim.command('let obj = { "start" : %d, "end" : %d, "id" : "%s" }'%(
                    obj.start + 1, obj.end + 1, ident))
def vim_get_id():
    """
    Export the id string of current buffer into a vim string named 'buf_id'.

    An empty string is exported when the buffer has no id or when there
    is no notmuch object associated with the current buffer.
    """
    buf = get_current_buffer()
    ident = buf.id if buf is not None and buf.id else ''
    # the id may be a query containing quotes; escape it for the vim
    # double-quoted string below
    ident = ident.replace('\\', '\\\\').replace('"', '\\"')
    vim.command('let buf_id = "%s"'%ident)
def vim_view_attachment(line):
    """
    Open the attachment found on the given vim line in an external
    viewer.
    """
    # the buffer objects use 0-based lines, vim counts from 1
    zero_based = line - 1
    get_current_buffer().view_attachment(zero_based)
def vim_save_attachment(line, filename):
    """
    Write the attachment found on the given vim line into the given file.
    """
    # the buffer objects use 0-based lines, vim counts from 1
    zero_based = line - 1
    get_current_buffer().save_attachment(zero_based, filename)
def vim_get_message_for_tag(tag):
    """
    Export the (1-based) start line of the first message carrying the
    given tag into a vim variable named 'start'; 0 is exported when no
    message has the tag.
    """
    found = get_current_buffer().get_message_for_tag(tag)
    # vim lines are 1-based, so 0 can serve as the "not found" value
    start = found.start + 1 if found else 0
    vim.command('let start = %d'%start)