Skip to content

Commit 17f4ee5

Browse files
authored
Merge pull request #82 from wfcommons/nextflow_improvements
Nextflow improvements
2 parents 654ea93 + 8d44fba commit 17f4ee5

2 files changed

Lines changed: 51 additions & 26 deletions

File tree

docs/source/generating_workflow_benchmarks.rst

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,58 +88,58 @@ description in :ref:`json-format-label`.
8888
to execute memory-intensive threads. Therefore, it is crucial to ensure that
8989
:code:`stress-ng` is installed on all worker nodes.
9090

91-
Nextflow
91+
92+
Dask
9293
++++++++
93-
`Nextflow <https://www.nextflow.io/>`_ is a workflow management system that enables
94-
the development of portable and reproducible workflows. It supports deploying workflows
95-
on a variety of execution platforms including local, HPC schedulers, and cloud-based
94+
`Dask <https://www.dask.org/>`_ is an open-source library for parallel computing
95+
in Python. It makes it possible to easily implement and execute workflows on local machines, HPC cluster schedulers, and cloud-based
9696
and container-based environments. Below, we provide an example on how to generate
97-
workflow benchmark for running with Nextflow::
97+
a workflow benchmark for running with Dask::
9898

9999
import pathlib
100100

101101
from wfcommons import BlastRecipe
102-
from wfcommons.wfbench import WorkflowBenchmark, NextflowTranslator
102+
from wfcommons.wfbench import WorkflowBenchmark, DaskTranslator
103103

104104
# create a workflow benchmark object to generate specifications based on a recipe
105105
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)
106106

107107
# generate a specification based on performance characteristics
108108
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
109109

110-
# generate a Nextflow workflow
111-
translator = NextflowTranslator(benchmark.workflow)
112-
translator.translate(output_folder=pathlib.Path("./nextflow-wf/""))
113-
114-
.. warning::
115-
116-
Nextflow's way of defining workflows does not support tasks with iterations i.e. tasks
117-
that depend on another instance of the same abstract task. Thus, the translator
118-
fails when you try to translate a workflow with iterations.
110+
# generate a Dask workflow
111+
translator = DaskTranslator(benchmark.workflow)
112+
translator.translate(output_folder=pathlib.Path("./dask-wf/"))
119113

120-
Dask
114+
Nextflow
121115
++++++++
122-
`Dask <https://www.dask.org/>`_ is an open-source library for parallel computing
123-
in Python. It makes it possible to easily implement and execute workflows local machines, HPC cluster schedulers, and cloud-based
116+
117+
`Nextflow <https://www.nextflow.io/>`_ is a workflow management system that enables
118+
the development of portable and reproducible workflows. It supports deploying workflows
119+
on a variety of execution platforms including local, HPC schedulers, and cloud-based
124120
and container-based environments. Below, we provide an example on how to generate
125-
workflow benchmark for running with Dask::
121+
a workflow benchmark for running with Nextflow::
126122

127123
import pathlib
128124

129125
from wfcommons import BlastRecipe
130-
from wfcommons.wfbench import WorkflowBenchmark, DaskTranslator
126+
from wfcommons.wfbench import WorkflowBenchmark, NextflowTranslator
131127

132128
# create a workflow benchmark object to generate specifications based on a recipe
133129
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)
134130

135131
# generate a specification based on performance characteristics
136132
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
137133

138-
# generate a Dask workflow
139-
translator = DaskTranslator(benchmark.workflow)
140-
translator.translate(output_folder=pathlib.Path("./dask-wf/""))
134+
# generate a Nextflow workflow
135+
translator = NextflowTranslator(benchmark.workflow)
136+
translator.translate(output_folder=pathlib.Path("./nextflow-wf/"))
141137

138+
.. warning::
142139

140+
Nextflow's way of defining workflows does not support tasks with iterations i.e. tasks
141+
that depend on another instance of the same abstract task. Thus, the translator
142+
fails when you try to translate a workflow with iterations.
143143

144144
Pegasus
145145
+++++++
@@ -175,6 +175,31 @@ for running with Pegasus::
175175
the :code:`lock_files_folder` parameter when using
176176
:meth:`~wfcommons.wfbench.bench.WorkflowBenchmark.create_benchmark`.
177177

178+
PyCOMPSs
179+
++++++++
180+
181+
`PyCOMPSs <https://compss.bsc.es/>`_ is a programming model and runtime that
182+
enables the parallel execution of Python applications on distributed computing
183+
infrastructures. It allows developers to define tasks using simple Python
184+
decorators, automatically handling task scheduling, data dependencies, and
185+
resource management. Below, we provide an example on how to generate a workflow
186+
benchmark for running with PyCOMPSs::
187+
188+
import pathlib
189+
190+
from wfcommons import CyclesRecipe
191+
from wfcommons.wfbench import WorkflowBenchmark, PyCompssTranslator
192+
193+
# create a workflow benchmark object to generate specifications based on a recipe
194+
benchmark = WorkflowBenchmark(recipe=CyclesRecipe, num_tasks=200)
195+
196+
# generate a specification based on performance characteristics
197+
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=500, data=1000, percent_cpu=0.8)
198+
199+
# generate a PyCOMPSs workflow
200+
translator = PyCompssTranslator(benchmark.workflow)
201+
translator.translate(output_folder=pathlib.Path("./pycompss-wf/"))
202+
178203
Swift/T
179204
+++++++
180205

wfcommons/wfbench/translator/nextflow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,16 @@ def _create_task_script(output_folder: pathlib.Path, task: Task):
194194
# Generate input spec
195195
input_spec = "'\\["
196196
for f in task.input_files:
197-
input_spec += "\"" + str(output_folder.joinpath(f"data/{f.file_id}")) + "\","
197+
input_spec += f"\"{output_folder.resolve()}/data/{f.file_id}\","
198198
input_spec = input_spec[:-1] + "\\]'"
199199

200200
# Generate output spec
201201
output_spec = "'\\{"
202202
for f in task.output_files:
203-
output_spec += "\"" + str(output_folder.joinpath(f"data/{f.file_id}")) + "\":" + str(f.size)+ ","
203+
output_spec += f"\"{output_folder.resolve()}/data/{f.file_id}\":{str(f.size)},"
204204
output_spec = output_spec[:-1] + "\\}'"
205205

206-
code += str(output_folder.joinpath(f"bin/{task.program} "))
206+
code += f"{output_folder.resolve()}/bin/{task.program} "
207207

208208
for a in task.args:
209209
if "--output-files" in a:

0 commit comments

Comments
 (0)