Skip to content

Commit 47fc44e

Browse files
Merge pull request #142 from wfcommons/streamflow-ro-crate
Improvements to RO-crate logging and testing
2 parents eac0a93 + 767ec69 commit 47fc44e

7 files changed

Lines changed: 114 additions & 1532 deletions

File tree

docs/source/dev_api_wfinstances.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ wfcommons.wfinstances.logs.makeflow
5050
:private-members:
5151
:noindex:
5252

53+
wfcommons.wfinstances.logs.taskvine
54+
-----------------------------------
55+
56+
.. automodule:: wfcommons.wfinstances.logs.taskvine
57+
:members:
58+
:undoc-members:
59+
:show-inheritance:
60+
:private-members:
61+
:noindex:
62+
5363
wfcommons.wfinstances.logs.nextflow
5464
-----------------------------------
5565

docs/source/user_api_wfbench.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ wfcommons.wfbench.translator.cwl
3838
:undoc-members:
3939
:show-inheritance:
4040

41+
wfcommons.wfbench.translator.streamflow
42+
---------------------------------------
43+
44+
.. automodule:: wfcommons.wfbench.translator.streamflow
45+
:members:
46+
:undoc-members:
47+
:show-inheritance:
48+
4149
wfcommons.wfbench.translator.dask
4250
---------------------------------
4351

tests/translators_loggers/Dockerfile.streamflow

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ RUN apt-get -y install vim --fix-missing
2424
RUN apt-get -y install gcc
2525
RUN apt-get -y install gcc-multilib
2626
RUN apt-get -y install graphviz libgraphviz-dev
27+
RUN apt-get -y install zip
2728

2829

2930

tests/translators_loggers/test_translators_loggers.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath):
209209
uuid = output.decode().splitlines()[1].strip().split(" ")[0]
210210
exit_code, output = container.exec_run(cmd=f"streamflow prov {uuid}",
211211
user="wfcommons", stdout=True, stderr=True)
212+
exit_code, output = container.exec_run(cmd=f"mkdir RO-Crate",
213+
user="wfcommons", stdout=True, stderr=True)
214+
exit_code, output = container.exec_run(cmd=f"unzip *.zip -d ./RO-Crate",
215+
user="wfcommons", stdout=True, stderr=True)
212216

213217
def run_workflow_pegasus(container, num_tasks, str_dirpath):
214218
# Run the workflow!
@@ -330,8 +334,10 @@ def test_translator(self, backend) -> None:
330334
parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"])
331335
elif backend == "makeflow":
332336
parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/")
333-
# elif backend == "streamflow":
334-
# parser =ROCrateLogsParser(dirpath / "work/wfcommons/most-recent/wfbench")
337+
elif backend == "streamflow":
338+
parser = ROCrateLogsParser(dirpath / "RO-Crate",
339+
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
340+
file_extensions_to_ignore=[".out", ".err"])
335341

336342
if parser is not None:
337343
sys.stderr.write(f"[{backend}] Parsing the logs...\n")

uv.lock

Lines changed: 0 additions & 1509 deletions
This file was deleted.

wfcommons/wfbench/translator/streamflow.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ def __init__(self,
3838
super().__init__(workflow, logger)
3939

4040
def translate(self, output_folder: pathlib.Path) -> None:
41+
"""
42+
Translate a workflow benchmark description (WfFormat) into an actual workflow application.
43+
44+
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
45+
:type output_folder: pathlib.Path
46+
"""
4147
# Perform the CWL translation (which will create the output folder)
4248
from wfcommons.wfbench import CWLTranslator
4349
cwl_translator = CWLTranslator(workflow=self.workflow, logger=self.logger)

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,18 @@ class ROCrateLogsParser(LogsParser):
4444
def __init__(self,
4545
crate_dir: pathlib.Path,
4646
description: Optional[str] = None,
47-
logger: Optional[Logger] = None) -> None:
47+
logger: Optional[Logger] = None,
48+
steps_to_ignore: Optional[list[str]]=[],
49+
file_extensions_to_ignore: Optional[list[str]]=[],
50+
) -> None:
4851
"""Create an object of the RO crate parser."""
4952

5053
# TODO: Decide if these should be RO crate or Streamflow or whatever
5154
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)
5255

5356
# Sanity check
57+
if steps_to_ignore is None:
58+
steps_to_ignore = []
5459
if not crate_dir.is_dir():
5560
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')
5661

