Skip to content

Commit a7a9547

Browse files
committed
nextflow translator: subworkflows
1 parent 4a9807a commit a7a9547

7 files changed

Lines changed: 79 additions & 570 deletions

File tree

docs/source/generating_workflow_benchmarks.rst

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,24 @@ workflow benchmark for running with Nextflow::
132132
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
133133

134134
# generate a Nextflow workflow
135-
translator = NextflowTranslator(benchmark.workflow)
136-
translator.translate(output_folder=pathlib.Path("./nextflow-wf/""))
135+
translator = NextflowTranslator(
136+
benchmark.workflow,
137+
use_subworkflows=False,
138+
max_tasks_per_subworkflow=1000,
139+
)
140+
translator.translate(output_folder=pathlib.Path("./nextflow-wf/"))
141+
142+
If you want to split large workflows across multiple Nextflow module files, enable
143+
subworkflows and set the maximum number of tasks per module. This produces a
144+
``modules/`` directory plus a top-level ``workflow.nf`` that includes and runs
145+
the modules sequentially::
146+
147+
translator = NextflowTranslator(
148+
benchmark.workflow,
149+
use_subworkflows=True,
150+
max_tasks_per_subworkflow=250,
151+
)
152+
translator.translate(output_folder=pathlib.Path("./nextflow-wf/"))
137153

138154
.. warning::
139155

tests/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest
2+
pytest-cov
3+
docker

tests/translators_loggers/test_translators_loggers.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2025 The WfCommons Team.
4+
# Copyright (c) 2025-2026 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -105,6 +105,7 @@ def _additional_setup_swiftt(container):
105105
"dask": noop,
106106
"parsl": noop,
107107
"nextflow": noop,
108+
"nextflow_subworkflow": noop,
108109
"airflow": noop,
109110
"bash": noop,
110111
"taskvine": _additional_setup_taskvine,
@@ -203,6 +204,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
203204
"dask": run_workflow_dask,
204205
"parsl": run_workflow_parsl,
205206
"nextflow": run_workflow_nextflow,
207+
"nextflow_subworkflow": run_workflow_nextflow,
206208
"airflow": run_workflow_airflow,
207209
"bash": run_workflow_bash,
208210
"taskvine": run_workflow_taskvine,
@@ -216,6 +218,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
216218
"dask": DaskTranslator,
217219
"parsl": ParslTranslator,
218220
"nextflow": NextflowTranslator,
221+
"nextflow_subworkflow": NextflowTranslator,
219222
"airflow": AirflowTranslator,
220223
"bash": BashTranslator,
221224
"taskvine": TaskVineTranslator,
@@ -235,6 +238,7 @@ class TestTranslators:
235238
"dask",
236239
"parsl",
237240
"nextflow",
241+
"nextflow_subworkflow",
238242
"airflow",
239243
"bash",
240244
"taskvine",
@@ -256,7 +260,10 @@ def test_translator(self, backend) -> None:
256260

257261
# Perform the translation
258262
sys.stderr.write(f"\n[{backend}] Translating workflow...\n")
259-
translator = translator_classes[backend](benchmark.workflow)
263+
if backend == "nextflow_subworkflow":
264+
translator = translator_classes[backend](benchmark.workflow, use_subworkflows=True, max_tasks_per_subworkflow=10)
265+
else:
266+
translator = translator_classes[backend](benchmark.workflow)
260267
translator.translate(output_folder=dirpath)
261268

262269
# # Make the directory that holds the translation world-writable,
@@ -266,7 +273,7 @@ def test_translator(self, backend) -> None:
266273
# os.chmod(dirpath, 0o777)
267274

268275
# Start the Docker container
269-
container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")
276+
container = _start_docker_container(backend if backend != "nextflow_subworkflow" else "nextflow", str_dirpath, str_dirpath, str_dirpath + "bin/")
270277

271278
# Do whatever necessary setup
272279
additional_setup_methods[backend](container)

wfcommons/wfbench/translator/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from .cwl import CWLTranslator
1414
from .dask import DaskTranslator
1515
from .nextflow import NextflowTranslator
16-
from .nextflow_subworkflow import NextflowSubworkflowTranslator
1716
from .parsl import ParslTranslator
1817
from .pegasus import PegasusTranslator
1918
from .pycompss import PyCompssTranslator

wfcommons/wfbench/translator/nextflow.py

Lines changed: 46 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021-2025 The WfCommons Team.
4+
# Copyright (c) 2021-2026 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -31,9 +31,8 @@ class NextflowTranslator(Translator):
3131
:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
3232
:type workflow: Union[Workflow, pathlib.Path]
3333
:param use_subworkflows: Whether to split the workflow into multiple module files.
34-
If None (default), automatically uses subworkflows for workflows with > 1000 tasks.
35-
:type use_subworkflows: Optional[bool]
36-
:param max_tasks_per_subworkflow: Maximum number of tasks per module file when using subworkflows (default: 1000).
34+
:type use_subworkflows: bool
35+
:param max_tasks_per_subworkflow: Maximum number of tasks per module file when using subworkflows.
3736
:type max_tasks_per_subworkflow: int
3837
:param max_parents_threshold: Tasks with more parents than this get their own module (default: 100).
3938
:type max_parents_threshold: int
@@ -44,8 +43,8 @@ class NextflowTranslator(Translator):
4443
"""
4544
def __init__(self,
4645
workflow: Union[Workflow, pathlib.Path],
47-
use_subworkflows: Optional[bool] = None,
48-
max_tasks_per_subworkflow: Optional[int] = 1000,
46+
use_subworkflows: bool = False,
47+
max_tasks_per_subworkflow: int = 1000,
4948
max_parents_threshold: Optional[int] = 100,
5049
slurm: Optional[bool] = False,
5150
logger: Optional[Logger] = None) -> None:
@@ -84,23 +83,17 @@ def translate(self, output_folder: pathlib.Path) -> None:
8483
# Create a topological order of the tasks
8584
sorted_tasks = self._get_tasks_in_topological_order()
8685

87-
# Determine whether to use subworkflows
88-
use_subworkflows = self.use_subworkflows
89-
if use_subworkflows is None:
90-
# Auto-detect: use subworkflows for large workflows
91-
use_subworkflows = len(sorted_tasks) > self.max_tasks_per_subworkflow
92-
9386
# Create the bash script for each task
9487
for task in sorted_tasks:
9588
self._create_task_script(task)
9689

97-
if use_subworkflows:
90+
if self.use_subworkflows:
9891
self._translate_with_subworkflows(output_folder, sorted_tasks)
9992
else:
10093
self._translate_single_file(output_folder, sorted_tasks)
10194

10295
# Create the README file
103-
self._write_readme_file(output_folder, use_subworkflows)
96+
self._write_readme_file(output_folder, self.use_subworkflows)
10497

10598
# =========================================================================
10699
# Common methods
@@ -252,26 +245,14 @@ def _generate_task_function(self, task: Task) -> str:
252245
code = f"// Function to call task {task.task_id}\n"
253246
function_name = task.task_id.replace(".", "_")
254247
code += f"def function_{function_name}(Map inputs) " + "{\n"
255-
256-
if self._find_parents(task.task_id):
257-
# Input channel mixing and then call
258-
code += f"\tdef {function_name}_necessary_input = Channel.empty()\n"
259-
for f in task.input_files:
260-
code += f"\t{function_name}_necessary_input = {function_name}_necessary_input.mix(inputs.{f.file_id})\n"
261-
code += f"\tdef {function_name}_necessary_input_future = {function_name}_necessary_input.collect()\n"
262-
code += f"\tdef {function_name}_produced_output = {function_name}({function_name}_necessary_input_future)\n"
263-
else:
264-
# Simple call
265-
code += f"\tdef {function_name}_produced_output = {function_name}()\n"
266-
267-
# Pass on the outputs
268-
code += "\n"
269248
code += "\tdef outputs = inputs.clone()\n"
270-
if self._find_children(task.task_id):
271-
counter = 0
272-
for f in task.output_files:
273-
code += f"\toutputs.{f.file_id} = {function_name}_produced_output.map" + "{it[" + str(counter) + "]}\n"
274-
counter += 1
249+
code += self._generate_task_call(
250+
task=task,
251+
function_name=function_name,
252+
inputs_var="inputs",
253+
results_var="outputs",
254+
include_comment=False,
255+
)
275256
code += "\treturn outputs\n"
276257
code += "}\n\n"
277258

@@ -481,43 +462,61 @@ def _generate_module_function(self, module_idx: int, tasks: List[Task]) -> str:
481462
# Call each task's process
482463
for task in tasks:
483464
function_name = task.task_id.replace(".", "_")
484-
code += self._generate_task_call_in_module(task, function_name)
465+
code += self._generate_task_call(
466+
task=task,
467+
function_name=function_name,
468+
inputs_var="results",
469+
results_var="results",
470+
include_comment=True,
471+
)
485472

486473
code += "\treturn results\n"
487474
code += "}\n\n"
488475

489476
return code
490477

491-
def _generate_task_call_in_module(self, task: Task, function_name: str) -> str:
478+
def _generate_task_call(self,
479+
task: Task,
480+
function_name: str,
481+
inputs_var: str,
482+
results_var: str,
483+
include_comment: bool) -> str:
492484
"""
493-
Generate the code to call a task's process within a module function.
485+
Generate the code to call a task's process and map outputs into a results map.
494486
495487
:param task: The task.
496488
:type task: Task
497489
:param function_name: The sanitized function name.
498490
:type function_name: str
491+
:param inputs_var: The variable name containing input channels.
492+
:type inputs_var: str
493+
:param results_var: The variable name to update with outputs.
494+
:type results_var: str
495+
:param include_comment: Whether to include a task comment.
496+
:type include_comment: bool
499497
:return: The code.
500498
:rtype: str
501499
"""
502500
code = ""
501+
has_parents = self._find_parents(task.task_id)
503502

504-
if self._find_parents(task.task_id):
505-
# Mix input channels and call process
506-
code += f"\t// Task: {task.task_id}\n"
503+
if include_comment:
504+
root_suffix = " (root)" if not has_parents else ""
505+
code += f"\t// Task: {task.task_id}{root_suffix}\n"
506+
507+
if has_parents:
507508
code += f"\tdef {function_name}_input = Channel.empty()\n"
508509
for f in task.input_files:
509-
code += f"\t{function_name}_input = {function_name}_input.mix(results.{f.file_id})\n"
510-
code += f"\tdef {function_name}_output = {function_name}({function_name}_input.collect())\n"
510+
code += f"\t{function_name}_input = {function_name}_input.mix({inputs_var}.{f.file_id})\n"
511+
code += f"\tdef {function_name}_input_future = {function_name}_input.collect()\n"
512+
code += f"\tdef {function_name}_output = {function_name}({function_name}_input_future)\n"
511513
else:
512-
# Root task - no inputs needed
513-
code += f"\t// Task: {task.task_id} (root)\n"
514514
code += f"\tdef {function_name}_output = {function_name}()\n"
515515

516-
# Update results map with outputs
517516
if self._find_children(task.task_id):
518517
counter = 0
519518
for f in task.output_files:
520-
code += f"\tresults.{f.file_id} = {function_name}_output.map{{it[{counter}]}}\n"
519+
code += f"\t{results_var}.{f.file_id} = {function_name}_output.map{{it[{counter}]}}\n"
521520
counter += 1
522521
code += "\n"
523522

@@ -530,49 +529,7 @@ def _generate_main_workflow(self) -> str:
530529
:return: The main workflow file content.
531530
:rtype: str
532531
"""
533-
# Start with the template header
534-
code = """params.simulate = false
535-
params.pwd = null
536-
params.help = null
537-
pwd = null
538-
539-
def printUsage(error_msg, exit_code) {
540-
def usage_string = \"\"\"
541-
Usage: nextflow run workflow.nf --pwd /path/to/directory [--simulate] [--help]
542-
543-
Required parameters:
544-
--pwd Working directory (where the workflow.nf file is located)
545-
546-
Optional parameters:
547-
--help Show this message and exit.
548-
--simulate Use a "sleep 1" for all tasks instead of the WfBench benchmark.
549-
\"\"\"
550-
if (error_msg) {
551-
def RED = '\\u001B[31m'
552-
def RESET = '\\u001B[0m'
553-
System.err.println \"${RED}Error: ${RESET}\" + error_msg
554-
}
555-
System.err.println usage_string
556-
exit exit_code
557-
}
558-
559-
def validateParams() {
560-
if (params.help) {
561-
printUsage(msg = \"\", exit_code=0)
562-
}
563-
if (params.pwd == null) {
564-
printUsage(msg = \"Missing required parameter: --pwd\", exit_code=1)
565-
}
566-
pwd = file(params.pwd).toAbsolutePath().toString()
567-
if (!file(pwd).exists()) {
568-
printUsage(msg = \"Directory not found: ${pwd}\", exit_code=1)
569-
}
570-
}
571-
572-
// Call validation at the start
573-
validateParams()
574-
575-
"""
532+
code = ""
576533

577534
# Include module functions (one per line to avoid long strings)
578535
code += "// Include module functions\n"
@@ -601,4 +558,4 @@ def validateParams() {
601558

602559
code += "}\n"
603560

604-
return code
561+
return self._merge_codelines("templates/nextflow/workflow.nf", code)

0 commit comments

Comments (0)