1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import os.path
from ._path import AbsPath
# Octal-unescape the characters that the kernel escapes in /proc/<pid>/mountinfo
# (see seq_path() and seq_path_root() in the Linux source).
def _oct_unescape(b):
    """
    Decode the octal escapes found in mountinfo fields.

    Space, tab, newline and backslash are written as a backslash followed
    by three octal digits (``\\040``, ``\\011``, ``\\012``, ``\\134``).

    :param b: an escaped field, bytes
    :return: the field with those four escapes decoded, bytes
    """
    # Order matters: backslash is decoded last. Decoding it earlier would
    # turn an escaped literal backslash followed by e.g. "040" into a fresh
    # "\040" sequence that a later replacement would wrongly decode again.
    escapes = (
        (br'\040', b' '),
        (br'\011', b'\t'),
        (br'\012', b'\n'),
        (br'\134', b'\\'),
    )
    for src, dst in escapes:
        b = b.replace(src, dst)
    return b
class _MountEntry:
    """
    One parsed line of a mountinfo file.

    The line is split on whitespace; the first six fields are mandatory,
    followed by zero or more optional fields, a ``-`` separator and then
    the filesystem type, mount source and superblock options.
    """

    # mount ID, bytes
    mount_id = None
    # parent ID, bytes
    parent_id = None
    # device number, int, encoded as os.makedev(major, minor)
    devnum = None
    # root of the mount, unescaped bytes wrapped in AbsPath
    root = None
    # path where the fs is mounted, unescaped bytes wrapped in AbsPath
    mount_point = None
    # mount options, bytes
    mount_opts = None
    # optional fields, list of bytes
    opt_fields = None
    # filesystem type, bytes or None
    fstype = None
    # mount source, bytes or None
    source = None
    # superblock options, bytes or None
    super_opts = None
    # raw mountinfo line, bytes
    raw_entry = None

    # Linux device numbers have a 12-bit major and a 20-bit minor part.
    _MAJOR_MAX = (1 << 12) - 1
    _MINOR_MAX = (1 << 20) - 1

    def __init__(self, line):
        """
        Parse a single mountinfo line.

        :param line: one line of the mountinfo file, bytes
        :raises ValueError: if the line has fewer than six fields or its
            device number is malformed or out of range
        """
        self.raw_entry = line

        items = line.split()
        if len(items) < 6:
            raise ValueError('Not enough items in a mountinfo line', line)

        mount_id, parent_id, devnum, root, mount_point, mount_opts = items[:6]
        self.mount_id = mount_id
        self.parent_id = parent_id

        try:
            major, minor = map(int, devnum.split(b':'))
        except ValueError as err:
            raise ValueError('Invalid device number', devnum) from err
        if not (0 <= major <= self._MAJOR_MAX and
                0 <= minor <= self._MINOR_MAX):
            raise ValueError('Invalid device number', devnum)
        # os.makedev() equals (major << 8) + minor when both parts fit in
        # 8 bits, and additionally handles larger minors, which are common
        # for anonymous devices (major 0).
        self.devnum = os.makedev(major, minor)

        self.root = AbsPath(_oct_unescape(root))
        self.mount_point = AbsPath(_oct_unescape(mount_point))
        self.mount_opts = mount_opts

        # Everything up to the "-" separator is an optional field.
        remainder = items[6:]
        try:
            sep_index = remainder.index(b'-')
        except ValueError:
            # No separator: the whole remainder consists of optional fields.
            sep_index = len(remainder)
        self.opt_fields = remainder[:sep_index]

        # Fields after the separator; each of them may be absent.
        tail = remainder[sep_index + 1:]
        if len(tail) > 0:
            self.fstype = tail[0]
        if len(tail) > 1:
            self.source = _oct_unescape(tail[1])
        if len(tail) > 2:
            self.super_opts = tail[2]

    def __str__(self):
        """Return ``(major:minor){source}[root]->mount_point``."""
        return '({major}:{minor}){src}{root}->{mp}'.format(
            major=os.major(self.devnum), minor=os.minor(self.devnum),
            src=("{%s}" % self.source) if self.source else "",
            root="[%s]" % self.root, mp=self.mount_point)
class MountInfo:
    """
    A wrapper around the contents of the Linux /proc/<pid>/mountinfo file.
    """

    # a list of _MountEntry
    entries = None

    def __init__(self, data):
        """
        Parse every line of ``data`` into a _MountEntry.

        :param data: full contents of a mountinfo file
        """
        self.entries = [_MountEntry(line) for line in data.splitlines()]

    def __str__(self):
        return '%s(%d entries)' % (self.__class__.__name__, len(self.entries))

    def entries_for_mountpoint(self, mountpoint):
        """
        Iterate over all mountinfo entries mounted at the given mountpoint.

        :return: a lazy iterator over the matching entries
        """
        return (entry for entry in self.entries
                if entry.mount_point == mountpoint)

    def mountpoint_for_path(self, path):
        """
        Find the longest mountpoint that is a parent of path.

        :return: the ``mount_point`` of the best matching entry
        :raises LookupError: if no entry's mountpoint matches ``path``
        """
        best_match = None
        for entry in self.entries:
            candidate = entry.mount_point
            # The parent test is delegated to the mount point's ``in``
            # operator (presumably AbsPath.__contains__ -- defined outside
            # this file).
            if path not in candidate:
                continue
            # Strict comparison: the first of equally long matches wins.
            if best_match is None or len(best_match) < len(candidate):
                best_match = candidate

        if best_match is None:
            raise LookupError('No mountpoint for', path)
        return best_match
|