@@ -63,6 +68,12 @@ def __init__(self,
6368

6469
self.file_objects = {}
6570

71+
self.task_id_name_map: dict[str, str] = {}
72+
self.data_file_id_name_map: dict[str, str] = {}
73+
74+
self.steps_to_ignore = steps_to_ignore
75+
self.file_extensions_to_ignore = file_extensions_to_ignore
76+
6677

6778
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
6879
"""
@@ -89,16 +100,29 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
89100
# Dictionary of ro-crate objects by "@id"
90101
self.lookup = {item["@id"]: item for item in self.graph_data}
91102

103+
# Dictionary of application data files
104+
self._construct_data_file_id_name_map()
105+
92106
# Find id of the main workflow
93107
overview = self.lookup.get("./")
94108
main_workflow_id = overview.get("mainEntity").get("@id")
95109

96110
create_actions = list(filter((lambda x: x.get('@type') == "CreateAction"), self.graph_data))
97111
self._create_tasks(create_actions, main_workflow_id)
98112

113+
return self.workflow
99114

100115

101-
return self.workflow
116+
def _construct_data_file_id_name_map(self):
117+
for item in self.graph_data:
118+
if item["@type"] != "File":
119+
continue
120+
id = item["@id"]
121+
if "alternateName" not in item:
122+
continue
123+
alternate_name = item["alternateName"]
124+
self.data_file_id_name_map[id] = alternate_name
125+
102126

103127
def _create_tasks(self, create_actions, main_workflow_id):
104128
# Object to track dependencies between tasks based on files
@@ -113,6 +137,18 @@ def _create_tasks(self, create_actions, main_workflow_id):
113137
self._process_main_workflow(create_action)
114138
continue
115139

140+
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
141+
# print("***************************************")
142+
# print("DEALING WITH TASK:", create_action['name'])
143+
144+
# Below would remove the "file.cwl#" tag, which runs the risk
145+
# of non-uniqueness of action names perhaps
146+
# create_action['name'] = create_action['name'].split('#', 1)[-1]
147+
148+
# Check if we should ignore this step
149+
if create_action["name"] in self.steps_to_ignore:
150+
continue
151+
116152
# Get all input & output for the create_action
117153
input = [obj['@id'] for obj in create_action['object']]
118154
output = [obj['@id'] for obj in create_action['result']]
@@ -121,17 +157,19 @@ def _create_tasks(self, create_actions, main_workflow_id):
121157
input_files = self._filter_file_ids(input)
122158
output_files = self._filter_file_ids(output)
123159

124-
create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
125160

126161
task = Task(name=create_action['name'],
127-
task_id=create_action['@id'],
162+
task_id=create_action['name'],
163+
# task_id=create_action['name'] + "_" + create_action['@id'],
128164
task_type=TaskType.COMPUTE,
129165
runtime=self._time_diff(create_action['startTime'], create_action['endTime']),
130166
executed_at=create_action['startTime'],
131167
input_files=self._get_file_objects(input_files),
132168
output_files=self._get_file_objects(output_files),
133169
logger=self.logger)
134170
self.workflow.add_task(task)
171+
self.task_id_name_map[create_action['@id']] = create_action['name']
172+
# self.task_id_name_map[create_action['@id']] = create_action['name'] + "_" + create_action['@id']
135173

136174
# For each file, track which task(s) it is in/output for
137175
for infile in input_files:
@@ -159,23 +197,32 @@ def _create_tasks(self, create_actions, main_workflow_id):
159197
self._add_dependencies(files, instruments)
160198

161199
def _add_dependencies(self, files, instruments):
200+
201+
# File dependencies
162202
for file in files.values():
163203
for parent in file.get('out', []):
164204
for child in file.get('in', []):
165-
self.workflow.add_dependency(parent, child)
166-
167-
# Assumes
168-
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
169-
for parameter_connection in parameter_connections:
170-
source = parameter_connection["sourceParameter"]["@id"]
171-
source = source.rsplit("#", 1)[0] # Trim to get instrument
205+
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
206+
207+
# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS
208+
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files
209+
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
210+
# for parameter_connection in parameter_connections:
211+
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
212+
# # which is bad design but whatever
213+
# source_parameters = parameter_connection["sourceParameter"]
214+
# if not isinstance(source_parameters, list):
215+
# source_parameters = [source_parameters]
216+
# source = item["@id"]
217+
# source = source.rsplit("#", 1)[0] # Trim to get instrument
218+
#
219+
# target = parameter_connection["targetParameter"]["@id"]
220+
# target = target.rsplit("#", 1)[0] # Trim to get instrument
221+
#
222+
# for parent in instruments.get(source, []):
223+
# for child in instruments.get(target, []):
224+
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
172225

173-
target = parameter_connection["targetParameter"]["@id"]
174-
target = target.rsplit("#", 1)[0] # Trim to get instrument
175-
176-
for parent in instruments.get(source, []):
177-
for child in instruments.get(target, []):
178-
self.workflow.add_dependency(parent, child)
179226

180227
def _time_diff(self, start_time, end_time):
181228
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)
@@ -186,19 +233,20 @@ def _get_file_objects(self, files):
186233
output = []
187234
for file in files:
188235
if file not in self.file_objects:
189-
self.file_objects[file] = File(file_id=file,
236+
self.file_objects[file] = File(file_id=self.data_file_id_name_map[file],
190237
size=os.path.getsize(f"{self.crate_dir}/{file}"),
191238
logger=self.logger)
192239
output.append(self.file_objects[file])
193240
return output
194241

195242
def _filter_file_ids(self, ids):
196-
# Given a list of "@id"s, returns those with the File type as well as unpacks PropertyValue into Files.
197-
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
198243

244+
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
199245
property_value_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'PropertyValue', ids))
200246
for property_value_id in property_value_ids:
201247
property_values = self.lookup.get(property_value_id)['value']
248+
if isinstance(property_values, dict):
249+
property_values = [property_values]
202250

203251
# Filter out values without "@id"s (i.e. int values, etc.)
204252
pv_contained_ids = list(filter(lambda x: isinstance(x, dict) and "@id" in x, property_values))
@@ -210,7 +258,19 @@ def _filter_file_ids(self, ids):
210258
# Filter duplicates while adding
211259
file_ids = list(set(file_ids + pv_filtered_ids))
212260

213-
return file_ids
261+
# Removing files based on file extensions
262+
to_return = []
263+
for file_id in file_ids:
264+
to_ignore = False
265+
for suffix in self.file_extensions_to_ignore:
266+
if self.data_file_id_name_map[file_id].endswith(suffix):
267+
to_ignore = True
268+
break
269+
if not to_ignore:
270+
to_return.append(file_id)
271+
272+
return to_return
273+
214274
def _process_main_workflow(self, main_workflow):
215275
self.workflow.makespan = self._time_diff(main_workflow['startTime'], main_workflow['endTime'])
216276
self.workflow.executed_at = main_workflow['startTime']

0 commit comments

Comments
 (0)