Skip to content

Commit 59775c2

Browse files
committed
Added an "ignore these file extensions" feature
1 parent cef11bc commit 59775c2

2 files changed

Lines changed: 58 additions & 8 deletions

File tree

tests/translators_loggers/test_translators_loggers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,9 @@ def test_translator(self, backend) -> None:
335335
elif backend == "makeflow":
336336
parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/")
337337
elif backend == "streamflow":
338-
parser = ROCrateLogsParser(dirpath / "RO-Crate", steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"])
338+
parser = ROCrateLogsParser(dirpath / "RO-Crate",
339+
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
340+
file_extensions_to_ignore=[".out", ".err"])
339341

340342
if parser is not None:
341343
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
@@ -349,7 +351,7 @@ def test_translator(self, backend) -> None:
349351
# print(reconstructed_workflow.tasks)
350352
for task_name in original_workflow.tasks.keys():
351353
original_task = original_workflow.tasks[task_name]
352-
reconstructed_task = reconstructed_workflow.tasks[task_name]
354+
reconstructed_task = reconstructed_workflow.tasks["main.cwl#" + task_name]
353355
print("ORIGINAL:", original_task.task_id, "RECONSTRUCTED:", reconstructed_task.task_id)
354356
print(" NUM_INPUT_FILES: ", len(original_task.input_files), len(reconstructed_task.input_files))
355357
print(" NUM_OUTPUT_FILES: ", len(original_task.output_files), len(reconstructed_task.output_files))

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ def __init__(self,
4545
crate_dir: pathlib.Path,
4646
description: Optional[str] = None,
4747
logger: Optional[Logger] = None,
48-
steps_to_ignore: Optional[list[str]]=None) -> None:
48+
steps_to_ignore: Optional[list[str]]=[],
49+
file_extensions_to_ignore: Optional[list[str]]=[],
50+
) -> None:
4951
"""Create an object of the RO crate parser."""
5052

5153
# TODO: Decide if these should be RO crate or Streamflow or whatev
@@ -67,8 +69,10 @@ def __init__(self,
6769
self.file_objects = {}
6870

6971
self.task_id_name_map: dict[str, str] = {}
72+
self.data_file_id_name_map: dict[str, str] = {}
7073

7174
self.steps_to_ignore = steps_to_ignore
75+
self.file_extensions_to_ignore = file_extensions_to_ignore
7276

7377

7478
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
@@ -96,16 +100,32 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
96100
# Dictionary of ro-crate objects by "@id"
97101
self.lookup = {item["@id"]: item for item in self.graph_data}
98102

103+
# Dictionary of application data files
104+
self._construct_data_file_id_name_map()
105+
99106
# Find id of the main workflow
100107
overview = self.lookup.get("./")
101108
main_workflow_id = overview.get("mainEntity").get("@id")
102109

103110
create_actions = list(filter((lambda x: x.get('@type') == "CreateAction"), self.graph_data))
104111
self._create_tasks(create_actions, main_workflow_id)
105112

113+
return self.workflow
106114

107115

108-
return self.workflow
116+
def _construct_data_file_id_name_map(self):
117+
for item in self.graph_data:
118+
if item["@type"] != "File":
119+
continue
120+
id = item["@id"]
121+
if "alternateName" not in item:
122+
continue
123+
alternate_name = item["alternateName"]
124+
self.data_file_id_name_map[id] = alternate_name
125+
print("=== FILE MAP ===")
126+
print(self.data_file_id_name_map)
127+
print("==== END FILE MAP ===")
128+
109129

110130
def _create_tasks(self, create_actions, main_workflow_id):
111131
# Object to track dependencies between tasks based on files
@@ -121,6 +141,8 @@ def _create_tasks(self, create_actions, main_workflow_id):
121141
continue
122142

123143
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
144+
print("***************************************")
145+
print("DEALING WITH TASK:", create_action['name'])
124146

125147
# Below would remove the "file.cwl#" tag, which runs the risk
126148
# of non-uniqueness of action names perhaps
@@ -133,11 +155,19 @@ def _create_tasks(self, create_actions, main_workflow_id):
133155
# Get all input & output for the create_action
134156
input = [obj['@id'] for obj in create_action['object']]
135157
output = [obj['@id'] for obj in create_action['result']]
158+
# print("RAW INPUT FILES: ", input)
159+
# print("RAW OUTPUT FILES: ", output)
136160

137161
# Filter for actual files
138162
input_files = self._filter_file_ids(input)
163+
print("GOT THESE IDS FOR INPUT FILES: ", input_files)
164+
print("TRANSLATED TO REAL FILE NAMES: ", [self.data_file_id_name_map[f] for f in input_files])
139165
output_files = self._filter_file_ids(output)
166+
print("GOT THESE IDS FOR OUTPUT FILES: ", output_files)
167+
print("TRANSLATED TO REAL FILE NAMES: ", [self.data_file_id_name_map[f] for f in output_files])
140168

169+
print("FILTERED INPUT FILES: ", input_files)
170+
print("FILTERED OUTPUT FILES: ", output_files)
141171

142172
task = Task(name=create_action['name'],
143173
task_id=create_action['name'],
@@ -214,31 +244,49 @@ def _get_file_objects(self, files):
214244
output = []
215245
for file in files:
216246
if file not in self.file_objects:
217-
self.file_objects[file] = File(file_id=file,
247+
self.file_objects[file] = File(file_id=self.data_file_id_name_map[file],
218248
size=os.path.getsize(f"{self.crate_dir}/{file}"),
219249
logger=self.logger)
220250
output.append(self.file_objects[file])
221251
return output
222252

223253
def _filter_file_ids(self, ids):
224-
# Given a list of "@id"s, returns those with the File type as well as unpacks PropertyValue into Files.
225-
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
226254

255+
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
227256
property_value_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'PropertyValue', ids))
257+
# print("FILE_IDS =", file_ids)
258+
# print("PROPERTY_VALUE_IDS =", property_value_ids)
228259
for property_value_id in property_value_ids:
229260
property_values = self.lookup.get(property_value_id)['value']
261+
# print("PROPERTY_VALUES =", property_values)
262+
if isinstance(property_values, dict):
263+
property_values = [property_values]
230264

231265
# Filter out values without "@id"s (i.e. int values, etc.)
232266
pv_contained_ids = list(filter(lambda x: isinstance(x, dict) and "@id" in x, property_values))
267+
# print("PV_CONTAINED_IDS.1 = ", pv_contained_ids)
233268
pv_contained_ids = [obj["@id"] for obj in pv_contained_ids]
269+
# print("PV_CONTAINED_IDS.2 = ", pv_contained_ids)
234270

235271
# Recurse to verify everything's a file
236272
pv_filtered_ids = self._filter_file_ids(pv_contained_ids)
237273

238274
# Filter duplicates while adding
239275
file_ids = list(set(file_ids + pv_filtered_ids))
240276

241-
return file_ids
277+
# Removing files based on file extensions
278+
to_return = []
279+
for file_id in file_ids:
280+
to_ignore = False
281+
for suffix in self.file_extensions_to_ignore:
282+
if self.data_file_id_name_map[file_id].endswith(suffix):
283+
to_ignore = True
284+
break
285+
if not to_ignore:
286+
to_return.append(file_id)
287+
288+
return to_return
289+
242290
def _process_main_workflow(self, main_workflow):
243291
self.workflow.makespan = self._time_diff(main_workflow['startTime'], main_workflow['endTime'])
244292
self.workflow.executed_at = main_workflow['startTime']

0 commit comments

Comments (0)