Skip to content

Commit bb2e267

Browse files
authored
Merge pull request #83 from wfcommons/nextflow_improvements
Nextflow with Flowcept
2 parents 17f4ee5 + 06d83cf commit bb2e267

7 files changed

Lines changed: 147 additions & 87 deletions

File tree

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021-2024 The WfCommons Team.
4+
# Copyright (c) 2021-2025 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

1111
from .airflow import AirflowTranslator
12+
from .bash import BashTranslator
13+
from .cwl import CWLTranslator
1214
from .dask import DaskTranslator
1315
from .nextflow import NextflowTranslator
1416
from .parsl import ParslTranslator
1517
from .pegasus import PegasusTranslator
18+
from .pycompss import PyCompssTranslator
1619
from .swift_t import SwiftTTranslator
1720
from .taskvine import TaskVineTranslator
18-
from .cwl import CWLTranslator
19-
from .bash import BashTranslator
20-
from .pycompss import PyCompssTranslator

wfcommons/wfbench/translator/cwl.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2024 The WfCommons Team.
4+
# Copyright (c) 2024-2025 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -21,6 +21,7 @@
2121

2222
this_dir = pathlib.Path(__file__).resolve().parent
2323

24+
2425
class CWLTranslator(Translator):
2526
"""
2627
A WfFormat parser for creating CWL workflow benchmarks.
@@ -222,9 +223,9 @@ def _write_cwl_files(self, output_folder: pathlib.Path) -> None:
222223

223224
clt_folder = cwl_folder.joinpath("clt")
224225
clt_folder.mkdir(exist_ok=True)
225-
shutil.copy(this_dir.joinpath("templates/cwl_templates/wfbench.cwl"), clt_folder)
226-
shutil.copy(this_dir.joinpath("templates/cwl_templates/folder.cwl"), clt_folder)
227-
shutil.copy(this_dir.joinpath("templates/cwl_templates/shell.cwl"), clt_folder)
226+
shutil.copy(this_dir.joinpath("templates/cwl/wfbench.cwl"), clt_folder)
227+
shutil.copy(this_dir.joinpath("templates/cwl/folder.cwl"), clt_folder)
228+
shutil.copy(this_dir.joinpath("templates/cwl/shell.cwl"), clt_folder)
228229

229230
with open(cwl_folder.joinpath("main.cwl"), "w", encoding="utf-8") as f:
230231
f.write("\n".join(self.cwl_script))

wfcommons/wfbench/translator/nextflow.py

Lines changed: 39 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
# (at your option) any later version.
1010

1111
import pathlib
12+
import shutil
1213

1314
from logging import Logger
1415
from typing import List, Optional, Union
@@ -17,6 +18,8 @@
1718
from ...common import Workflow
1819
from ...common.task import Task
1920

21+
this_dir = pathlib.Path(__file__).resolve().parent
22+
2023

2124
class NextflowTranslator(Translator):
2225
"""
@@ -27,26 +30,14 @@ class NextflowTranslator(Translator):
2730
:param logger: The logger where to log information/warning or errors (optional).
2831
:type logger: Logger
2932
"""
30-
3133
def __init__(self,
3234
workflow: Union[Workflow, pathlib.Path],
3335
logger: Optional[Logger] = None) -> None:
3436
"""Create an object of the translator."""
3537
super().__init__(workflow, logger)
3638

3739
self.script = ""
38-
39-
self._usage_string = """
40-
Usage: nextflow run workflow.nf --pwd /path/to/directory [--simulate] [--help]
41-
42-
Required parameters:
43-
--pwd Working directory (where the workflow.nf file is located)
44-
45-
Optional parameters:
46-
--help Show this message and exit.
47-
--simulate Use a "sleep 1" for all tasks instead of the WfBench benchmark.
48-
"""
49-
40+
self.out_files = set()
5041

5142
def translate(self, output_folder: pathlib.Path) -> None:
5243
"""
@@ -55,25 +46,29 @@ def translate(self, output_folder: pathlib.Path) -> None:
5546
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
5647
:type output_folder: pathlib.Path
5748
"""
58-
5949
# Create the output folder
60-
output_folder.mkdir(parents=True)
50+
self.output_folder = output_folder
51+
self.output_folder.mkdir(parents=True)
6152

6253
# Create benchmark files
6354
self._copy_binary_files(output_folder)
6455
self._generate_input_files(output_folder)
56+
57+
if self.workflow.workflow_id:
58+
shutil.copy(this_dir.joinpath("templates/flowcept_agent.py"), output_folder.joinpath("bin"))
6559

6660
# Create a topological order of the tasks
6761
sorted_tasks = self._get_tasks_in_topological_order()
6862
# print([t.task_id for t in sorted_tasks])
6963

7064
# Create the bash script for each task
7165
for task in sorted_tasks:
72-
self._create_task_script(output_folder, task)
66+
self._create_task_script(task)
7367

7468
# Create the Nextflow workflow script and file
7569
self._create_workflow_script(sorted_tasks)
76-
self._write_output_file(self.script, output_folder.joinpath("workflow.nf"))
70+
run_workflow_code = self._merge_codelines("templates/nextflow/workflow.nf", self.script)
71+
self._write_output_file(run_workflow_code, output_folder.joinpath("workflow.nf"))
7772

7873
# Create the README file
7974
self._write_readme_file(output_folder)
@@ -86,10 +81,10 @@ def _create_workflow_script(self, tasks: list[Task]):
8681
8782
:param tasks: The (sorted) list of tasks.
8883
:type tasks: list[Task]
89-
"""
90-
91-
# Output the code for command-line argument processing
92-
self.script += self._generate_arg_parsing_code()
84+
"""
85+
# Add Flowcept code if enabled
86+
if self.workflow.workflow_id:
87+
self.script += self._generate_flowcept_code()
9388

9489
# Output the code for each task
9590
for task in tasks:
@@ -100,60 +95,29 @@ def _create_workflow_script(self, tasks: list[Task]):
10095

10196
return
10297

103-
def _generate_arg_parsing_code(self):
98+
def _generate_flowcept_code(self) -> str:
10499
"""
105-
Generate the code to parse command-line argument.
106100
107101
:return: The code.
108102
:rtype: str
109103
"""
110-
111-
code = r'''
112-
params.simulate = false
113-
params.pwd = null
114-
params.help = null
115-
pwd = null
116-
117-
def printUsage(error_msg, exit_code) {
118-
119-
def usage_string = """
120-
'''
121-
code += self._usage_string
122-
123-
code += r'''
124-
"""
125-
if (error_msg) {
126-
def RED = '\u001B[31m'
127-
def RESET = '\u001B[0m'
128-
System.err.println "${RED}Error: ${RESET}" + error_msg
129-
}
130-
System.err.println usage_string
131-
exit exit_code
132-
}
133-
134-
def validateParams() {
135-
if (params.help) {
136-
printUsage(msg = "", exit_code=0)
137-
}
138-
if (params.pwd == null) {
139-
printUsage(msg = "Missing required parameter: --pwd", exit_code=1)
140-
}
141-
pwd = file(params.pwd).toAbsolutePath().toString()
142-
if (!file(pwd).exists()) {
143-
printUsage(msg = "Directory not found: ${pwd}", exit_code=1)
144-
}
145-
}
146-
147-
// Call validation at the start
148-
validateParams()
149-
150-
'''
151-
return code
104+
out_files = ", ".join(f"\"{item}\"" for item in self.out_files)
105+
return "process flowcept(){\n" \
106+
" input:\n" \
107+
" output:\n" \
108+
" script:\n" \
109+
" \"\"\"\n" \
110+
" ${pwd}/bin/flowcept_agent.py " \
111+
f"{self.workflow.name} {self.workflow.workflow_id} '[{out_files}]' \n" \
112+
" \"\"\"\n" \
113+
"}\n\n"
152114

153115
def _get_tasks_in_topological_order(self) -> List[Task]:
154116
"""
155117
Sort the workflow tasks in topological order.
156118
119+
:param output_folder: The path to the output folder.
120+
:type output_folder: pathlib.Path
157121
:return: A sorted list of tasks.
158122
:rtype: List[Task]
159123
"""
@@ -168,21 +132,20 @@ def _get_tasks_in_topological_order(self) -> List[Task]:
168132
if not all_children:
169133
break
170134
for potential_task in all_children:
135+
num_children = len(self.task_children[potential_task.task_id])
136+
if not num_children:
137+
self.out_files.add(f"{self.output_folder.absolute()}/{potential_task.output_files[0]}")
171138
if all(parent in sorted_tasks for parent in self._find_parents(potential_task.task_id)):
172139
tasks_in_current_level.append(potential_task)
173140
levels[current_level] = tasks_in_current_level
174141
sorted_tasks += tasks_in_current_level
175142
current_level += 1
176143
return sorted_tasks
177144

178-
179-
@staticmethod
180-
def _create_task_script(output_folder: pathlib.Path, task: Task):
145+
def _create_task_script(self, task: Task):
181146
"""
182147
Generate the bash script for invoking a task.
183148
184-
:param output_folder: The path to the output folder.
185-
:type output_folder: pathlib.Path
186149
:param task: The task.
187150
:type task: Task
188151
:return: The code.
@@ -194,16 +157,16 @@ def _create_task_script(output_folder: pathlib.Path, task: Task):
194157
# Generate input spec
195158
input_spec = "'\\["
196159
for f in task.input_files:
197-
input_spec += f"\"{output_folder.resolve()}/data/{f.file_id}\","
160+
input_spec += f"\"{self.output_folder.resolve()}/data/{f.file_id}\","
198161
input_spec = input_spec[:-1] + "\\]'"
199162

