forked from ESMCI/inputdataTools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrimport
More file actions
executable file
·222 lines (186 loc) · 8.14 KB
/
rimport
File metadata and controls
executable file
·222 lines (186 loc) · 8.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/glade/u/apps/derecho/24.12/opt/view/bin/python
# TODO: Move all the Python into new file rimport.py for simpler testing. Keep rimport as a
# convenience wrapper.
"""
A drop-in CLI replacement for the legacy `rimport` csh tool.
This script preserves the original command-line interface:
SYNOPSIS
rimport [-file filename] [-list filelist] [-inputdata inputdata_dir] [-help]
OPTIONS
-file filename
provide a single filename relative to the top inputdata directory
-list filelist
provide a file that contains a list of filenames to import. all filenames
in the list are relative to the top inputdata area.
-inputdata inputdata_dir
change the default local top level inputdata directory
-help
get help about this tool
Each requested file is handed to `stage_data`, which copies it to the mirrored
path under the staging root returned by `get_staging_root`.
"""
from __future__ import annotations
import argparse
import os
import pwd
import shutil
import sys
from pathlib import Path
from typing import Iterable, List
class PlainHelpFormatter(argparse.RawTextHelpFormatter):
    """Help formatter used by the rimport parser.

    Adds nothing on top of ``argparse.RawTextHelpFormatter``; the subclass only
    gives the formatter used by this tool a name of its own.
    """
def build_parser() -> argparse.ArgumentParser:
    """Create the argument parser that mirrors the legacy `rimport` interface.

    The parser accepts exactly one of `-file` / `-list`, an optional
    `-inputdata` directory, and `-help`. Per-option help is suppressed because
    the full help text is supplied by hand through the parser description.
    """
    synopsis = (
        "rimport [-file filename] [-list filelist] [-inputdata inputdata_dir] [-help]"
    )
    description = (
        "SYNOPSIS\n"
        f" {synopsis}\n\n"
        "OPTIONS\n"
        " -file filename\n"
        " provide a single filename relative to the top inputdata directory\n"
        " -list filelist\n"
        " provide a file that contains a list of filenames to import. all filenames\n"
        " in the list are relative to the top inputdata area.\n"
        " -inputdata inputdata_dir\n"
        " change the default local top level inputdata directory\n"
        " -help\n"
        " get help about this tool\n"
    )
    default_inputdata = os.path.join(
        "/glade", "campaign", "cesm", "cesmdata", "cseg", "inputdata"
    )

    cli = argparse.ArgumentParser(
        prog="rimport",
        usage=synopsis,
        description=description,
        formatter_class=PlainHelpFormatter,
        # The automatic -h/--help is switched off; only -help is offered below.
        add_help=False,
    )

    # Exactly one way of naming the files to import must be given.
    source = cli.add_mutually_exclusive_group(required=True)
    for flag, dest, metavar in (
        ("-file", "file", "filename"),
        ("-list", "filelist", "filelist"),
    ):
        source.add_argument(flag, dest=dest, metavar=metavar, help=argparse.SUPPRESS)

    cli.add_argument(
        "-inputdata",
        dest="inputdata",
        metavar="inputdata_dir",
        default=default_inputdata,
        help=argparse.SUPPRESS,
    )

    # Single-dash -help, as in the legacy tool.
    cli.add_argument("-help", action="help", help=argparse.SUPPRESS)
    return cli
def read_filelist(list_path: Path) -> List[str]:
    """Return the usable entries of a list file, in file order.

    Every line is stripped of surrounding whitespace. Lines that are then
    empty, or that begin with ``#``, are dropped.
    """
    with list_path.open("r", encoding="utf-8") as handle:
        stripped = (raw.strip() for raw in handle)
        return [entry for entry in stripped if entry and not entry.startswith("#")]
def resolve_paths(root: Path, relnames: Iterable[str]) -> List[Path]:
    """Turn each name into a fully resolved path.

    Relative names are joined onto `root`; absolute names are taken as given.
    In both cases the result is passed through ``Path.resolve()``. Output
    order follows `relnames`.
    """
    resolved: List[Path] = []
    for name in relnames:
        candidate = Path(name)
        if not candidate.is_absolute():
            candidate = root / name
        resolved.append(candidate.resolve())
    return resolved
def _is_within(path: Path, parent: Path) -> bool:
    """Return True if `path` is `parent` itself or lies somewhere below it."""
    try:
        path.relative_to(parent)
    except ValueError:
        return False
    return True


def stage_data(src: Path, inputdata_root: Path, staging_root: Path) -> None:
    """Stage a file by copying it to the mirrored path under `staging_root`.

    The destination is computed by replacing the `inputdata_root` prefix of
    `src` with `staging_root`, i.e.::

        dst = staging_root / src.resolve().relative_to(inputdata_root.resolve())

    Missing parent directories of the destination are created and the copy is
    made with ``shutil.copy2``. A line reporting the copy is printed to stdout.

    Args:
        src: File to stage.
        inputdata_root: Top-level inputdata directory that must contain `src`.
        staging_root: Directory under which the copy is placed.

    Raises:
        RuntimeError: If `src` is a symlink (a live one is reported as already
            published, a dangling one as broken), if `src` already lives under
            `staging_root`, or if `src` lies outside `inputdata_root`.
        FileNotFoundError: If `src` does not exist.
    """
    if src.is_symlink():
        # exists() follows the link, so it tells live links from dangling ones.
        if src.exists():
            raise RuntimeError("File is already published.")
        raise RuntimeError(f"Source is a broken symlink: {src}")
    if not src.exists():
        raise FileNotFoundError(f"source not found: {src}")

    resolved_src = src.resolve()
    resolved_root = inputdata_root.resolve()
    if not _is_within(resolved_src, resolved_root):
        # A file that already sits inside the staging tree has been published.
        # Test real directory containment against the staging root in use
        # rather than looking for a hard-coded substring in the path.
        if _is_within(resolved_src, staging_root.resolve()):
            raise RuntimeError(f"Source file {src.name} is already published.")
        raise RuntimeError(f"source not under inputdata root: {src} not in {inputdata_root}")

    rel = resolved_src.relative_to(resolved_root)
    dst = staging_root / rel
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)
    print(f"[rimport] staged {src} -> {dst}")
