1010import argparse
1111import logging
1212import time
13+ from pathlib import Path
1314
1415DEFAULT_SOURCE_ROOT = "/glade/campaign/cesm/cesmdata/cseg/inputdata/"
1516DEFAULT_TARGET_ROOT = (
@@ -60,18 +61,23 @@ def _handle_non_dir_entry(entry, user_uid):
6061 return None
6162
6263
def handle_non_dir(var, user_uid, inputdata_root):
    """
    Check whether a non-directory entry should be processed and lies under inputdata_root.

    The ownership/file-type decision is delegated to _handle_non_dir_entry(); this
    function only validates the argument type and that the resulting path is located
    inside the input data tree.

    Args:
        var (os.DirEntry): A directory entry from os.scandir(), or any object exposing
            the same "stat", "is_file", "is_symlink" and "path" members. Plain string
            paths are NOT accepted and raise TypeError.
        user_uid (int): The UID of the user whose files to find.
        inputdata_root (str): The root of the directory tree containing CESM input data.

    Returns:
        str or None: The path reported by _handle_non_dir_entry() (the path of a regular,
        non-symlink file owned by the user), or None if the entry should be skipped.

    Raises:
        TypeError: If var is not a DirEntry-like object.
        ValueError: If the file path is not equal to or under inputdata_root.
    """

    # Accept real os.DirEntry objects and, via duck typing, any object that provides the
    # DirEntry members listed here (e.g. stand-in objects that are not os.DirEntry).
    is_dir_entry_like = isinstance(var, os.DirEntry) or all(
        hasattr(var, m) for m in ["stat", "is_file", "is_symlink", "path"]
    )
    if not is_dir_entry_like:
        raise TypeError(
            f"Unsure how to handle non-directory variable of type {type(var)}"
        )

    file_path = _handle_non_dir_entry(var, user_uid)

    # Check that resulting path is a child of inputdata_root.
    # NOTE: Path.is_relative_to() (Python 3.9+) compares path components lexically; it
    # does not touch the filesystem, resolve symlinks, or collapse ".." segments.
    if file_path is not None and not Path(file_path).is_relative_to(inputdata_root):
        raise ValueError(
            f"'{file_path}' must be equivalent to or under '{inputdata_root}'"
        )

    return file_path
86102
87103
88- def find_owned_files_scandir (directory , user_uid ):
104+ def find_owned_files_scandir (directory , user_uid , inputdata_root = DEFAULT_SOURCE_ROOT ):
89105 """
90106 Efficiently find all files owned by a specific user using os.scandir().
91107
@@ -95,20 +111,28 @@ def find_owned_files_scandir(directory, user_uid):
95111 Args:
96112 directory (str): The root directory to search.
97113 user_uid (int): The UID of the user whose files to find.
114+ inputdata_root (str): The root of the directory tree containing CESM input data.
98115
99116 Yields:
100117 str: Absolute paths to files owned by the user.
118+
119+ Raises:
120+ ValueError: If any file found is not under inputdata_root.
101121 """
102122 try :
103123 with os .scandir (directory ) as entries :
104124 for entry in entries :
105125 try :
106126 # Recursively process directories (not following symlinks)
107127 if entry .is_dir (follow_symlinks = False ):
108- yield from find_owned_files_scandir (entry .path , user_uid )
128+ yield from find_owned_files_scandir (
129+ entry .path , user_uid , inputdata_root
130+ )
109131
110132 # Things other than directories are handled separately
111- elif (entry_path := handle_non_dir (entry , user_uid )) is not None :
133+ elif (
134+ entry_path := handle_non_dir (entry , user_uid , inputdata_root )
135+ ) is not None :
112136 yield entry_path
113137
114138 except (OSError , PermissionError ) as e :
@@ -119,7 +143,9 @@ def find_owned_files_scandir(directory, user_uid):
119143 logger .debug ("Error accessing %s: %s. Skipping." , directory , e )
120144
121145
122- def replace_files_with_symlinks (source_dir , target_dir , username , dry_run = False ):
146+ def replace_files_with_symlinks (
147+ source_dir , target_dir , username , inputdata_root = DEFAULT_SOURCE_ROOT , dry_run = False
148+ ):
123149 """
124150 Finds files owned by a specific user in a source directory tree,
125151 deletes them, and replaces them with symbolic links to the same
@@ -128,6 +154,7 @@ def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False)
128154 Args:
129155 source_dir (str): The root of the directory tree to search for files.
130156 target_dir (str): The root of the directory tree containing the new files.
157+ inputdata_root (str): The root of the directory tree containing CESM input data.
131158 username (str): The name of the user whose files will be processed.
132159 dry_run (bool): If True, only show what would be done without making changes.
133160 """
@@ -152,7 +179,7 @@ def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False)
152179 )
153180
154181 # Use efficient scandir-based search
155- for file_path in find_owned_files_scandir (source_dir , user_uid ):
182+ for file_path in find_owned_files_scandir (source_dir , user_uid , inputdata_root ):
156183 logger .info ("Found owned file: %s" , file_path )
157184
158185 # Determine the relative path and the new link's destination
@@ -251,6 +278,16 @@ def parse_arguments():
251278 ),
252279 )
253280
281+ # The root of the directory tree containing CESM input data.
282+ # ONLY INTENDED FOR USE IN TESTING
283+ parser .add_argument (
284+ "--inputdata-root" ,
285+ "-inputdata" , # to match rimport
286+ type = validate_directory ,
287+ default = DEFAULT_SOURCE_ROOT ,
288+ help = argparse .SUPPRESS ,
289+ )
290+
254291 # Verbosity options (mutually exclusive)
255292 verbosity_group = parser .add_mutually_exclusive_group ()
256293 verbosity_group .add_argument (
@@ -311,7 +348,11 @@ def main():
311348
312349 # --- Execution ---
313350 replace_files_with_symlinks (
314- args .source_root , args .target_root , my_username , dry_run = args .dry_run
351+ args .source_root ,
352+ args .target_root ,
353+ my_username ,
354+ inputdata_root = args .inputdata_root ,
355+ dry_run = args .dry_run ,
315356 )
316357
317358 if args .timing :
0 commit comments