Skip to content

Commit a84a6c2

Browse files
authored
Merge pull request #71 from wfcommons/flowcept_improvements
Flowcept support for Swift/T
2 parents a1215d5 + c2826c6 commit a84a6c2

3 files changed

Lines changed: 181 additions & 80 deletions

File tree

bin/wfbench

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def get_parser() -> argparse.ArgumentParser:
334334

335335

336336
def begin_flowcept(args):
337-
print("Running with Flowcept.")
337+
log_info("Running with Flowcept.")
338338
from flowcept import Flowcept, FlowceptTask
339339
# TODO: parametrize to allow storing individual tasks
340340
f = Flowcept(workflow_id=args.workflow_id,

wfcommons/wfbench/translator/swift_t.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,18 @@ def translate(self, output_folder: pathlib.Path) -> None:
7474
# defining input files
7575
self.logger.debug("Defining input files")
7676
in_count = 0
77-
self.script = f"string root_in_files[];\n"
77+
self.output_folder = output_folder
78+
self.cpu_benchmark = output_folder.joinpath("./bin/cpu-benchmark").absolute()
79+
self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\", \"{self.workflow.name}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else ""
80+
self.script += "string root_in_files[];\n"
7881

7982
for task_name in self.root_task_names:
8083
task = self.tasks[task_name]
8184
for file in task.input_files:
8285
if task.name not in self.categories_input.keys():
8386
self.categories_input[task.name] = in_count
84-
self.script += f"root_in_files[{in_count}] = \"{file.file_id}\";\n"
87+
in_file = output_folder.joinpath(f"./data/{file.file_id}").absolute()
88+
self.script += f"root_in_files[{in_count}] = \"{in_file}\";\n"
8589
in_count += 1
8690
self.files_map[file.file_id] = f"ins[{in_count}]"
8791

@@ -96,6 +100,10 @@ def translate(self, output_folder: pathlib.Path) -> None:
96100
for category in self.categories_list:
97101
self._add_tasks(category)
98102

103+
# flowcept stop
104+
# if self.workflow.workflow_id:
105+
# self.script += "string fss = sprintf(flowcept_stop);\npython_persist(fss);"
106+
99107
run_workflow_code = self._merge_codelines("templates/swift_t_templates/workflow.swift", self.script)
100108

101109
# write benchmark files
@@ -197,27 +205,28 @@ def _add_tasks(self, category: str) -> None:
197205
num_tasks += 1
198206

199207
cats = " + ".join(f"{k}__out[{v - 1}]" for k, v in input_files_cat.items())
200-
in_str = ", ".join(f"{k}__{v}" for k, v in input_files_cat.items())
208+
in_str = ", ".join(f"{k}_{v - 1}_output.txt" for k, v in input_files_cat.items())
201209
if "ins[" in cats:
202210
cats = "0"
203211
in_str = ""
204212
self.script += f"int dep_{self.cmd_counter} = {cats};\n"
205213
args += f", dep_{self.cmd_counter}"
206-
self.script += f"string {category}_in = \"{in_str}\";\n"
214+
args += f", \"{self.workflow.workflow_id}\", fss" if self.workflow.workflow_id else ", \"\""
215+
self.script += f"string {category}_in = \"{self.output_folder.absolute()}/data/{in_str}\";\n"
207216

208217
if num_tasks > 1:
209218
self.script += f"foreach i in [0:{num_tasks - 1}] {{\n" \
210-
f" string of = sprintf(\"{category}_%i_output.txt\", i);\n" \
211-
f" string cmd_{self.cmd_counter} = sprintf(command, \"{category}\", {args});\n" \
219+
f" string of = sprintf(\"{self.output_folder.absolute()}/data/{category}_%i_output.txt\", i);\n" \
220+
f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}\", {args});\n" \
212221
f" string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \
213222
f" string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \
214223
f" {category}__out[i] = string2int(of_{self.cmd_counter});\n" \
215224
"}\n\n"
216225

