Skip to content

Commit 262e9af

Browse files
committed
Nextflow with Flowcept
1 parent 8d44fba commit 262e9af

2 files changed

Lines changed: 21 additions & 68 deletions

File tree

wfcommons/wfbench/translator/nextflow.py

Lines changed: 21 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,14 @@ class NextflowTranslator(Translator):
2727
:param logger: The logger where to log information/warning or errors (optional).
2828
:type logger: Logger
2929
"""
30-
3130
def __init__(self,
3231
workflow: Union[Workflow, pathlib.Path],
3332
logger: Optional[Logger] = None) -> None:
3433
"""Create an object of the translator."""
3534
super().__init__(workflow, logger)
3635

3736
self.script = ""
38-
39-
self._usage_string = """
40-
Usage: nextflow run workflow.nf --pwd /path/to/directory [--simulate] [--help]
41-
42-
Required parameters:
43-
--pwd Working directory (where the workflow.nf file is located)
44-
45-
Optional parameters:
46-
--help Show this message and exit.
47-
--simulate Use a "sleep 1" for all tasks instead of the WfBench benchmark.
48-
"""
49-
37+
self.out_files = set()
5038

5139
def translate(self, output_folder: pathlib.Path) -> None:
5240
"""
@@ -55,7 +43,6 @@ def translate(self, output_folder: pathlib.Path) -> None:
5543
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
5644
:type output_folder: pathlib.Path
5745
"""
58-
5946
# Create the output folder
6047
output_folder.mkdir(parents=True)
6148

@@ -73,7 +60,8 @@ def translate(self, output_folder: pathlib.Path) -> None:
7360

7461
# Create the Nextflow workflow script and file
7562
self._create_workflow_script(sorted_tasks)
76-
self._write_output_file(self.script, output_folder.joinpath("workflow.nf"))
63+
run_workflow_code = self._merge_codelines("templates/nextflow_templates/workflow.nf", self.script)
64+
self._write_output_file(run_workflow_code, output_folder.joinpath("workflow.nf"))
7765

7866
# Create the README file
7967
self._write_readme_file(output_folder)
@@ -86,10 +74,10 @@ def _create_workflow_script(self, tasks: list[Task]):
8674
8775
:param tasks: The (sorted) list of tasks.
8876
:type tasks: list[Task]
89-
"""
90-
91-
# Output the code for command-line argument processing
92-
self.script += self._generate_arg_parsing_code()
77+
"""
78+
# Add Flowcept code if enabled
79+
if self.workflow.workflow_id:
80+
self.script += self._generate_flowcept_code()
9381

9482
# Output the code for each task
9583
for task in tasks:
@@ -100,55 +88,21 @@ def _create_workflow_script(self, tasks: list[Task]):
10088

10189
return
10290

103-
def _generate_arg_parsing_code(self):
91+
def _generate_flowcept_code(self) -> str:
10492
"""
105-
Generate the code to parse command-line argument.
106-
10793
:return: The code.
10894
:rtype: str
10995
"""
110-
111-
code = r'''
112-
params.simulate = false
113-
params.pwd = null
114-
params.help = null
115-
pwd = null
116-
117-
def printUsage(error_msg, exit_code) {
118-
119-
def usage_string = """
120-
'''
121-
code += self._usage_string
122-
123-
code += r'''
124-
"""
125-
if (error_msg) {
126-
def RED = '\u001B[31m'
127-
def RESET = '\u001B[0m'
128-
System.err.println "${RED}Error: ${RESET}" + error_msg
129-
}
130-
System.err.println usage_string
131-
exit exit_code
132-
}
133-
134-
def validateParams() {
135-
if (params.help) {
136-
printUsage(msg = "", exit_code=0)
137-
}
138-
if (params.pwd == null) {
139-
printUsage(msg = "Missing required parameter: --pwd", exit_code=1)
140-
}
141-
pwd = file(params.pwd).toAbsolutePath().toString()
142-
if (!file(pwd).exists()) {
143-
printUsage(msg = "Directory not found: ${pwd}", exit_code=1)
144-
}
145-
}
146-
147-
// Call validation at the start
148-
validateParams()
149-
150-
'''
151-
return code
96+
out_files = ", ".join(f"'{item}'" for item in self.out_files)
97+
return "process flowcept(){\n" \
98+
" input:\n" \
99+
" output:\n" \
100+
" script:\n" \
101+
" \"\"\"\n" \
102+
" ${pwd}/bin/flowcept.py " \
103+
f"{self.workflow.name} {self.workflow.workflow_id} '\\[{out_files}\\]' \n" \
104+
" \"\"\"\n" \
105+
"}\n\n"
152106

153107
def _get_tasks_in_topological_order(self) -> List[Task]:
154108
"""
@@ -168,6 +122,9 @@ def _get_tasks_in_topological_order(self) -> List[Task]:
168122
if not all_children:
169123
break
170124
for potential_task in all_children:
125+
num_children = len(self.task_children[potential_task.task_id])
126+
if not num_children:
127+
self.out_files.add(potential_task.output_files[0])
171128
if all(parent in sorted_tasks for parent in self._find_parents(potential_task.task_id)):
172129
tasks_in_current_level.append(potential_task)
173130
levels[current_level] = tasks_in_current_level
@@ -353,6 +310,3 @@ def _write_readme_file(self, output_folder: pathlib.Path) -> None:
353310
out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n")
354311

355312
out.write(f"\tnextflow run ./workflow.nf --pwd `pwd`\n")
356-
out.write("\n")
357-
out.write(self._usage_string)
358-

wfcommons/wfbench/translator/templates/swift_t_templates/workflow.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ global const string flowcept =
1515
"""
1616
import logging
1717
import pathlib
18-
import subprocess
1918
import time
2019
from flowcept.flowcept_api.flowcept_controller import Flowcept
2120

0 commit comments

Comments
 (0)