Skip to content

Commit c6deea8

Browse files
committed
Merge branch 'main' into airflow-translator
2 parents 677384f + a84a6c2 commit c6deea8

12 files changed

Lines changed: 749 additions & 102 deletions

File tree

bin/wfbench

Lines changed: 499 additions & 0 deletions
Large diffs are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ dependencies = [
3636
"scipy",
3737
"pyyaml",
3838
"pandas",
39+
"shortuuid",
3940
"stringcase",
4041
"filelock",
4142
"pathos",

wfcommons/wfbench/bench.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021-2024 The WfCommons Team.
4+
# Copyright (c) 2021-2025 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -13,10 +13,9 @@
1313
import logging
1414
import os
1515
import pathlib
16-
import re
1716
import subprocess
1817
import time
19-
import uuid
18+
import shortuuid
2019
import sys
2120

2221
from logging import Logger
@@ -39,14 +38,17 @@ class WorkflowBenchmark:
3938
:type recipe: Type[WfChefWorkflowRecipe]
4039
:param num_tasks: Total number of tasks in the benchmark workflow.
4140
:type num_tasks: int
41+
:param with_flowcept:
42+
:type with_flowcept: bool
4243
:param logger: The logger where to log information/warning or errors.
4344
:type logger: Optional[Logger]
4445
"""
4546

4647
def __init__(self,
4748
recipe: Type[WfChefWorkflowRecipe],
4849
num_tasks: int,
49-
logger: Optional[Logger] = None, with_flowcept=False) -> None:
50+
with_flowcept: bool = False,
51+
logger: Optional[Logger] = None) -> None:
5052
"""Create an object that represents a workflow benchmark generator."""
5153
self.logger: Logger = logging.getLogger(
5254
__name__) if logger is None else logger
@@ -296,7 +298,7 @@ def create_benchmark(self,
296298
f"{self.workflow.name.lower()}-{self.num_tasks}").with_suffix(".json")
297299

298300
if self.with_flowcept:
299-
self.workflow.workflow_id = str(uuid.uuid4())
301+
self.workflow.workflow_id = str(shortuuid.uuid())
300302

301303
cores, lock = self._creating_lock_files(lock_files_folder)
302304
for task in self.workflow.tasks.values():

wfcommons/wfbench/translator/abstract_translator.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import os
1313
import pathlib
1414
import shutil
15-
15+
import textwrap
1616
from abc import ABC, abstractmethod
1717
from typing import Optional, Union
1818

@@ -183,4 +183,30 @@ def _merge_codelines(self, template_file_path: str, wf_codelines: str) -> str:
183183
with open(this_dir.joinpath(template_file_path)) as fp:
184184
run_workflow_code = fp.read()
185185
return run_workflow_code.replace("# Generated code goes here", wf_codelines)
186-
186+
187+
def _flowcept_init_python(self, workflow_id: str, workflow_name: str) -> str:
188+
"""
189+
190+
:param workflow_id:
191+
:type workflow_id: str
192+
193+
:param workflow_name:
194+
:type workflow_name: str
195+
196+
:return:
197+
:rtype: str
198+
"""
199+
code = textwrap.dedent(f"""
200+
from flowcept.flowcept_api.flowcept_controller import Flowcept
201+
flowcept_agent = Flowcept(workflow_id="{workflow_id}", workflow_name="{workflow_name}", bundle_exec_id="{workflow_id}")
202+
flowcept_agent.start()
203+
""")
204+
return code
205+
206+
def _flowcept_stop_python(self) -> str:
207+
"""
208+
209+
:return:
210+
:rtype: str
211+
"""
212+
return "flowcept_agent.stop()"

wfcommons/wfbench/translator/bash.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,14 @@ def _bash_wftasks_codelines(self) -> None:
100100
if a.startswith("--output-files"):
101101
flag, output_files_dict = a.split(" ", 1)
102102
output_files_dict = {f"data/{key}": value for key, value in ast.literal_eval(output_files_dict).items()}
103-
a = f"{flag} '{json.dumps(output_files_dict).replace('"', '\\"')}'"
103+
output_files_dict = json.dumps(output_files_dict).replace('"', '\\"')
104+
a = f"{flag} '{output_files_dict}'"
104105

105106
if a.startswith("--input-files"):
106107
flag, input_files_arr = a.split(" ", 1)
107108
input_files_arr = [f"data/{file}" for file in ast.literal_eval(input_files_arr)]
108-
a = f"{flag} '{json.dumps(input_files_arr).replace('"', '\\"')}'"
109+
input_files_arr = json.dumps(input_files_arr).replace('"', '\\"')
110+
a = f"{flag} '{input_files_arr}'"
109111

110112
args.append(a)
111113

wfcommons/wfbench/translator/cwl.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,13 @@ def _parse_steps(self) -> None:
107107
if a.startswith("--output-files"):
108108
flag, output_files_dict = a.split(" ", 1)
109109
output_files_dict = {f"{key}": value for key, value in ast.literal_eval(output_files_dict).items()}
110-
a = f"{flag} '{json.dumps(output_files_dict).replace('"', '\\"')}'"
110+
output_files_dict = json.dumps(output_files_dict).replace('"', '\\"')
111+
a = f"{flag} '{output_files_dict}'"
111112
if a.startswith("--input-files"):
112113
flag, input_files_arr = a.split(" ", 1)
113114
input_files_arr = [f"{file}" for file in ast.literal_eval(input_files_arr)]
114-
a = f"{flag} '{json.dumps(input_files_arr).replace('"', '\\"')}'"
115+
input_files_arr = json.dumps(input_files_arr).replace('"', '\\"')
116+
a = f"{flag} '{input_files_arr}'"
115117
args_array.append(a)
116118

117119

wfcommons/wfbench/translator/parsl.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ def _parsl_wftasks_codelines(self) -> None:
103103
for a in task.args:
104104
if a.startswith("--output-files"):
105105
flag, output_files_dict = a.split(" ", 1)
106-
output_files_dict = ast.literal_eval(output_files_dict)
107-
a = f"{flag} '{json.dumps(output_files_dict).replace('"', '\\"')}'"
106+
output_files_dict = json.dumps(ast.literal_eval(output_files_dict)).replace('"', '\\"')
107+
a = f"{flag} '{output_files_dict}'"
108108

109109
if a.startswith("--input-files"):
110110
flag, input_files_arr = a.split(" ", 1)
111-
input_files_arr = ast.literal_eval(input_files_arr)
112-
a = f"{flag} '{json.dumps(input_files_arr).replace('"', '\\"')}'"
111+
input_files_arr = json.dumps(ast.literal_eval(input_files_arr)).replace('"', '\\"')
112+
a = f"{flag} '{input_files_arr}'"
113113
args.append(a)
114114

115115
args = " ".join(args)

wfcommons/wfbench/translator/pycompss.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@ def translate(self, output_folder: pathlib.Path) -> None:
4747
"""
4848
self.output_folder = output_folder
4949
self.script = ""
50-
# IMPORT Flowcept
51-
if self.workflow.workflow_id is not None:
52-
self.script += "from flowcept.flowcept_api.flowcept_controller import Flowcept\n\n"
5350

5451
# PyCOMPSs translator
5552
self.script += "\n# workflow tasks\n"
@@ -232,10 +229,10 @@ def _pycompss_code(self) -> None:
232229
self.script += f"\n\nif __name__ == \"__main__\":\n"
233230
# START Flowcept
234231
if self.workflow.workflow_id is not None:
235-
self.script += f"\tf = Flowcept(workflow_id='{self.workflow.workflow_id}', workflow_name='{self.workflow.name}', bundle_exec_id='{self.workflow.workflow_id}')\n"
236-
self.script += "\tf.start()\n"
232+
flowcept_init_code = self._flowcept_init_python(self.workflow.workflow_id, self.workflow.name)
233+
self.script += "".join("\t" + line + "\n" for line in flowcept_init_code.splitlines())
237234
# main
238235
self.script += f"\tmain_program()\n"
239236
# STOP Flowcept
240237
if self.workflow.workflow_id is not None:
241-
self.script += "\tf.stop()\n"
238+
self.script += f"\t{self._flowcept_stop_python()}\n"

wfcommons/wfbench/translator/swift_t.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,18 @@ def translate(self, output_folder: pathlib.Path) -> None:
7474
# defining input files
7575
self.logger.debug("Defining input files")
7676
in_count = 0
77-
self.script = f"string root_in_files[];\n"
77+
self.output_folder = output_folder
78+
self.cpu_benchmark = output_folder.joinpath("./bin/cpu-benchmark").absolute()
79+
self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\", \"{self.workflow.name}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else ""
80+
self.script += "string root_in_files[];\n"
7881

7982
for task_name in self.root_task_names:
8083
task = self.tasks[task_name]
8184
for file in task.input_files:
8285
if task.name not in self.categories_input.keys():
8386
self.categories_input[task.name] = in_count
84-
self.script += f"root_in_files[{in_count}] = \"{file.file_id}\";\n"
87+
in_file = output_folder.joinpath(f"./data/{file.file_id}").absolute()
88+
self.script += f"root_in_files[{in_count}] = \"{in_file}\";\n"
8589
in_count += 1
8690
self.files_map[file.file_id] = f"ins[{in_count}]"
8791

@@ -96,6 +100,10 @@ def translate(self, output_folder: pathlib.Path) -> None:
96100
for category in self.categories_list:
97101
self._add_tasks(category)
98102

103+
# flowcept stop
104+
# if self.workflow.workflow_id:
105+
# self.script += "string fss = sprintf(flowcept_stop);\npython_persist(fss);"
106+
99107
run_workflow_code = self._merge_codelines("templates/swift_t_templates/workflow.swift", self.script)
100108

101109
# write benchmark files
@@ -197,27 +205,28 @@ def _add_tasks(self, category: str) -> None:
197205
num_tasks += 1
198206

199207
cats = " + ".join(f"{k}__out[{v - 1}]" for k, v in input_files_cat.items())
200-
in_str = ", ".join(f"{k}__{v}" for k, v in input_files_cat.items())
208+
in_str = ", ".join(f"{k}_{v - 1}_output.txt" for k, v in input_files_cat.items())
201209
if "ins[" in cats:
202210
cats = "0"
203211
in_str = ""
204212
self.script += f"int dep_{self.cmd_counter} = {cats};\n"
205213
args += f", dep_{self.cmd_counter}"
206-
self.script += f"string {category}_in = \"{in_str}\";\n"
214+
args += f", \"{self.workflow.workflow_id}\", fss" if self.workflow.workflow_id else ", \"\""
215+
self.script += f"string {category}_in = \"{self.output_folder.absolute()}/data/{in_str}\";\n"
207216

208217
if num_tasks > 1:
209218
self.script += f"foreach i in [0:{num_tasks - 1}] {{\n" \
210-
f" string of = sprintf(\"{category}_%i_output.txt\", i);\n" \
211-
f" string cmd_{self.cmd_counter} = sprintf(command, \"{category}\", {args});\n" \
219+
f" string of = sprintf(\"{self.output_folder.absolute()}/data/{category}_%i_output.txt\", i);\n" \
220+
f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}\", {args});\n" \
212221
f" string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \
213222
f" string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \
214223
f" {category}__out[i] = string2int(of_{self.cmd_counter});\n" \
215224
"}\n\n"
216225

