Skip to content

Commit 318da88

Browse files
committed
Merge branch 'main' into cwl_stdout_stderr
2 parents 8f0aa9d + 7328279 commit 318da88

3 files changed

Lines changed: 42 additions & 34 deletions

File tree

tests/translators_loggers/Dockerfile.streamflow

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ RUN apt-get -y install gcc-multilib
2626
RUN apt-get -y install graphviz libgraphviz-dev
2727
RUN apt-get -y install zip
2828

29-
30-
3129
# Python stuff
3230
RUN apt-get -y install python3 python3-pip
3331
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
@@ -41,7 +39,9 @@ RUN apt-get -y install stress-ng
4139

4240
# Streamflow
4341
RUN apt-get -y install nodejs
44-
RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14
42+
#RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14
43+
# For now, the above hasn't been released yet, so we get a commit tag
44+
RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 4cbe9244165a81114a73f5bb2ff5067a85a5600c && python3 -m pip install --break-system-packages .
4545

4646
# Add wfcommons user
4747
RUN useradd -ms /bin/bash wfcommons

tests/translators_loggers/test_translators_loggers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,8 @@ def test_translator(self, backend) -> None:
338338
elif backend == "streamflow":
339339
parser = ROCrateLogsParser(dirpath / "RO-Crate",
340340
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
341-
file_extensions_to_ignore=[".out", ".err"])
341+
file_extensions_to_ignore=[".out", ".err"],
342+
instruments_to_ignore=["shell.cwl"])
342343

343344
if parser is not None:
344345
sys.stderr.write(f"[{backend}] Parsing the logs...\n")

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,27 @@ class ROCrateLogsParser(LogsParser):
3939
:type description: Optional[str]
4040
:param logger: The logger where to log information/warning or errors (optional).
4141
:type logger: Optional[Logger]
42+
:param steps_to_ignore: Names of CWL steps that should be ignored in the translation
43+
:type steps_to_ignore: Optional[list[str]]
44+
:param file_extensions_to_ignore: File extensions that should be ignored in the translation
45+
:type file_extensions_to_ignore: Optional[list[str]]
46+
:param instruments_to_ignore: Names of instruments that should be ignored in the translation
47+
:type instruments_to_ignore: Optional[list[str]]
4248
"""
4349

4450
def __init__(self,
4551
crate_dir: pathlib.Path,
4652
description: Optional[str] = None,
4753
logger: Optional[Logger] = None,
48-
steps_to_ignore: Optional[list[str]]=[],
49-
file_extensions_to_ignore: Optional[list[str]]=[],
54+
steps_to_ignore: Optional[list[str]] = None,
55+
file_extensions_to_ignore: Optional[list[str]] = None,
56+
instruments_to_ignore: Optional[list[str]] = None,
5057
) -> None:
5158
"""Create an object of the RO crate parser."""
5259

53-
# TODO: Decide if these should be RO crate or Streamflow or whatev
5460
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)
5561

5662
# Sanity check
57-
if steps_to_ignore is None:
58-
steps_to_ignore = []
5963
if not crate_dir.is_dir():
6064
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')
6165

@@ -71,8 +75,9 @@ def __init__(self,
7175
self.task_id_name_map: dict[str, str] = {}
7276
self.data_file_id_name_map: dict[str, str] = {}
7377

74-
self.steps_to_ignore = steps_to_ignore
75-
self.file_extensions_to_ignore = file_extensions_to_ignore
78+
self.steps_to_ignore : list[str] = steps_to_ignore or []
79+
self.file_extensions_to_ignore : list[str] = file_extensions_to_ignore or []
80+
self.instruments_to_ignore : list[str] = instruments_to_ignore or []
7681

7782

7883
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
@@ -159,17 +164,17 @@ def _create_tasks(self, create_actions, main_workflow_id):
159164

160165

161166
task = Task(name=create_action['name'],
162-
task_id=create_action['name'],
163-
# task_id=create_action['name'] + "_" + create_action['@id'],
167+
# task_id=create_action['name'],
168+
task_id=create_action['name'] + "_" + create_action['@id'],
164169
task_type=TaskType.COMPUTE,
165170
runtime=self._time_diff(create_action['startTime'], create_action['endTime']),
166171
executed_at=create_action['startTime'],
167172
input_files=self._get_file_objects(input_files),
168173
output_files=self._get_file_objects(output_files),
169174
logger=self.logger)
170175
self.workflow.add_task(task)
171-
self.task_id_name_map[create_action['@id']] = create_action['name']
172-
# self.task_id_name_map[create_action['@id']] = create_action['name'] + "_" + create_action['@id']
176+
# self.task_id_name_map[create_action['@id']] = create_action['name']
177+
self.task_id_name_map[create_action['@id']] = create_action['name'] + "_" + create_action['@id']
173178

174179
# For each file, track which task(s) it is in/output for
175180
for infile in input_files:
@@ -204,25 +209,27 @@ def _add_dependencies(self, files, instruments):
204209
for child in file.get('in', []):
205210
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
206211

207-
# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS
208-
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files
209-
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
210-
# for parameter_connection in parameter_connections:
211-
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
212-
# # which is bad design but whatever
213-
# source_parameters = parameter_connection["sourceParameter"]
214-
# if not isinstance(source_parameters, list):
215-
# source_parameters = [source_parameters]
216-
# source = item["@id"]
217-
# source = source.rsplit("#", 1)[0] # Trim to get instrument
218-
#
219-
# target = parameter_connection["targetParameter"]["@id"]
220-
# target = target.rsplit("#", 1)[0] # Trim to get instrument
221-
#
222-
# for parent in instruments.get(source, []):
223-
# for child in instruments.get(target, []):
224-
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
225-
212+
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
213+
for parameter_connection in parameter_connections:
214+
# parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
215+
# which is bad design but whatever
216+
source_parameters = parameter_connection["sourceParameter"]
217+
if not isinstance(source_parameters, list):
218+
source_parameters = [source_parameters]
219+
for item in source_parameters:
220+
source = item["@id"]
221+
source = source.rsplit("#", 1)[0] # Trim to get instrument
222+
223+
target = parameter_connection["targetParameter"]["@id"]
224+
target = target.rsplit("#", 1)[0] # Trim to get instrument
225+
226+
if source in self.instruments_to_ignore or target in self.instruments_to_ignore:
227+
continue
228+
# print("source", source, "----> target", target)
229+
230+
for parent in instruments.get(source, []):
231+
for child in instruments.get(target, []):
232+
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
226233

227234
def _time_diff(self, start_time, end_time):
228235
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)

0 commit comments

Comments
 (0)