200163
# Generate output spec
201164
output_spec = "'\\{"
202165
for f in task.output_files:
203-
output_spec += f"\"{output_folder.resolve()}/data/{f.file_id}\":{str(f.size)},"
166+
output_spec += f"\"{self.output_folder.resolve()}/data/{f.file_id}\":{str(f.size)},"
204167
output_spec = output_spec[:-1] + "\\}'"
205168

206-
code += f"{output_folder.resolve()}/bin/{task.program} "
169+
code += f"{self.output_folder.resolve()}/bin/{task.program} "
207170

208171
for a in task.args:
209172
if "--output-files" in a:
@@ -214,7 +177,7 @@ def _create_task_script(output_folder: pathlib.Path, task: Task):
214177
code += f"{a} "
215178
code += "\n"
216179

217-
script_file_path = output_folder.joinpath(f"bin/script_{task.task_id}.sh")
180+
script_file_path = self.output_folder.joinpath(f"bin/script_{task.task_id}.sh")
218181
with open(script_file_path, "w") as out:
219182
out.write(code)
220183

@@ -296,6 +259,8 @@ def _generate_workflow_code(self, sorted_tasks: List[Task]) -> str:
296259

297260
# Generate workflow function
298261
code += "workflow {\n"
262+
if self.workflow.workflow_id:
263+
code += "\tflowcept()\n"
299264
code += "\tresults = bootstrap()\n"
300265
for task in sorted_tasks:
301266
function_name = task.task_id.replace(".", "_")
@@ -353,6 +318,3 @@ def _write_readme_file(self, output_folder: pathlib.Path) -> None:
353318
out.write(f"Run the workflow in directory {str(output_folder)} using the following command:\n")
354319