217226
else:
218227
args = args.replace(
219-
", of", f", \"{category}_0_output.txt\"").replace("[i]", "[0]")
220-
self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{category}\", {args});\n" \
228+
", of", f", \"{self.output_folder.absolute()}/data/{category}_0_output.txt\"").replace("[i]", "[0]")
229+
self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}\", {args});\n" \
221230
f"string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \
222231
f"string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \
223232
f"{category}__out[0] = string2int(of_{self.cmd_counter});\n\n"

wfcommons/wfbench/translator/templates/swift_t_templates/workflow.swift

Lines changed: 163 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -4,99 +4,191 @@ import python;
44
import string;
55
import unix;
66

7+
global const string flowcept_start =
8+
"""
9+
workflow_id = "%s"
10+
from flowcept.flowcept_api.flowcept_controller import Flowcept
11+
flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name="%s", bundle_exec_id=workflow_id)
12+
13+
try:
14+
flowcept_agent.start()
15+
except Exception:
16+
import traceback
17+
traceback.print_exc()
18+
""";
19+
720
string command =
821
"""
22+
import logging
923
import os
1024
import pathlib
25+
import signal
1126
import socket
1227
import subprocess
1328
import time
29+
from pathos.helpers import mp as multiprocessing
1430
15-
this_dir = pathlib.Path(".").absolute()
31+
logging.basicConfig(
32+
level=logging.INFO,
33+
format="[WfBench][%%(asctime)s][%%(levelname)s] %%(message)s",
34+
datefmt="%%H:%%M:%%S",
35+
handlers=[logging.StreamHandler()]
36+
)
1637
38+
cpu_benchmark = "%s"
1739
task_name = "%s"
18-
files_list = "%s"
40+
files_list = ["%s"]
1941
gpu_work = int(%i)
42+
cpu_work = int(%i)
43+
cpu_threads = int(10 * %f)
44+
output_data = {"%s": int(%i)}
45+
dep = %i
46+
workflow_id = "%s"
2047
21-
print(f"[WfBench] [{task_name}] Starting Benchmark on {socket.gethostname()}", flush=True)
22-
23-
print(f"[WfBench] [{task_name}] Starting IO Read Benchmark...", flush=True)
24-
if "__" not in files_list:
25-
with open(this_dir.joinpath(f"./data/{files_list}"), "rb") as fp:
26-
start = time.perf_counter()
27-
print(f"[WfBench] Reading '{files_list}'", flush=True)
28-
fp.readlines()
29-
end = time.perf_counter()
30-
data_size = this_dir.joinpath(f"./data/{files_list}").stat().st_size
31-
print(f"[WfBench] [{task_name}] Metrics (read) [time,size]: {end - start},{data_size}", flush=True)
32-
else:
33-
files = files_list.split(", ")
34-
for file in files:
35-
counter = 0
36-
fd = file.split("__")
37-
start = time.perf_counter()
38-
file_size = 0
39-
for f in this_dir.glob(f"./data/{fd[0]}_*_output.txt"):
40-
if counter >= int(fd[1]):
41-
break
42-
file_size += os.stat(f).st_size
43-
with open(f, "rb") as fp:
44-
print(f"[WfBench] Reading '{f}'", flush=True)
45-
fp.readlines()
46-
counter += 1
47-
end = time.perf_counter()
48-
print(f"[WfBench] [{task_name}] Metrics (read) [time,size]: {end - start},{file_size}", flush=True)
49-
print(f"[WfBench] [{task_name}] Completed IO Read Benchmark", flush=True)
50-
51-
if gpu_work > 0:
52-
print(f"[WfBench] [{task_name}] Starting GPU Benchmark...", flush=True)
53-
gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID {this_dir.joinpath('./bin/gpu-benchmark')} {gpu_work}"]
54-
start = time.perf_counter()
55-
gpu_proc = subprocess.Popen(gpu_prog, shell=True)
56-
gpu_proc.wait()
57-
end = time.perf_counter()
58-
print(f"[WfBench] [{task_name}] Metrics (compute-gpu) [time,work]: {end - start},{gpu_work}", flush=True)
48+
if 'workflow_id':
49+
logging.info("Running with Flowcept.")
50+
from flowcept import Flowcept, FlowceptTask
51+
fc = Flowcept(workflow_id=workflow_id,
52+
bundle_exec_id=workflow_id,
53+
start_persistence=False, save_workflow=False)
54+
fc.start()
55+
fc_task = FlowceptTask(workflow_id=workflow_id, used={
56+
'workflow_id': workflow_id
57+
})
58+
59+
logging.info(f"Starting {task_name} Benchmark on {socket.gethostname()}")
60+
61+
procs = []
62+
cpu_queue = multiprocessing.Queue()
63+
logging.debug(f"Working directory: {os.getcwd()}")
64+
65+
logging.debug("Starting IO benchmark...")
66+
io_proc = None
67+
termination_event = multiprocessing.Event()
68+
69+
io_proc = multiprocessing.Process(
70+
target=lambda inputs=files_list, outputs=output_data, cpu_queue=cpu_queue,
71+
termination_event=termination_event: (
72+
memory_limit := 10 * 1024 * 1024,
73+
[open(name, "wb").close() for name in outputs],
74+
io_completed := 0,
75+
bytes_read := {name: 0 for name in inputs},
76+
bytes_written := {name: 0 for name in outputs},
77+
input_sizes := {name: __import__("os").path.getsize(name) for name in inputs},
78+
[
79+
(
80+
cpu_percent := cpu_queue.get(timeout=1.0),
81+
should_exit := termination_event.is_set(),
82+
(
83+
while_loop_var := True,
84+
[
85+
(
86+
new_val := (
87+
cpu_queue.get(timeout = 1.0)
88+
if not cpu_queue.empty() else None
89+
),
90+
cpu_percent := (
91+
max(cpu_percent, new_val)
92+
if new_val is not None else cpu_percent
93+
),
94+
while_loop_var := (
95+
new_val is not None and not cpu_queue.empty()
96+
)
97+
)
98+
for _ in range(100) if while_loop_var
99+
],
100+
bytes_to_read := {
101+
name: max(0, int(size * (cpu_percent / 100) - bytes_read[name]))
102+
for name, size in input_sizes.items()
103+
},
104+
bytes_to_write := {
105+
name: max(0, int(size * (cpu_percent / 100) - bytes_written[name]))
106+
for name, size in outputs.items()
107+
},
108+
logging.debug("Starting IO Read Benchmark..."),
109+
in_file := list(bytes_to_read.keys())[0],
110+
in_size := list(bytes_to_read.values())[0],
111+
open(in_file, "rb").read(int(in_size)),
112+
logging.debug("Completed IO Read Benchmark!"),
113+
out_file := list(output_data.keys())[0],
114+
out_size := list(output_data.values())[0],
115+
logging.debug(f"Writing output file '{out_file}'"),
116+
open(out_file, "ab").write(__import__("os").urandom(int(out_size))),
117+
bytes_read.update({
118+
name: bytes_read[name] + bytes_to_read[name]
119+
for name in bytes_to_read
120+
}),
121+
bytes_written.update({
122+
name: bytes_written[name] + bytes_to_write[name]
123+
for name in bytes_to_write
124+
}),
125+
126+
logging.debug(f"Bytes Read: {bytes_read}"),
127+
logging.debug(f"Bytes Written: {bytes_written}"),
128+
io_completed := cpu_percent,
129+
) if cpu_percent is not None else time.sleep(0.1),
130+
not (should_exit or io_completed >= 100)
131+
)
132+
for _ in range(1000000)
133+
if not (io_completed >= 100 or termination_event.is_set())
134+
],
135+
logging.info("IO benchmark completed")
136+
)
137+
)
138+
io_proc.start()
139+
procs.append(io_proc)
59140
60-
cpu_work = int(%i)
61141
if cpu_work > 0:
62-
print(f"[WfBench] [{task_name}] Starting CPU and Memory Benchmarks...", flush=True)
63-
cpu_threads=int(10 * %f)
64-
mem_threads=10 - cpu_threads
65-
total_mem_bytes = 0.05
142+
logging.info(f"Starting CPU and Memory Benchmarks for {task_name}...")
143+
144+
mem_threads = 10 - cpu_threads
66145
cpu_work_per_thread = int(cpu_work / cpu_threads)
67146
68147
cpu_procs = []
69-
cpu_prog = [
70-
f"{this_dir.joinpath('./bin/cpu-benchmark')}", f"{cpu_work_per_thread}"]
148+
mem_procs = []
149+
cpu_prog = [f"{cpu_benchmark}", f"{cpu_work_per_thread}"]
150+
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
151+
"--vm-bytes", "0.05%%", "--vm-keep"]
71152
72-
start = time.perf_counter()
73153
for i in range(cpu_threads):
74-
cpu_proc = subprocess.Popen(cpu_prog)
154+
cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
75155
cpu_procs.append(cpu_proc)
156+
monitor_thread = multiprocessing.Process(
157+
target=lambda proc=cpu_proc, queue=cpu_queue:
158+
[
159+
queue.put(float(line.strip().split()[1].strip('%%')))
160+
for line in iter(proc.stdout.readline, "")
161+
if line.strip() and line.strip().startswith("Progress:")
162+
]
163+
)
164+
monitor_thread.start()
76165
77166
if mem_threads > 0:
78-
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
79-
"--vm-bytes", f"{total_mem_bytes}%%", "--vm-keep"]
80-
mem_proc = subprocess.Popen(mem_prog, stderr=subprocess.DEVNULL)
81-
82-
for proc in cpu_procs:
83-
proc.wait()
84-
mem_kill = subprocess.Popen(["killall", "stress-ng"])
85-
mem_kill.wait()
86-
end = time.perf_counter()
87-
print(f"[WfBench] [{task_name}] Metrics (compute) [time,work]: {end - start},{cpu_work}", flush=True)
88-
print(f"[WfBench] [{task_name}] Completed CPU and Memory Benchmarks", flush=True)
89-
90-
print(f"[WfBench] [{task_name}] Writing output file", flush=True)
91-
start = time.perf_counter()
92-
with open(this_dir.joinpath("./data/%s"), "wb") as fp:
93-
file_size = int(%i)
94-
fp.write(os.urandom(file_size))
95-
end = time.perf_counter()
96-
print(f"[WfBench] [{task_name}] Metrics (write) [time,size]: {end - start},{file_size}", flush=True)
97-
98-
print(f"[WfBench] [{task_name}] Benchmark completed!", flush=True)
99-
dep = %i
167+
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
168+
mem_procs.append(mem_proc)
169+
170+
procs.extend(cpu_procs)
171+
for proc in procs:
172+
if isinstance(proc, subprocess.Popen):
173+
proc.wait()
174+
if io_proc is not None and io_proc.is_alive():
175+
io_proc.join()
176+
177+
for mem_proc in mem_procs:
178+
try:
179+
os.kill(mem_proc.pid, signal.SIGKILL)
180+
except subprocess.TimeoutExpired:
181+
logging.debug("Memory process did not terminate; force-killing.")
182+
subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()
183+
184+
logging.debug("Completed CPU and Memory Benchmarks!")
185+
186+
logging.info(f"Benchmark {task_name} completed!")
187+
188+
if 'workflow_id':
189+
fc_task.end()
190+
fc.stop()
191+
time.sleep(1)
100192
""";
101193

102194
# Generated code goes here

0 commit comments

Comments
 (0)