217226
else:
218227
args = args.replace(
219-
", of", f", \"{category}_0_output.txt\"").replace("[i]", "[0]")
220-
self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{category}\", {args});\n" \
228+
", of", f", \"{self.output_folder.absolute()}/data/{category}_0_output.txt\"").replace("[i]", "[0]")
229+
self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}\", {args});\n" \
221230
f"string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \
222231
f"string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \
223232
f"{category}__out[0] = string2int(of_{self.cmd_counter});\n\n"

wfcommons/wfbench/translator/taskvine.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,18 @@ def translate(self, output_folder: pathlib.Path) -> None:
5151
self.next_level = self.root_task_names.copy()
5252
while self.next_level:
5353
self.next_level = self._add_level_tasks(self.next_level)
54-
self.script += "wait_for_tasks_completion()\n\n"
54+
self.script += "wait_for_tasks_completion()\n"
5555

5656
# generate code
5757
run_workflow_code = self._merge_codelines("templates/taskvine_template.py", self.script)
58-
58+
59+
# generate Flowcept code
60+
if self.workflow.workflow_id is not None:
61+
run_workflow_code = run_workflow_code.replace("# FLOWCEPT_INIT",
62+
self._flowcept_init_python(self.workflow.workflow_id,
63+
self.workflow.name))
64+
run_workflow_code = run_workflow_code.replace("# FLOWCEPT_END", self._flowcept_stop_python())
65+
5966
# write benchmark files
6067
output_folder.mkdir(parents=True)
6168
with open(output_folder.joinpath("taskvine_workflow.py"), "w") as fp:

0 commit comments

Comments
 (0)