355320
out.write(f"\tnextflow run ./workflow.nf --pwd `pwd`\n")
356-
out.write("\n")
357-
out.write(self._usage_string)
358-

wfcommons/wfbench/translator/swift_t.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def translate(self, output_folder: pathlib.Path) -> None:
105105
self.script += f"string fc = sprintf(flowcept, \"{self.workflow.workflow_id}\", \"{self.workflow.name}\", \"{out_files}\");\n" \
106106
"python_persist(fc);\n"
107107

108-
run_workflow_code = self._merge_codelines("templates/swift_t_templates/workflow.swift", self.script)
108+
run_workflow_code = self._merge_codelines("templates/swift_t/workflow.swift", self.script)
109109

110110
# write benchmark files
111111
output_folder.mkdir(parents=True)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2025 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import ast
12+
import logging
13+
import pathlib
14+
import sys
15+
import time
16+
from flowcept.flowcept_api.flowcept_controller import Flowcept
17+
18+
logging.basicConfig(
19+
level=logging.INFO,
20+
format="[WfBench][%%(asctime)s][%%(levelname)s] %%(message)s",
21+
datefmt="%%H:%%M:%%S",
22+
handlers=[logging.StreamHandler()]
23+
)
24+
25+
workflow_name = sys.argv[1]
26+
workflow_id = sys.argv[2]
27+
out_files = ast.literal_eval(sys.argv[3])
28+
29+
logging.info("Flowcept Starting")
30+
flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name=workflow_name, bundle_exec_id=workflow_id, start_persistence=False, save_workflow=True)
31+
32+
try:
33+
flowcept_agent.start()
34+
except Exception:
35+
import traceback
36+
traceback.print_exc()
37+
38+
remaining_files = set(out_files)
39+
40+
while remaining_files:
41+
found_files = set()
42+
for f in remaining_files:
43+
if pathlib.Path(f).exists():
44+
found_files.add(f)
45+
remaining_files -= found_files
46+
if not remaining_files:
47+
break
48+
time.sleep(1)
49+
50+
try:
51+
flowcept_agent.stop()
52+
except Exception:
53+
import traceback
54+
traceback.print_exc()
55+
56+
logging.info("Flowcept Completed")

0 commit comments

Comments
 (0)