summaryrefslogtreecommitdiff
path: root/lbup/_mountinfo.py
blob: caa9dddae50b2b0597e0077566a4189137ee2fba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os.path

from ._path import AbsPath

# oct-unescape certain characters as they are in /proc/mountinfo
# see seq_path[_root]() in linux source
def _oct_unescape(b):
    trans = {
        br'\040' : b' ',  br'\011'  : b'\t',
        br'\012' : b'\n', br'\0134' : b'\\',
    }

    for src, dst in trans.items():
        b = b.replace(src, dst)

    return b

class _MountEntry:
    "mount ID, bytes"
    mount_id      = None
    "parent ID, bytes"
    parent_id     = None
    "device number, int"
    devnum        = None
    "root of the mount, bytes"
    root          = None
    "path where the fs is mounted, bytes"
    mount_point   = None
    "mount options bytes"
    mount_opts    = None
    "optional fields, list of bytes"
    opt_fields    = None
    "filesystem type, bytes or None"
    fstype        = None
    "mount source, bytes or None"
    source        = None
    "superblock options, bytes or None"
    super_opts    = None

    "raw mountinfo line, bytes"
    raw_entry     = None

    def __init__(self, line):
        self.raw_entry = line

        items = line.split()
        if len(items) < 6:
            raise ValueError('Not enough items in a mountinfo line', line)

        mount_id, parent_id, devnum, root, mount_point, mount_opts = items[:6]
        self.mount_id  = mount_id
        self.parent_id = parent_id

        major, minor = map(int, devnum.split(b':'))
        if major < 0 or major > 255 or minor < 0 or minor > 255:
            raise ValueError('Invalid device number', devnum)
        self.devnum = (major << 8) + minor

        self.root        = AbsPath(_oct_unescape(root))
        self.mount_point = AbsPath(_oct_unescape(mount_point))
        self.mount_opts  = mount_opts

        remainder  = items[6:]
        opt_fields = []
        while len(remainder) > 0 and remainder[0] != b'-':
            opt_fields.append(remainder.pop(0))
        self.opt_fields = opt_fields

        if len(remainder) > 0:
            sep = remainder.pop(0)
            if sep != b'-':
                raise ValueError('Expected separator "-", got', sep)
        if len(remainder) > 0:
            self.fstype = remainder.pop(0)
        if len(remainder) > 0:
            self.source = _oct_unescape(remainder.pop(0))
        if len(remainder) > 0:
            self.super_opts = remainder.pop(0)

    def __str__(self):
        return '({major}:{minor}){src}{root}->{mp}'.format(
            major = self.devnum >> 8, minor = self.devnum & 255,
            src = ("{%s}" % self.source) if self.source else "",
            root = "[%s]" % self.root, mp = self.mount_point)

class MountInfo:
    """
    A wrapper around the contents of the Linux /proc/<pid>/mountinfo file.
    """

    "a list of _MountEntry"
    entries = None

    def __init__(self, data):
        self.entries = []
        for line in data.splitlines():
            self.entries.append(_MountEntry(line))

    def __str__(self):
        return '%s(%d entries)' % (self.__class__.__name__, len(self.entries))

    def entries_for_mountpoint(self, mountpoint):
        """
        Iterate over all mountinfo entries mounted at the given mountpoint.
        """
        return filter(lambda entry: entry.mount_point == mountpoint, self.entries)

    def mountpoint_for_path(self, path):
        """
        Find the longest mountpoint that is a parent of path.
        """
        best_match = None
        for e in self.entries:
            if (path in e.mount_point and
                (best_match is None or len(best_match) < len(e.mount_point))):
                best_match = e.mount_point

        if best_match is None:
            raise LookupError('No mountpoint for', path)

        print(best_match, len(best_match))

        return best_match