Skip to content

Commit 14a8d70

Browse files
authored
Merge pull request #73 from wfcommons/flowcept_improvements
Enabling Swift/T support for Flowcept
2 parents 958eeed + 995a6fb commit 14a8d70

2 files changed

Lines changed: 72 additions & 23 deletions

File tree

wfcommons/wfbench/translator/swift_t.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,17 @@ def __init__(self,
3939
self.stress_path = stress_path
4040
self.categories_list = []
4141
self.categories_input = {}
42-
self.parsed_tasks = []
42+
self.parsed_tasks = set()
4343
self.files_map = {}
4444
self.tasks_map = {}
4545
self.cmd_counter = 1
46+
self.out_files = set()
4647

4748
# find applications
48-
self.apps = []
49+
self.apps = set()
4950
for task in self.tasks.values():
5051
self.tasks_map[task.task_id] = task.name
51-
52-
if task.name not in self.apps:
53-
self.apps.append(task.name)
52+
self.apps.add(task.name)
5453

5554
out_count = 0
5655
for file in task.output_files:
@@ -76,7 +75,7 @@ def translate(self, output_folder: pathlib.Path) -> None:
7675
in_count = 0
7776
self.output_folder = output_folder
7877
self.cpu_benchmark = output_folder.joinpath("./bin/cpu-benchmark").absolute()
79-
self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\", \"{self.workflow.name}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else ""
78+
self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else ""
8079
self.script += "string root_in_files[];\n"
8180

8281
for task_name in self.root_task_names:
@@ -100,9 +99,11 @@ def translate(self, output_folder: pathlib.Path) -> None:
10099
for category in self.categories_list:
101100
self._add_tasks(category)
102101

103-
# flowcept stop
104-
# if self.workflow.workflow_id:
105-
# self.script += "string fss = sprintf(flowcept_stop);\npython_persist(fss);"
102+
# flowcept end
103+
if self.workflow.workflow_id:
104+
out_files = ", ".join(f"'{item}'" for item in self.out_files)
105+
self.script += f"string fc = sprintf(flowcept, \"{self.workflow.workflow_id}\", \"{self.workflow.name}\", \"{out_files}\");\n" \
106+
"python_persist(fc);\n"
106107

107108
run_workflow_code = self._merge_codelines("templates/swift_t_templates/workflow.swift", self.script)
108109

@@ -134,7 +135,7 @@ def _find_categories_list(self, task_name: str, parent_task: Optional[str] = Non
134135
if parent_task.name not in self.categories_list:
135136
return
136137

137-
self.parsed_tasks.append(task_name)
138+
self.parsed_tasks.add(task_name)
138139
category = self.tasks_map[task_name]
139140
if category not in self.categories_list:
140141
self.categories_list.append(category)
@@ -153,29 +154,28 @@ def _add_tasks(self, category: str) -> None:
153154
:type category: str
154155
"""
155156
num_tasks = 0
157+
num_children = 0
156158
input_files_cat = {}
157-
parsed_input_files = []
159+
parsed_input_files = set()
158160
self.script += f"int {category}__out[];\n"
159161

160-
for task_name in self.tasks:
161-
task = self.tasks[task_name]
162-
162+
for task in self.tasks.values():
163163
if task.name == category:
164164
# in/output files
165-
input_files = []
165+
num_children = len(self.task_children[task.task_id])
166+
input_files = set()
166167
prefix = ""
167168

168169
for file in task.output_files:
169-
out_file = file.file_id
170170
file_size = file.size
171171

172172
for file in task.input_files:
173173
cat_prefix = self.files_map[file.file_id].split("__out")[0]
174174
if file.file_id not in parsed_input_files:
175175
input_files_cat.setdefault(cat_prefix, 0)
176176
input_files_cat[cat_prefix] += 1
177-
parsed_input_files.append(file.file_id)
178-
input_files.append(self.files_map[file.file_id])
177+
parsed_input_files.add(file.file_id)
178+
input_files.add(self.files_map[file.file_id])
179179
if not prefix:
180180
prefix = cat_prefix
181181

@@ -222,10 +222,15 @@ def _add_tasks(self, category: str) -> None:
222222
f" string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \
223223
f" {category}__out[i] = string2int(of_{self.cmd_counter});\n" \
224224
"}\n\n"
225-
225+
if not num_children:
226+
for i in range(num_tasks):
227+
self.out_files.add(f"{self.output_folder.absolute()}/data/{category}_{i}_output.txt")
226228
else:
229+
out_file = f"{self.output_folder.absolute()}/data/{category}_0_output.txt"
230+
if not num_children:
231+
self.out_files.add(out_file)
227232
args = args.replace(
228-
", of", f", \"{self.output_folder.absolute()}/data/{category}_0_output.txt\"").replace("[i]", "[0]")
233+
", of", f", \"{out_file}\"").replace("[i]", "[0]")
229234
self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}\", {args});\n" \
230235
f"string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \
231236
f"string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \

wfcommons/wfbench/translator/templates/swift_t_templates/workflow.swift

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,62 @@ import python;
44
import string;
55
import unix;
66

7-
global const string flowcept_start =
7+
global const string flowcept_start =
88
"""
9+
import time
910
workflow_id = "%s"
11+
time.sleep(30)
12+
""";
13+
14+
global const string flowcept =
15+
"""
16+
import logging
17+
import pathlib
18+
import subprocess
19+
import time
1020
from flowcept.flowcept_api.flowcept_controller import Flowcept
11-
flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name="%s", bundle_exec_id=workflow_id)
21+
22+
logging.basicConfig(
23+
level=logging.INFO,
24+
format="[WfBench][%%(asctime)s][%%(levelname)s] %%(message)s",
25+
datefmt="%%H:%%M:%%S",
26+
handlers=[logging.StreamHandler()]
27+
)
28+
29+
workflow_id = "%s"
30+
workflow_name = "%s"
31+
out_files = [%s]
32+
33+
logging.info("Flowcept Starting")
34+
flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name=workflow_name, bundle_exec_id=workflow_id)
1235
1336
try:
1437
flowcept_agent.start()
1538
except Exception:
1639
import traceback
1740
traceback.print_exc()
41+
42+
remaining_files = set(out_files)
43+
44+
while remaining_files:
45+
found_files = set()
46+
for f in remaining_files:
47+
if pathlib.Path(f).exists():
48+
found_files.add(f)
49+
remaining_files -= found_files
50+
if not remaining_files:
51+
break
52+
time.sleep(1)
53+
54+
time.sleep(180)
55+
try:
56+
flowcept_agent.stop()
57+
time.sleep(120)
58+
except Exception:
59+
import traceback
60+
traceback.print_exc()
61+
62+
logging.info("Flowcept Completed")
1863
""";
1964

2065
string command =
@@ -188,7 +233,6 @@ logging.info(f"Benchmark {task_name} completed!")
188233
if 'workflow_id':
189234
fc_task.end()
190235
fc.stop()
191-
time.sleep(1)
192236
""";
193237

194238
# Generated code goes here

0 commit comments

Comments
 (0)