import os.path from ._path import AbsPath # oct-unescape certain characters as they are in /proc/mountinfo # see seq_path[_root]() in linux source def _oct_unescape(b): trans = { br'\040' : b' ', br'\011' : b'\t', br'\012' : b'\n', br'\0134' : b'\\', } for src, dst in trans.items(): b = b.replace(src, dst) return b class _MountEntry: "mount ID, bytes" mount_id = None "parent ID, bytes" parent_id = None "device number, int" devnum = None "root of the mount, bytes" root = None "path where the fs is mounted, bytes" mount_point = None "mount options bytes" mount_opts = None "optional fields, list of bytes" opt_fields = None "filesystem type, bytes or None" fstype = None "mount source, bytes or None" source = None "superblock options, bytes or None" super_opts = None "raw mountinfo line, bytes" raw_entry = None def __init__(self, line): self.raw_entry = line items = line.split() if len(items) < 6: raise ValueError('Not enough items in a mountinfo line', line) mount_id, parent_id, devnum, root, mount_point, mount_opts = items[:6] self.mount_id = mount_id self.parent_id = parent_id major, minor = map(int, devnum.split(b':')) if major < 0 or major > 255 or minor < 0 or minor > 255: raise ValueError('Invalid device number', devnum) self.devnum = (major << 8) + minor self.root = AbsPath(_oct_unescape(root)) self.mount_point = AbsPath(_oct_unescape(mount_point)) self.mount_opts = mount_opts remainder = items[6:] opt_fields = [] while len(remainder) > 0 and remainder[0] != b'-': opt_fields.append(remainder.pop(0)) self.opt_fields = opt_fields if len(remainder) > 0: sep = remainder.pop(0) if sep != b'-': raise ValueError('Expected separator "-", got', sep) if len(remainder) > 0: self.fstype = remainder.pop(0) if len(remainder) > 0: self.source = _oct_unescape(remainder.pop(0)) if len(remainder) > 0: self.super_opts = remainder.pop(0) def __str__(self): return '({major}:{minor}){src}{root}->{mp}'.format( major = self.devnum >> 8, minor = self.devnum & 255, src = ("{%s}" % self.source) if self.source else "", root = "[%s]" % self.root, mp = self.mount_point) class MountInfo: """ A wrapper around the contents of the Linux /proc//mountinfo file. """ "a list of _MountEntry" entries = None def __init__(self, data): self.entries = [] for line in data.splitlines(): self.entries.append(_MountEntry(line)) def __str__(self): return '%s(%d entries)' % (self.__class__.__name__, len(self.entries)) def entries_for_mountpoint(self, mountpoint): """ Iterate over all mountinfo entries mounted at the given mountpoint. """ return filter(lambda entry: entry.mount_point == mountpoint, self.entries) def mountpoint_for_path(self, path): """ Find the longest mountpoint that is a parent of path. """ best_match = None for e in self.entries: if (path in e.mount_point and (best_match is None or len(best_match) < len(e.mount_point))): best_match = e.mount_point if best_match is None: raise LookupError('No mountpoint for', path) print(best_match, len(best_match)) return best_match