Skip to content

Commit 4316c38

Browse files
committed
Added testing to Streamflow/RO-Crate
1 parent 59775c2 commit 4316c38

2 files changed

Lines changed: 33 additions & 58 deletions

File tree

tests/translators_loggers/test_translators_loggers.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -268,18 +268,18 @@ class TestTranslators:
268268
@pytest.mark.parametrize(
269269
"backend",
270270
[
271-
# "swiftt",
272-
# "dask",
273-
# "parsl",
274-
# "nextflow",
275-
# "nextflow_subworkflow",
276-
# "airflow",
277-
# "bash",
278-
# "taskvine",
279-
# "makeflow",
280-
# "cwl",
271+
"swiftt",
272+
"dask",
273+
"parsl",
274+
"nextflow",
275+
"nextflow_subworkflow",
276+
"airflow",
277+
"bash",
278+
"taskvine",
279+
"makeflow",
280+
"cwl",
281281
"streamflow",
282-
# "pegasus",
282+
"pegasus",
283283
])
284284
@pytest.mark.unit
285285
# @pytest.mark.skip(reason="tmp")
@@ -346,18 +346,9 @@ def test_translator(self, backend) -> None:
346346

347347
original_workflow : Workflow = benchmark.workflow
348348

349-
# print(original_workflow.tasks)
350-
# print("======")
351-
# print(reconstructed_workflow.tasks)
352349
for task_name in original_workflow.tasks.keys():
353350
original_task = original_workflow.tasks[task_name]
354351
reconstructed_task = reconstructed_workflow.tasks["main.cwl#" + task_name]
355-
print("ORIGINAL:", original_task.task_id, "RECONSTRUCTED:", reconstructed_task.task_id)
356-
print(" NUM_INPUT_FILES: ", len(original_task.input_files), len(reconstructed_task.input_files))
357-
print(" NUM_OUTPUT_FILES: ", len(original_task.output_files), len(reconstructed_task.output_files))
358-
print(" INPUT FILES: ", [f.file_id for f in original_task.input_files], [f.file_id for f in reconstructed_task.input_files])
359-
print(" OUTPUT FILES: ", [f.file_id for f in original_task.output_files], [f.file_id for f in reconstructed_task.output_files])
360-
361352
_compare_workflows(original_workflow, reconstructed_workflow)
362353

363354
# Shutdown the container (weirdly, container is already shutdown by now... not sure how)

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ def _construct_data_file_id_name_map(self):
122122
continue
123123
alternate_name = item["alternateName"]
124124
self.data_file_id_name_map[id] = alternate_name
125-
print("=== FILE MAP ===")
126-
print(self.data_file_id_name_map)
127-
print("==== END FILE MAP ===")
128125

129126

130127
def _create_tasks(self, create_actions, main_workflow_id):
@@ -141,8 +138,8 @@ def _create_tasks(self, create_actions, main_workflow_id):
141138
continue
142139

143140
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
144-
print("***************************************")
145-
print("DEALING WITH TASK:", create_action['name'])
141+
# print("***************************************")
142+
# print("DEALING WITH TASK:", create_action['name'])
146143

147144
# Below would remove the "file.cwl#" tag, which runs the risk
148145
# of non-uniqueness of action names perhaps
@@ -155,19 +152,11 @@ def _create_tasks(self, create_actions, main_workflow_id):
155152
# Get all input & output for the create_action
156153
input = [obj['@id'] for obj in create_action['object']]
157154
output = [obj['@id'] for obj in create_action['result']]
158-
# print("RAW INPUT FILES: ", input)
159-
# print("RAW OUTPUT FILES: ", output)
160155

161156
# Filter for actual files
162157
input_files = self._filter_file_ids(input)
163-
print("GOT THESE IDS FOR INPUT FILES: ", input_files)
164-
print("TRANSLATED TO REAL FILE NAMES: ", [self.data_file_id_name_map[f] for f in input_files])
165158
output_files = self._filter_file_ids(output)
166-
print("GOT THESE IDS FOR OUTPUT FILES: ", output_files)
167-
print("TRANSLATED TO REAL FILE NAMES: ", [self.data_file_id_name_map[f] for f in output_files])
168159

169-
print("FILTERED INPUT FILES: ", input_files)
170-
print("FILTERED OUTPUT FILES: ", output_files)
171160

172161
task = Task(name=create_action['name'],
173162
task_id=create_action['name'],
@@ -208,31 +197,31 @@ def _create_tasks(self, create_actions, main_workflow_id):
208197
self._add_dependencies(files, instruments)
209198

210199
def _add_dependencies(self, files, instruments):
200+
201+
# File dependencies
211202
for file in files.values():
212203
for parent in file.get('out', []):
213204
for child in file.get('in', []):
214-
# self.workflow.add_dependency(parent, child)
215205
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
216206

217-
# Assumes
218-
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
219-
for parameter_connection in parameter_connections:
220-
# parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
221-
# which is bad design but whatever
222-
source_parameters = parameter_connection["sourceParameter"]
223-
if not isinstance(source_parameters, list):
224-
source_parameters = [source_parameters]
225-
226-
for item in source_parameters:
227-
source = item["@id"]
228-
source = source.rsplit("#", 1)[0] # Trim to get instrument
229-
230-
target = parameter_connection["targetParameter"]["@id"]
231-
target = target.rsplit("#", 1)[0] # Trim to get instrument
232-
233-
for parent in instruments.get(source, []):
234-
for child in instruments.get(target, []):
235-
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
207+
# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS
208+
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files)
209+
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
210+
# for parameter_connection in parameter_connections:
211+
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
212+
# # which is bad design but whatever
213+
# source_parameters = parameter_connection["sourceParameter"]
214+
# if not isinstance(source_parameters, list):
215+
# source_parameters = [source_parameters]
216+
# source = item["@id"]
217+
# source = source.rsplit("#", 1)[0] # Trim to get instrument
218+
#
219+
# target = parameter_connection["targetParameter"]["@id"]
220+
# target = target.rsplit("#", 1)[0] # Trim to get instrument
221+
#
222+
# for parent in instruments.get(source, []):
223+
# for child in instruments.get(target, []):
224+
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
236225

237226

238227
def _time_diff(self, start_time, end_time):
@@ -254,19 +243,14 @@ def _filter_file_ids(self, ids):
254243

255244
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
256245
property_value_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'PropertyValue', ids))
257-
# print("FILE_IDS =", file_ids)
258-
# print("PROPERTY_VALUE_IDS =", property_value_ids)
259246
for property_value_id in property_value_ids:
260247
property_values = self.lookup.get(property_value_id)['value']
261-
# print("PROPERTY_VALUES =", property_values)
262248
if isinstance(property_values, dict):
263249
property_values = [property_values]
264250

265251
# Filter out values without "@id"s (i.e. int values, etc.)
266252
pv_contained_ids = list(filter(lambda x: isinstance(x, dict) and "@id" in x, property_values))
267-
# print("PV_CONTAINED_IDS.1 = ", pv_contained_ids)
268253
pv_contained_ids = [obj["@id"] for obj in pv_contained_ids]
269-
# print("PV_CONTAINED_IDS.2 = ", pv_contained_ids)
270254

271255
# Recurse to verify everything's a file
272256
pv_filtered_ids = self._filter_file_ids(pv_contained_ids)

0 commit comments

Comments
 (0)