from collections import OrderedDict
import os.path
import re

from ._path import AbsPath


# Matches one escape sequence as written by the kernel into
# /proc/<pid>/mountinfo: a backslash followed by exactly three octal digits
# encoding a single byte (\000 .. \377).
_OCT_ESCAPE_RE = re.compile(br'\\([0-3][0-7]{2})')


# oct-unescape certain characters as they are in /proc/mountinfo
# see seq_path[_root]() in linux source
def _oct_unescape(b):
    """
    Decode the ``\\ooo`` octal escapes found in mountinfo fields.

    The kernel writes space, tab, newline and backslash as ``\\040``,
    ``\\011``, ``\\012`` and ``\\134``.  Every three-digit octal escape is
    decoded in a single left-to-right pass, so the output of one
    substitution is never re-interpreted as the start of another escape.

    :param b: escaped field, bytes
    :return: the field with all octal escapes replaced by the bytes they
             encode
    """
    return _OCT_ESCAPE_RE.sub(
        lambda match: bytes((int(match.group(1), 8),)), b)


class NotAbsRoot(Exception):
    # This exception is raised for mount entries whose root is not
    # a valid absolute path (this happens e.g. for persistent namespaces bound
    # to a file). Such entries are skipped from MountInfo
    pass


class _MountEntry:
    """
    A single parsed line of /proc/<pid>/mountinfo.
    """

    index = None
    "index of this entry in mountinfo"
    mount_id = None
    "mount ID, bytes"
    parent_id = None
    "parent ID, bytes"
    devnum = None
    "device number, int"
    root = None
    "root of the mount, AbsPath"
    mount_point = None
    "path where the fs is mounted, AbsPath"
    mount_opts = None
    "mount options bytes"
    opt_fields = None
    "optional fields, list of bytes"
    fstype = None
    "filesystem type, bytes or None"
    source = None
    "mount source, bytes or None"
    super_opts = None
    "superblock options, bytes or None"
    raw_entry = None
    "raw mountinfo line, bytes"
    parent = None
    "parent mountinfo entry, None if it does not exist"

    def __init__(self, index, line):
        """
        Parse one mountinfo line.

        :param index: position of the line within the mountinfo data
        :param line: the raw line, bytes
        :raises ValueError: if the line has fewer than six fields or the
                            device number is malformed or out of range
        :raises NotAbsRoot: if the root field is not an absolute path
        """
        self.index = index
        self.raw_entry = line

        items = line.split()
        if len(items) < 6:
            raise ValueError('Not enough items in a mountinfo line', line)

        mount_id, parent_id, devnum, root, mount_point, mount_opts = items[:6]

        self.mount_id = mount_id
        self.parent_id = parent_id

        major, minor = map(int, devnum.split(b':'))
        # see Documentation/admin-guide/devices.txt in linux source
        # NOTE(review): lines whose minor number exceeds 255 are rejected
        # here; confirm this limit is still wanted, since the packed value
        # below only has room for an 8-bit minor.
        if not (0 <= major <= 511 and 0 <= minor <= 255):
            raise ValueError('Invalid device number', devnum)
        self.devnum = (major << 8) + minor

        root = _oct_unescape(root)
        if not root.startswith(b'/'):
            raise NotAbsRoot()
        self.root = AbsPath(root)
        self.mount_point = AbsPath(_oct_unescape(mount_point))
        self.mount_opts = mount_opts

        # Everything after the six fixed fields is a run of optional fields,
        # then a b'-' separator, then up to three trailing fields
        # (fstype, source, superblock options).
        rest = items[6:]
        try:
            sep = rest.index(b'-')
        except ValueError:
            # No separator: all remaining items are optional fields.
            sep = len(rest)
        self.opt_fields = rest[:sep]

        tail = rest[sep + 1:]
        if len(tail) > 0:
            self.fstype = tail[0]
        if len(tail) > 1:
            self.source = _oct_unescape(tail[1])
        if len(tail) > 2:
            self.super_opts = tail[2]

    def __repr__(self):
        """Return the raw line, with non-ASCII bytes backslash-escaped."""
        return self.raw_entry.decode('ascii', errors='backslashreplace')

    def __str__(self):
        """Return a short ``(major:minor){source}[root]->mount_point`` form."""
        return '({major}:{minor}){src}{root}->{mp}'.format(
            major=self.devnum >> 8,
            minor=self.devnum & 255,
            src=("{%s}" % self.source) if self.source else "",
            root="[%s]" % self.root,
            mp=self.mount_point)


class MountInfo:
    """
    A wrapper around the contents of the Linux /proc/<pid>/mountinfo file.
    """

    mounts = None
    "OrderedDict of 'mount ID' -> _MountEntry"

    def __init__(self, data):
        """
        Parse mountinfo contents.

        :param data: full contents of a mountinfo file, bytes
        :raises ValueError: if a line is malformed or a mount ID occurs
                            more than once
        """
        mounts = OrderedDict()

        for i, line in enumerate(data.splitlines()):
            try:
                e = _MountEntry(i, line)
            except NotAbsRoot:
                # Entries without an absolute root are left out entirely.
                # Skipping must not fall through to the code below, which
                # would otherwise see a stale (or unbound) entry.
                continue

            if e.mount_id in mounts:
                raise ValueError('Duplicate mount ID: %s' % str(e.mount_id))

            mounts[e.mount_id] = e

        self.mounts = mounts

        # Link each entry to its parent; entries whose parent ID is not
        # among the parsed mounts keep parent = None.
        for e in mounts.values():
            try:
                e.parent = mounts[e.parent_id]
            except KeyError:
                pass

    def __str__(self):
        return '%s(%d entries)' % (self.__class__.__name__, len(self.mounts))

    def mountpoint_for_path(self, path):
        """
        Find the mount entry which contains path.

        Entries are examined from the last parsed to the first and the first
        whose mount point contains ``path`` is returned.

        :raises LookupError: if no entry's mount point contains ``path``
        """
        for mount in reversed(self.mounts.values()):
            if path in mount.mount_point:
                return mount

        raise LookupError('No mountpoint for', path)