-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutil.py
More file actions
386 lines (299 loc) · 10.7 KB
/
util.py
File metadata and controls
386 lines (299 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# Copyright (c) 2021 The Toltec Contributors
# SPDX-License-Identifier: MIT
"""Collection of useful functions."""
import argparse
from collections.abc import Iterable
import hashlib
import logging
import itertools
import functools
import os
import shutil
import sys
from typing import (
Any,
Callable,
cast,
IO,
Protocol,
)
import warnings
import zipfile
import tarfile
# Date format used in HTTP headers such as Last-Modified
# (strftime/strptime-style pattern: abbreviated weekday, day, abbreviated
# month, year, time of day and time zone name)
HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"
# Logging format for build scripts, handed to logging.basicConfig() by
# setup_logging(); the level name is right-aligned in an 8-character column
LOGGING_FORMAT = "[%(levelname)8s] %(name)s: %(message)s"
def argparse_add_verbose(parser: argparse.ArgumentParser) -> argparse.Action:
    """
    Register the ``-v``/``--verbose`` flag on a command-line parser.

    The parsed ``verbose`` attribute holds a :mod:`logging` level:
    ``logging.DEBUG`` when the flag is given, ``logging.INFO`` otherwise.

    :param parser: parser to add the flag to
    :returns: action object created by the parser for this flag
    """
    flags = ("-v", "--verbose")
    settings = {
        "action": "store_const",
        "const": logging.DEBUG,
        "default": logging.INFO,
        "help": "show debugging information",
    }
    return parser.add_argument(*flags, **settings)
def argparse_add_warning(parser: argparse.ArgumentParser) -> argparse.Action:
    """
    Register the ``-W``/``--warnings`` option on a command-line parser.

    The parsed ``warnings`` attribute is one of ``"default"``, ``"error"``
    or ``"ignore"`` and defaults to ``"default"``.

    :param parser: parser to add the option to
    :returns: action object created by the parser for this option
    """
    help_text = """control warnings. 'default' sends warnings to stderr and
removes duplicate warnings, 'ignore' silences any warning, and 'error' turns
warnings into exceptions"""
    accepted = ("default", "error", "ignore")
    return parser.add_argument(
        "-W",
        "--warnings",
        help=help_text,
        default="default",
        choices=accepted,
    )
def setup_logging(args: argparse.Namespace) -> None:
"""Configure logging and warning control based on passed CLI flags."""
if hasattr(args, "verbose"):
logging.basicConfig(format=LOGGING_FORMAT, level=args.verbose)
def formatwarning(
message: str | Warning,
category: type[Warning],
filename: str,
lineno: int,
line: str | None = None,
) -> str:
del filename, lineno, line
return f"[{category.__name__}] {message}"
warnings.formatwarning = formatwarning
logging.captureWarnings(True)
if hasattr(args, "warnings"):
warnings.simplefilter(args.warnings)
def file_sha256(path: str) -> str:
    """
    Compute the SHA-256 checksum of a file.

    The file is read unbuffered, in chunks of 128 KiB, into a single
    reusable buffer, so the whole file is never held in memory at once.

    :param path: path to the file to hash
    :returns: hexadecimal digest of the file contents
    """
    digest = hashlib.sha256()
    chunk = memoryview(bytearray(128 * 1024))

    with open(path, "rb", buffering=0) as stream:
        while True:
            count = stream.readinto(chunk)  # type:ignore
            if count == 0:
                # End of file reached
                break
            digest.update(chunk[:count])

    return digest.hexdigest()
def split_all_parts(path: str) -> list[str]:
    """
    Split a file path into all its directory components.

    Root markers and empty components (leading, trailing or repeated
    separators) are not included in the result.

    :param path: path to split
    :returns: components of the path, from outermost to innermost
    """
    parts = []
    remaining = path

    while remaining:
        head, base = os.path.split(remaining)

        if base:
            parts.append(base)

        if head == remaining:
            # Splitting no longer makes progress: only a root is left.
            # Besides "/", this covers roots such as "//" (or a drive root),
            # which os.path.split returns unchanged and which would
            # otherwise make this loop spin forever.
            break

        remaining = head

    parts.reverse()
    return parts
def split_all_exts(path: str) -> list[str]:
    """
    Collect every extension found at the end of a file path.

    Extensions are peeled off one at a time starting from the end of the
    path, so the last extension comes first in the result (for instance,
    ``archive.tar.gz`` yields ``[".gz", ".tar"]``).

    :param path: path to inspect
    :returns: extensions of the path (each with its leading dot), outermost
        first; empty if the path has no extension
    """
    found = []
    stem, suffix = os.path.splitext(path)

    while suffix:
        found.append(suffix)
        stem, suffix = os.path.splitext(stem)

    return found
def all_equal(seq: Iterable) -> bool:
    """
    Check that all elements of a sequence are equal.

    An empty sequence is considered to have all its elements equal.

    :param seq: elements to compare (consumed if it is an iterator)
    :returns: true if no two consecutive elements differ
    """
    runs = itertools.groupby(seq)
    # Skip over the first run of identical elements, if there is one...
    next(runs, None)
    # ...the elements are all equal exactly when no second run follows
    return next(runs, None) is None
def remove_prefix(filenames: list[str]) -> dict[str, str]:
    """
    Find and remove the longest directory prefix shared by all files.

    :param filenames: paths to strip the common leading components from
    :returns: mapping from each original path to its stripped version;
        paths that have nothing left once the prefix is removed are omitted
    """
    # Nothing to strip; also avoids calling min() on an empty sequence below
    if not filenames:
        return {}

    split_filenames = [split_all_parts(filename) for filename in filenames]

    # Find the longest directory prefix shared by all files
    min_len = min(len(parts) for parts in split_filenames)
    prefix = 0

    while prefix < min_len and all_equal(
        parts[prefix] for parts in split_filenames
    ):
        prefix += 1

    # If there's only one file, keep the last component (a path without any
    # component has nothing to keep, hence the clamp to zero)
    if len(filenames) == 1:
        prefix = max(prefix - 1, 0)

    mapping = {}

    for filename, parts in zip(filenames, split_filenames):
        remainder = parts[prefix:]

        if remainder:
            mapping[filename] = os.path.join(*remainder)

    return mapping
def auto_extract(archive_path: str, dest_path: str) -> bool:
    """
    Extract an archive, dropping the leading folders shared by all entries.

    The archive type is guessed from the file name: ``.zip`` files are read
    as ZIP archives; ``.tar`` files, optionally followed by ``.gz``,
    ``.bz2`` or ``.xz``, are read as tarballs.

    :param archive_path: path to the archive to extract
    :param dest_path: destination folder for the archive contents
    :returns: true if something was extracted, false if not a supported archive
    """
    suffixes = split_all_exts(archive_path)

    if not suffixes:
        return False

    # Extensions are listed starting from the end of the file name
    outer = suffixes[0]

    if outer == ".zip":
        with zipfile.ZipFile(archive_path) as zip_archive:
            _auto_extract(
                zip_archive.namelist(),
                zip_archive.getinfo,
                zip_archive.open,
                zipfile.ZipInfo.is_dir,
                # Entries of ZIP archives are never treated as symbolic links
                lambda entry: False,
                # Permission bits sit in the upper half of the external attributes
                lambda entry: (entry.external_attr >> 16) & 0x1FF,
                dest_path,
            )

        return True

    compressed_tar = outer in (".gz", ".bz2", ".xz") and suffixes[1:2] == [
        ".tar"
    ]

    if outer != ".tar" and not compressed_tar:
        return False

    with tarfile.open(archive_path, mode="r") as tar_archive:
        _auto_extract(
            tar_archive.getnames(),
            tar_archive.getmember,
            tar_archive.extractfile,
            tarfile.TarInfo.isdir,
            tarfile.TarInfo.issym,
            lambda entry: entry.mode,
            dest_path,
        )

    return True
def _auto_extract(  # pylint:disable=too-many-arguments,disable=too-many-locals,disable=too-many-positional-arguments
    members: list[str],
    getinfo: Callable[[str], Any],
    extract: Callable[[Any], IO[bytes] | None],
    isdir: Callable[[Any], bool],
    issym: Callable[[Any], bool],
    getmode: Callable[[Any], int],
    dest_path: str,
) -> None:
    """
    Generic implementation of automatic archive extraction.

    Entries are written under ``dest_path`` after the directory prefix
    shared by all members has been removed from their names.

    :param members: list of members of the archive
    :param getinfo: get an entry object from an entry name in the archive
    :param extract: get a reading stream corresponding to an archive entry
    :param isdir: get whether an entry is a directory or not
    :param issym: get whether an entry is a symbolic link or not
    :param getmode: get the permission bits for an entry
    :param dest_path: destination folder for the archive contents
    """
    # NOTE(review): entry names are not checked for ".." components in this
    # function, so an entry may end up outside of dest_path; confirm that
    # only trusted archives are passed in
    stripped_map = remove_prefix(members)

    for filename, stripped in stripped_map.items():
        member = getinfo(filename)
        file_path = os.path.join(dest_path, stripped)

        if isdir(member):
            os.makedirs(file_path, exist_ok=True)
            continue

        # Archives do not necessarily list a directory before its contents,
        # so make sure the parent exists for links as well as regular files.
        # An empty parent means the current directory: nothing to create.
        basedir = os.path.dirname(file_path)

        if basedir:
            os.makedirs(basedir, exist_ok=True)

        if issym(member):
            os.symlink(member.linkname, file_path)
            continue

        source = extract(member)
        # No stream is available for entries that carry no data
        assert source is not None

        with source, open(file_path, "wb") as target:
            shutil.copyfileobj(source, target)

        # A mode of zero means that no permission bits were recorded
        mode = getmode(member)

        if mode != 0:
            os.chmod(file_path, mode)
def query_user(
question: str,
default: str,
options: list[str] | None = None,
aliases: dict[str, str] | None = None,
) -> str:
"""
Ask the user to make a choice.
:param question: message to display before the choice
:param default: default choice if the user inputs an empty string
:param options: list of valid options (should be lowercase strings)
:param aliases: accepted aliases for the valid options
:returns: option chosen by the user
"""
options = options or ["y", "n"]
aliases = aliases or {"yes": "y", "no": "n"}
if default not in options:
raise ValueError(f"Default value {default} is not a valid option")
prompt = "/".join(
option if option != default else option.upper() for option in options
)
while True:
sys.stdout.write(f"{question} [{prompt}] ")
choice = input().lower()
if not choice:
return default
if choice in options:
return choice
if choice in aliases:
return aliases[choice]
print("Invalid answer. Please choose among the valid options.")
def check_directory(path: str, message: str) -> bool:
    """
    Make sure a directory exists, asking the user in case of conflict.

    If the directory cannot be created because the path already exists, the
    user chooses between cancelling, removing the existing contents (the
    directory is then recreated empty) or keeping them as they are.

    :param path: path to the directory to create
    :param message: message to display before asking the user interactively
    :returns: false if the user chose to cancel the current operation
    """
    try:
        os.mkdir(path)
    except FileExistsError:
        pass
    else:
        # Freshly created: no need to ask anything
        return True

    answer = query_user(
        message,
        default="c",
        options=["c", "r", "k"],
        aliases={
            "cancel": "c",
            "remove": "r",
            "keep": "k",
        },
    )

    if answer == "r":
        shutil.rmtree(path)
        os.mkdir(path)

    return answer != "c"
def list_tree(root: str) -> list[str]:
    """
    List every file and folder found under a root folder, in sorted order.

    The root folder itself is part of the result, and each item is given
    as a path that starts with the root.

    :param root: root folder to start from
    :returns: sorted list of items under the root folder
    """
    entries = []

    for folder, _subfolders, filenames in os.walk(root):
        entries.append(folder)
        entries.extend(os.path.join(folder, name) for name in filenames)

    entries.sort()
    return entries
# Type of the callable used to trigger a hook: takes any arguments and
# returns nothing
HookTrigger = Callable[..., None]
# Type of a function subscribed to a hook: takes any arguments and returns
# nothing
HookListener = Callable[..., None]
class Hook(Protocol):  # pylint:disable=too-few-public-methods
    """
    Protocol for hooks.

    A hook is a callable object that additionally exposes a ``register``
    function used to subscribe listeners to it.
    """

    @staticmethod
    def register(new_listener: HookListener) -> None:
        """
        Add a new listener to this hook.

        :param new_listener: function to subscribe to the hook
        """

    # Invoke all listeners for this hook
    __call__: HookTrigger
def hook(func: HookTrigger) -> Hook:
    """
    Decorator turning a placeholder function into a hook.

    Calling the resulting hook forwards its arguments to every registered
    listener, in registration order. The decorated function itself is never
    called; only its metadata is carried over to the hook.

    :param func: empty function to declare as a hook
    :returns: usable hook, exposing a ``register`` function
    """
    subscribers: list[HookListener] = []

    @functools.wraps(func)
    def trigger(*args: Any, **kwargs: Any) -> None:
        for subscriber in subscribers:
            subscriber(*args, **kwargs)

    def register(new_listener: HookListener) -> None:
        subscribers.append(new_listener)

    # Attached dynamically since plain functions declare no such attribute
    setattr(trigger, "register", register)
    return cast(Hook, trigger)
def listener(target_hook: Hook) -> Callable[[HookListener], HookListener]:
    """
    Decorator registering the decorated function as a listener of a hook.

    :param target_hook: hook that the decorated function subscribes to
    :returns: decorator that registers a function and returns it unchanged
    """

    def subscribe(new_listener: HookListener) -> HookListener:
        # The function is handed back as is, so that it remains usable
        # under its own name after being decorated
        target_hook.register(new_listener)
        return new_listener

    return subscribe