Skip to content

Commit cef11bc

Browse files
committed
Fixed RO-Crate parser to preserve original task names (easier debugging)
Augmented RO-Crate parser to take in an optional list of step names to ignore
1 parent 761dcb9 commit cef11bc

3 files changed

Lines changed: 69 additions & 24 deletions

File tree

tests/translators_loggers/Dockerfile.streamflow

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ RUN apt-get -y install vim --fix-missing
2424
RUN apt-get -y install gcc
2525
RUN apt-get -y install gcc-multilib
2626
RUN apt-get -y install graphviz libgraphviz-dev
27+
RUN apt-get -y install zip
2728

2829

2930

tests/translators_loggers/test_translators_loggers.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath):
209209
uuid = output.decode().splitlines()[1].strip().split(" ")[0]
210210
exit_code, output = container.exec_run(cmd=f"streamflow prov {uuid}",
211211
user="wfcommons", stdout=True, stderr=True)
212+
exit_code, output = container.exec_run(cmd=f"mkdir RO-Crate",
213+
user="wfcommons", stdout=True, stderr=True)
214+
exit_code, output = container.exec_run(cmd=f"unzip *.zip -d ./RO-Crate",
215+
user="wfcommons", stdout=True, stderr=True)
212216

213217
def run_workflow_pegasus(container, num_tasks, str_dirpath):
214218
# Run the workflow!
@@ -264,18 +268,18 @@ class TestTranslators:
264268
@pytest.mark.parametrize(
265269
"backend",
266270
[
267-
"swiftt",
268-
"dask",
269-
"parsl",
270-
"nextflow",
271-
"nextflow_subworkflow",
272-
"airflow",
273-
"bash",
274-
"taskvine",
275-
"makeflow",
276-
"cwl",
271+
# "swiftt",
272+
# "dask",
273+
# "parsl",
274+
# "nextflow",
275+
# "nextflow_subworkflow",
276+
# "airflow",
277+
# "bash",
278+
# "taskvine",
279+
# "makeflow",
280+
# "cwl",
277281
"streamflow",
278-
"pegasus",
282+
# "pegasus",
279283
])
280284
@pytest.mark.unit
281285
# @pytest.mark.skip(reason="tmp")
@@ -330,8 +334,8 @@ def test_translator(self, backend) -> None:
330334
parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"])
331335
elif backend == "makeflow":
332336
parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/")
333-
# elif backend == "streamflow":
334-
# parser =ROCrateLogsParser(dirpath / "work/wfcommons/most-recent/wfbench")
337+
elif backend == "streamflow":
338+
parser = ROCrateLogsParser(dirpath / "RO-Crate", steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"])
335339

336340
if parser is not None:
337341
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
@@ -340,6 +344,18 @@ def test_translator(self, backend) -> None:
340344

341345
original_workflow : Workflow = benchmark.workflow
342346

347+
# print(original_workflow.tasks)
348+
# print("======")
349+
# print(reconstructed_workflow.tasks)
350+
for task_name in original_workflow.tasks.keys():
351+
original_task = original_workflow.tasks[task_name]
352+
reconstructed_task = reconstructed_workflow.tasks[task_name]
353+
print("ORIGINAL:", original_task.task_id, "RECONSTRUCTED:", reconstructed_task.task_id)
354+
print(" NUM_INPUT_FILES: ", len(original_task.input_files), len(reconstructed_task.input_files))
355+
print(" NUM_OUTPUT_FILES: ", len(original_task.output_files), len(reconstructed_task.output_files))
356+
print(" INPUT FILES: ", [f.file_id for f in original_task.input_files], [f.file_id for f in reconstructed_task.input_files])
357+
print(" OUTPUT FILES: ", [f.file_id for f in original_task.output_files], [f.file_id for f in reconstructed_task.output_files])
358+
343359
_compare_workflows(original_workflow, reconstructed_workflow)
344360

345361
# Shutdown the container (weirdly, container is already shutdown by now... not sure how)

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,16 @@ class ROCrateLogsParser(LogsParser):
4444
def __init__(self,
4545
crate_dir: pathlib.Path,
4646
description: Optional[str] = None,
47-
logger: Optional[Logger] = None) -> None:
47+
logger: Optional[Logger] = None,
48+
steps_to_ignore: Optional[list[str]]=None) -> None:
4849
"""Create an object of the RO crate parser."""
4950