def ensure_running_as(target_user: str, argv: list[str]) -> None:
    """Make sure the process runs as `target_user`, re-executing via sudo if not.

    If the effective UID already belongs to `target_user`, the function simply
    returns. Otherwise the current process is replaced by
    ``sudo -u <target_user> -- <argv...>``; sudo then performs its own
    authentication, so an interactive terminal on stdin is required.

    Args:
        target_user: Login name of the account the tool must run as.
        argv: Full command line (program first) to run under `target_user`.

    Raises:
        SystemExit: With code 2 if `target_user` does not exist on this
            system, or if a user switch is needed but stdin is not a TTY.
    """
    try:
        target_uid = pwd.getpwnam(target_user).pw_uid
    except KeyError:
        # TODO: Raise Python error instead of SystemExit
        print(f"rimport: target user '{target_user}' not found on this system", file=sys.stderr)
        raise SystemExit(2)

    if os.geteuid() == target_uid:
        return

    if not sys.stdin.isatty():
        # Name the account actually requested instead of a fixed user name.
        print(
            f"rimport: need interactive TTY to authenticate as '{target_user}' (2FA).\n"
            f" Try: sudo -u {target_user} rimport …",
            file=sys.stderr,
        )
        # TODO: Raise Python error instead of SystemExit
        raise SystemExit(2)

    # Re-exec under target user; this invokes sudo's normal password/2FA flow.
    # execvp replaces the current process and does not return on success.
    os.execvp("sudo", ["sudo", "-u", target_user, "--"] + argv)
# TODO: Unused; delete.
def safe_mvandlink(src: Path, dst: Path) -> None:
    """Move `src` to `dst` and leave a symlink at `src` pointing to the new location.

    Missing parent directories of `dst` are created first.

    Args:
        src: Existing path to move; replaced by a symlink afterwards.
        dst: Destination handed to ``shutil.move``.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)
    # Move (handles cross-filesystem with copy2+remove under the hood)
    # This preserves metadata similarly to copy2 when crossing devices.
    # shutil.move returns the final location, which is what the link must target.
    moved_to = Path(shutil.move(str(src), str(dst)))
    # Create the symlink at the original src path.
    # NOTE: a relative `dst` gives a relative link target, which the OS
    # interprets relative to the directory containing the link.
    os.symlink(str(moved_to), src)
def get_staging_root() -> Path:
    """Return the directory that staged files are copied into.

    If the environment variable RIMPORT_STAGING holds a non-empty value, that
    value is used, with ``~`` expanded and the path resolved. Otherwise a
    fixed default location is returned unchanged.
    """
    override = os.environ.get("RIMPORT_STAGING")
    if not override:
        # TODO: This should be a module-level variable.
        return Path("/glade/campaign/collections/gdex/data/d651077/cesmdata/inputdata")
    return Path(override).expanduser().resolve()
def main(argv: List[str] | None = None) -> int:
    """Run the rimport command line and return a process exit status.

    Args:
        argv: Command-line arguments without the program name. ``None`` lets
            argparse take them from ``sys.argv``.

    Returns:
        0 if every file was staged, 1 if at least one file failed, and 2 if
        the inputdata directory or the list file is missing or the list file
        contains no filenames.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    # Ensure we are running as the cesmdata account before touching the tree.
    # Set env var RIMPORT_SKIP_USER_CHECK=1 to skip this check, e.g. when the
    # tool is already being run under that account explicitly, or for testing.
    if os.getenv("RIMPORT_SKIP_USER_CHECK") != "1":
        # Re-exec with the arguments that were actually parsed: when the caller
        # supplied `argv`, sys.argv does not describe this invocation.
        reexec_argv = sys.argv if argv is None else [sys.argv[0], *argv]
        ensure_running_as("cesmdata", reexec_argv)

    root = Path(args.inputdata).expanduser().resolve()
    if not root.exists():
        print(f"rimport: inputdata directory does not exist: {root}", file=sys.stderr)
        return 2

    # Determine the list of relative filenames to handle
    if args.file is not None:
        relnames = [args.file]
    else:
        list_path = Path(args.filelist).expanduser().resolve()
        if not list_path.exists():
            print(f"rimport: list file not found: {list_path}", file=sys.stderr)
            return 2
        relnames = read_filelist(list_path)
        if not relnames:
            print(f"rimport: no filenames found in list: {list_path}", file=sys.stderr)
            return 2

    # Resolve to full paths (keep accepting absolute names too)
    paths = resolve_paths(root, relnames)
    staging_root = get_staging_root()

    # Stage each file; one failure must not stop the rest of the batch.
    errors = 0
    for p in paths:
        try:
            stage_data(p, root, staging_root)
        except Exception as e:  # Keep CLI robust for batch runs
            errors += 1
            print(f"rimport: error processing {p}: {e}", file=sys.stderr)
    return 0 if errors == 0 else 1
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())