@@ -25,7 +25,7 @@ logging.basicConfig(
2525 handlers=[logging.StreamHandler()]
2626)
2727
28- workflow_id = " %s "
28+ workflow_id = " %s "
2929workflow_name = " %s "
3030out_files = [%s]
3131
@@ -63,12 +63,12 @@ string command =
6363"""
6464import logging
6565import os
66+ import sys
6667import pathlib
6768import signal
6869import socket
6970import subprocess
7071import time
71- from pathos.helpers import mp as multiprocessing
7272
7373__import__( " logging " ).basicConfig(
7474 level=logging.INFO,
@@ -77,14 +77,15 @@ __import__("logging").basicConfig(
7777 handlers=[logging.StreamHandler()]
7878)
7979
80- cpu_benchmark = " %s "
80+ wfbench = " %s "
8181task_name = " %s "
82- files_list = [" %s " ]
82+ input_file = [" %s " ]
8383gpu_work = int(%i)
8484cpu_work = int(%i)
8585percent_cpu = %f
8686cpu_threads = int(10 * percent_cpu)
87- output_data = { " %s " : int(%i)}
87+ output_file = " %s "
88+ output_file_size = int(%i)
8889dep = %i
8990workflow_id = " %s "
9091task_id = f " {workflow_id}_{task_name} "
@@ -106,131 +107,22 @@ if 'workflow_id':
106107
107108__import__( " logging " ).info(f " Starting {task_name} Benchmark on {socket.gethostname()} " )
108109
109- procs = []
110- cpu_queue = multiprocessing.Queue()
111- __import__( " logging " ).debug(f " Working directory: {os.getcwd()} " )
112-
113- __import__( " logging " ).debug( " Starting IO benchmark... " )
114- io_proc = None
115- termination_event = multiprocessing.Event()
116-
117- io_proc = multiprocessing.Process(
118- target=lambda inputs=files_list, outputs=output_data, cpu_queue=cpu_queue,
119- termination_event=termination_event: (
120- memory_limit := 10 * 1024 * 1024,
121- [open(name, " wb " ).close() for name in outputs],
122- io_completed := 0,
123- bytes_read := {name: 0 for name in inputs},
124- bytes_written := {name: 0 for name in outputs},
125- input_sizes := {name: __import__( " os " ).path.getsize(name) for name in inputs},
126- [
127- (
128- cpu_percent := cpu_queue.get(timeout=1.0),
129- should_exit := termination_event.is_set(),
130- (
131- while_loop_var := True,
132- [
133- (
134- new_val := (
135- cpu_queue.get(timeout = 1.0)
136- if not cpu_queue.empty() else None
137- ),
138- cpu_percent := (
139- max(cpu_percent, new_val)
140- if new_val is not None else cpu_percent
141- ),
142- while_loop_var := (
143- new_val is not None and not cpu_queue.empty()
144- )
145- )
146- for _ in range(100) if while_loop_var
147- ],
148- bytes_to_read := {
149- name: max(0, int(size * (cpu_percent / 100) - bytes_read[name]))
150- for name, size in input_sizes.items()
151- },
152- bytes_to_write := {
153- name: max(0, int(size * (cpu_percent / 100) - bytes_written[name]))
154- for name, size in outputs.items()
155- },
156- __import__( " logging " ).debug( " Starting IO Read Benchmark... " ),
157- in_file := list(bytes_to_read.keys())[0],
158- in_size := list(bytes_to_read.values())[0],
159- open(in_file, " rb " ).read(int(in_size)),
160- __import__( " logging " ).debug( " Completed IO Read Benchmark! " ),
161- out_file := list(outputs.keys())[0],
162- out_size := list(outputs.values())[0],
163- __import__( " logging " ).debug(f " Writing output file '{out_file}' " ),
164- open(out_file, " ab " ).write(__import__( " os " ).urandom(int(out_size))),
165- bytes_read.update({
166- name: bytes_read[name] + bytes_to_read[name]
167- for name in bytes_to_read
168- }),
169- bytes_written.update({
170- name: bytes_written[name] + bytes_to_write[name]
171- for name in bytes_to_write
172- }),
173-
174- __import__( " logging " ).debug(f " Bytes Read: {bytes_read} " ),
175- __import__( " logging " ).debug(f " Bytes Written: {bytes_written} " ),
176- io_completed := cpu_percent,
177- ) if cpu_percent is not None else time.sleep(0.1),
178- not (should_exit or io_completed >= 100)
179- )
180- for _ in range(1000000)
181- if not (io_completed >= 100 or termination_event.is_set())
182- ],
183- __import__( " logging " ).info( " IO benchmark completed " )
184- )
185- )
186- io_proc.start()
187- procs.append(io_proc)
188-
189- if cpu_work > 0:
190- __import__( " logging " ).info(f " Starting CPU and Memory Benchmarks for {task_name}... " )
191-
192- mem_threads = 10 - cpu_threads
193- cpu_work_per_thread = int(cpu_work / cpu_threads)
194-
195- cpu_procs = []
196- mem_procs = []
197- cpu_prog = [f " {cpu_benchmark} " , f " {cpu_work_per_thread} " ]
198- mem_prog = [ " stress-ng " , " --vm " , f " {mem_threads} " ,
199- " --vm-bytes " , " 0.05%% " , " --vm-keep " ]
200-
201- for i in range(cpu_threads):
202- cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
203- cpu_procs.append(cpu_proc)
204- monitor_thread = multiprocessing.Process(
205- target=lambda proc=cpu_proc, queue=cpu_queue:
206- [
207- queue.put(float(line.strip().split()[1].strip('%%')))
208- for line in iter(proc.stdout.readline, " " )
209- if line.strip() and line.strip().startswith( " Progress: " )
210- ]
211- )
212- monitor_thread.start()
213-
214- if mem_threads > 0:
215- mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
216- mem_procs.append(mem_proc)
217-
218- procs.extend(cpu_procs)
219- for proc in procs:
220- if isinstance(proc, subprocess.Popen):
221- proc.wait()
222- if io_proc is not None and io_proc.is_alive():
223- io_proc.join()
224-
225- for mem_proc in mem_procs:
226- try:
227- os.kill(mem_proc.pid, signal.SIGKILL)
228- except subprocess.TimeoutExpired:
229- __import__( " logging " ).debug( " Memory process did not terminate; force-killing. " )
230- subprocess.Popen([ " pkill " , " -f " , " stress-ng " ]).wait()
231-
232- __import__( " logging " ).info( " Completed CPU and Memory Benchmarks! " )
233-
110+ cmd = [
111+ sys.executable, wfbench,
112+ " --name " , task_name,
113+ " --workflow_id " , workflow_id,
114+ " --percent-cpu " , str(percent_cpu),
115+ " --cpu-work " , str(cpu_work),
116+ " --output-files " , f'{{ " {output_file} " : {output_file_size}}}',
117+ " --input-files " , str(input_file).replace( " ' " , ' " '),
118+ " --with-flowcept " ,
119+ ]
120+ if gpu_work:
121+ cmd += [ " --gpu-work " , str(gpu_work)]
122+
123+ logging.info(f " Launching wfbench for task {task_name} " )
124+ proc = subprocess.run(cmd)
125+
234126__import__( " logging " ).info(f " Benchmark {task_name} completed! " )
235127
236128if 'workflow_id':
0 commit comments