5051
# TODO: Decide if these should be RO crate or Streamflow or whatev
5152
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)
5253

5354
# Sanity check
55+
if steps_to_ignore is None:
56+
steps_to_ignore = []
5457
if not crate_dir.is_dir():
5558
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')
5659

@@ -63,6 +66,10 @@ def __init__(self,
6366

6467
self.file_objects = {}
6568

69+
self.task_id_name_map: dict[str, str] = {}
70+
71+
self.steps_to_ignore = steps_to_ignore
72+
6673

6774
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
6875
"""
@@ -113,6 +120,16 @@ def _create_tasks(self, create_actions, main_workflow_id):
113120
self._process_main_workflow(create_action)
114121
continue
115122

123+
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
124+
125+
# Below would remove the "file.cwl#" tag, which runs the risk
126+
# of non-uniqueness of action names perhaps
127+
# create_action['name'] = create_action['name'].split('#', 1)[-1]
128+
129+
# Check if we should ignore this step
130+
if create_action["name"] in self.steps_to_ignore:
131+
continue
132+
116133
# Get all input & output for the create_action
117134
input = [obj['@id'] for obj in create_action['object']]
118135
output = [obj['@id'] for obj in create_action['result']]
@@ -121,17 +138,19 @@ def _create_tasks(self, create_actions, main_workflow_id):
121138
input_files = self._filter_file_ids(input)
122139
output_files = self._filter_file_ids(output)
123140

124-
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
125141

126142
task = Task(name=create_action['name'],
127-
task_id=create_action['@id'],
143+
task_id=create_action['name'],
144+
# task_id=create_action['name'] + "_" + create_action['@id'],
128145
task_type=TaskType.COMPUTE,
129146
runtime=self._time_diff(create_action['startTime'], create_action['endTime']),
130147
executed_at=create_action['startTime'],
131148
input_files=self._get_file_objects(input_files),
132149
output_files=self._get_file_objects(output_files),
133150
logger=self.logger)
134151
self.workflow.add_task(task)
152+
self.task_id_name_map[create_action['@id']] = create_action['name']
153+
# self.task_id_name_map[create_action['@id']] = create_action['name'] + "_" + create_action['@id']
135154

136155
# For each file, track which task(s) it is in/output for
137156
for infile in input_files:
@@ -162,20 +181,29 @@ def _add_dependencies(self, files, instruments):
162181
for file in files.values():
163182
for parent in file.get('out', []):
164183
for child in file.get('in', []):
165-
self.workflow.add_dependency(parent, child)
184+
# self.workflow.add_dependency(parent, child)
185+
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
166186

167187
# Assumes
168188
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
169189
for parameter_connection in parameter_connections:
170-
source = parameter_connection["sourceParameter"]["@id"]
171-
source = source.rsplit("#", 1)[0] # Trim to get instrument
190+
# parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
191+
# which is bad design but whatever
192+
source_parameters = parameter_connection["sourceParameter"]
193+
if not isinstance(source_parameters, list):
194+
source_parameters = [source_parameters]
195+
196+
for item in source_parameters:
197+
source = item["@id"]
198+
source = source.rsplit("#", 1)[0] # Trim to get instrument
199+
200+
target = parameter_connection["targetParameter"]["@id"]
201+
target = target.rsplit("#", 1)[0] # Trim to get instrument
172202

173-
target = parameter_connection["targetParameter"]["@id"]
174-
target = target.rsplit("#", 1)[0] # Trim to get instrument
203+
for parent in instruments.get(source, []):
204+
for child in instruments.get(target, []):
205+
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
175206

176-
for parent in instruments.get(source, []):
177-
for child in instruments.get(target, []):
178-
self.workflow.add_dependency(parent, child)
179207

180208
def _time_diff(self, start_time, end_time):
181209
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)

0 commit comments

Comments
 (0)