forked from openebs/rawfile-localpv
-
Notifications
You must be signed in to change notification settings - Fork 1
/
declarative.py
114 lines (96 loc) · 2.92 KB
/
declarative.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import os
import subprocess
import tempfile
from pathlib import Path
from util import run
def be_absent(path):
path = Path(path)
if path.is_symlink():
path.unlink()
elif path.is_file():
path.unlink()
elif path.is_dir():
path.rmdir()
# XXX: should we `shutil.rmtree(path)` instead?
elif not path.exists():
return
else:
raise Exception("Unknown file type")
def be_symlink(path, to):
path = Path(path)
to = Path(to)
if path.is_symlink():
if os.readlink(path) == str(to):
return
be_absent(path)
path.symlink_to(to)
def be_mounted(dev, mountpoint):
dev = Path(dev).resolve()
mountpoint = Path(mountpoint)
if mountpoint.is_mount():
if True: # TODO: verify that the right device is mounted
return
# noinspection PyUnreachableCode
be_unmounted(mountpoint)
fs = current_fs(dev)
if fs == "ext4":
run(f"mount -t ext4 {dev} {mountpoint}")
elif fs == "btrfs":
run(f"mount -t btrfs -o flushoncommit {dev} {mountpoint}")
elif fs == "xfs":
run(f"mount -t xfs {dev} {mountpoint}")
else:
raise Exception(f"Unsupported fs type: {fs}")
def be_unmounted(path):
path = Path(path)
while path.is_mount():
run(f"umount {path}")
def current_fs(device):
res = subprocess.run(
f"blkid -o value -s TYPE {device}", shell=True, capture_output=True
)
if res.returncode == 2: # specified token was not found
return None
return res.stdout.decode().strip()
def be_formatted(dev, fs):
def init_fs(device):
if fs == "ext4":
run(f"mkfs.ext4 -m 0 {device}")
elif fs == "btrfs":
run(f"mkfs.btrfs {device}")
tmp_mnt = tempfile.mkdtemp(prefix="mnt-")
default_subvol = f"{tmp_mnt}/default"
run(
f"""
set -ex
mkdir -p {tmp_mnt}
mount -t btrfs {device} {tmp_mnt}
btrfs subvolume create {default_subvol}
btrfs subvolume set-default {default_subvol}
umount {tmp_mnt}
rmdir {tmp_mnt}
"""
)
elif fs == "xfs":
run(f"mkfs.xfs {device}")
else:
raise Exception(f"Unsupported fs type: {fs}")
dev = Path(dev).resolve()
current = current_fs(dev)
if current is None:
init_fs(dev)
else:
if current != fs:
raise Exception(f"Existing filesystem does not match: {current}/{fs}")
def be_fs_expanded(dev, path):
dev = Path(dev).resolve()
fs = current_fs(dev)
path = Path(path).resolve()
if fs == "ext4":
run(f"resize2fs {dev}")
elif fs == "btrfs":
run(f"btrfs filesystem resize max {path}")
elif fs == "xfs":
run(f"xfs_growfs -d {path}")
else:
raise Exception(f"Unsupported fsType: {fs}")