forked from stratis-storage/stratis-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_utils.py
More file actions
306 lines (247 loc) · 8.62 KB
/
_utils.py
File metadata and controls
306 lines (247 loc) · 8.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Miscellaneous functions.
"""
# isort: STDLIB
import errno
import json
import os
import sys
import termios
from enum import Enum
from typing import Any, Optional, Tuple
from uuid import UUID
# isort: THIRDPARTY
from dbus import Dictionary, Struct
from dbus.proxies import ProxyObject
from justbytes import Range
from .._errors import (
StratisCliKeyfileNotFoundError,
StratisCliPassphraseEmptyError,
StratisCliPassphraseMismatchError,
)
from .._stratisd_constants import ClevisInfo, MetadataVersion
try:
_STRICT_POOL_FEATURES = bool(
int(os.environ.get("STRATIS_STRICT_POOL_FEATURES", "0"))
)
except ValueError: # pragma: no cover
_STRICT_POOL_FEATURES = False
class EncryptionInfo: # pylint: disable=too-few-public-methods
"""
Generic information about a single encryption method.
"""
def __init__(self, info: Struct):
"""
Initializer.
:param info: info about an encryption method, as a dbus-python type
"""
(consistent, info) = info
if consistent:
(encrypted, value) = info
self.value = value if encrypted else None
else:
# No tests that generate inconsistent encryption information
self.error = str(info) # pragma: no cover
# This method is only invoked when displaying legacy pool information
def consistent(self): # pragma: no cover
"""
True if consistent, otherwise False.
"""
return not hasattr(self, "error")
class EncryptionInfoClevis(EncryptionInfo): # pylint: disable=too-few-public-methods
"""
Encryption info for Clevis
"""
def __init__(self, info: Struct):
super().__init__(info)
# We don't test with Clevis for coverage
if hasattr(self, "value"): # pragma: no cover
value = self.value
if value is not None:
(pin, config) = value # pyright: ignore [ reportGeneralTypeIssues ]
self.value = ClevisInfo(str(pin), json.loads(str(config)))
class EncryptionInfoKeyDescription(
EncryptionInfo
): # pylint: disable=too-few-public-methods
"""
Encryption info for kernel keyring
"""
def __init__(self, info: Struct):
super().__init__(info)
# Our listing code excludes creating an object of this class without
# it being consistent and set.
if hasattr(self, "value"): # pragma: no cover
value = self.value
if value is not None:
self.value = str(value)
class Device: # pylint: disable=too-few-public-methods
"""
A representation of a device in a stopped pool.
"""
def __init__(self, mapping: Dictionary):
self.uuid = UUID(mapping["uuid"])
self.devnode = str(mapping["devnode"])
class PoolFeature(Enum):
"""
Elements of a pool feature set that may be exposed in the StoppedPools
property.
"""
ENCRYPTION = "encryption"
INTEGRITY = "integrity"
RAID = "raid"
KEY_DESCRIPTION_PRESENT = "key_description_present"
CLEVIS_PRESENT = "clevis_present"
UNRECOGNIZED = None
def __str__(self) -> str:
if self is PoolFeature.UNRECOGNIZED:
return "<UNRECOGNIZED>"
return self.value
@classmethod
def _missing_(cls, value: Any):
if _STRICT_POOL_FEATURES:
return None
return PoolFeature.UNRECOGNIZED
class StoppedPool: # pylint: disable=too-few-public-methods
"""
A representation of a single stopped pool.
"""
def __init__(self, pool_info: Dictionary):
"""
Initializer.
:param pool_info: a D-Bus structure
"""
self.devs = [Device(info) for info in pool_info["devs"]]
clevis_info = pool_info.get("clevis_info")
self.clevis_info = (
None if clevis_info is None else EncryptionInfoClevis(clevis_info)
)
key_description = pool_info.get("key_description")
self.key_description = (
None
if key_description is None
else EncryptionInfoKeyDescription(key_description)
)
name = pool_info.get("name")
self.name = None if name is None else str(name)
metadata_version_valid, metadata_version = pool_info["metadata_version"]
try:
self.metadata_version = (
MetadataVersion(int(metadata_version))
if metadata_version_valid
else None
)
except ValueError: # pragma: no cover
self.metadata_version = None
features_valid, features = pool_info["features"]
self.features = (
frozenset(PoolFeature(f) for f in features) if features_valid else None
)
def get_pass(prompt: str) -> str:
"""
Prompt for a passphrase on stdin.
:param str prompt: prompt to display to user when fetching passphrase
:return: password
:rtype: str or None
"""
print(prompt, end="")
sys.stdout.flush()
old_attrs = None
try: # pragma: no cover
old_attrs = termios.tcgetattr(sys.stdin)
new_attrs = termios.tcgetattr(sys.stdin)
new_attrs[3] &= ~termios.ECHO
new_attrs[3] |= termios.ECHONL
termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, new_attrs)
except termios.error as err: # pragma: no cover
if err.args[0] == errno.ENOTTY:
print(
"Warning: this device is not a TTY so the password may be echoed",
file=sys.stderr,
)
except Exception: # nosec pylint: disable=broad-exception-caught
pass
password = None
try:
password = sys.stdin.readline()
except BaseException as err:
if old_attrs is not None: # pragma: no cover
termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, old_attrs)
raise err
if old_attrs is not None:
termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, old_attrs) # pragma: no cover
return password.rstrip("\n")
def get_passphrase_fd(*, keyfile_path=None, verify=True) -> Tuple[int, int]:
"""
Get a passphrase either from stdin or from a file.
:param str keyfile_path: path to a keyfile, may be None
:param bool verify: If verify is True, check password match
:return: a file descriptor to pass on the D-Bus, what to close when done
"""
if keyfile_path is None:
password = get_pass("Enter passphrase followed by the return key: ")
password_2 = get_pass("Verify passphrase entered: ") if verify else password
if password != password_2:
raise StratisCliPassphraseMismatchError()
if len(password) == 0:
raise StratisCliPassphraseEmptyError()
(read, write) = os.pipe()
os.write(write, password.encode("utf-8"))
file_desc = read
fd_to_close = write
else:
try:
file_desc = os.open(keyfile_path, os.O_RDONLY)
fd_to_close = file_desc
except FileNotFoundError as err:
raise StratisCliKeyfileNotFoundError(keyfile_path) from err
return (file_desc, fd_to_close)
def fetch_stopped_pools_property(proxy: ProxyObject) -> Dictionary:
"""
Fetch the StoppedPools property from stratisd.
:param proxy: proxy to the top object in stratisd
:return: a representation of stopped devices
:rtype: dict
:raises StratisCliEngineError:
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager
return Manager.Properties.StoppedPools.Get(proxy)
class SizeTriple:
"""
Manage values in a size triple.
"""
def __init__(self, total: Optional[Range], used: Optional[Range]):
self._total = total
self._used = used
def total(self) -> Optional[Range]:
"""
Total.
"""
return self._total
def used(self) -> Optional[Range]:
"""
Used.
"""
return self._used
def free(self) -> Optional[Range]:
"""
Total - used.
"""
return (
None
if self._used is None or self.total is None
else self._total - self._used
)