1- #!/usr/bin/env python
1+ #!/usr/bin/env python3
22# -*- coding: utf-8 -*-
33#
44# Copyright (c) 2021-2025 The WfCommons Team.
@@ -20,13 +20,17 @@ import re
2020import json
2121import logging
2222import pandas as pd
23+ import psutil
2324
2425from io import StringIO
2526from filelock import FileLock
2627from pathos .helpers import mp as multiprocessing
2728from typing import List , Optional
2829
2930
31+ int32_max = 2147483647
32+
33+
3034# Configure logging
3135logging .basicConfig (
3236 level = logging .INFO , # Change this to control the verbosity
@@ -39,6 +43,16 @@ logging.basicConfig(
3943this_dir = pathlib .Path (__file__ ).resolve ().parent
4044
4145
46+ def kill_process_and_children (proc ):
47+ try :
48+ parent = psutil .Process (proc .pid )
49+ children = parent .children (recursive = True )
50+ for child in children :
51+ child .kill ()
52+ parent .kill ()
53+ except psutil .NoSuchProcess :
54+ pass # Process is already dead
55+
4256def log_info (msg : str ):
4357 """
4458 Log an info message to stderr
@@ -165,34 +179,38 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
165179 :rtype: List
166180 """
167181 total_mem = f"{ total_mem } B" if total_mem else f"{ 100.0 / os .cpu_count ()} %"
168- cpu_work_per_thread = int (cpu_work / cpu_threads )
169-
170- cpu_procs = []
171- mem_procs = []
172- cpu_prog = [f"{ this_dir .joinpath ('cpu-benchmark' )} " , f"{ cpu_work_per_thread } " ]
182+ cpu_work_per_thread = int (1000000 * cpu_work / (16384 * cpu_threads ))
183+ cpu_samples = min (cpu_work_per_thread , int32_max )
184+ cpu_ops = (cpu_work_per_thread + int32_max - 1 ) // int32_max
185+ if cpu_ops > int32_max :
186+ log_info ("Exceeded maximum number of cpu work." )
187+ cpu_ops = int32_max
188+
189+ cpu_proc = None
190+ mem_proc = None
191+
192+ cpu_prog = ["stress-ng" , "--monte-carlo" , f"{ cpu_threads } " ,
193+ "--monte-carlo-method" , "pi" ,
194+ "--monte-carlo-rand" , "lcg" ,
195+ "--monte-carlo-samples" , f"{ cpu_samples } " ,
196+ "--monte-carlo-ops" , f"{ cpu_ops } " ]
173197 mem_prog = ["stress-ng" , "--vm" , f"{ mem_threads } " ,
174198 "--vm-bytes" , f"{ total_mem } " , "--vm-keep" ]
175199
176- for i in range ( cpu_threads ) :
177- cpu_proc = subprocess .Popen (cpu_prog , stdout = subprocess . PIPE , stderr = subprocess . PIPE , text = True )
200+ if cpu_threads > 0 :
201+ cpu_proc = subprocess .Popen (cpu_prog , preexec_fn = os . setsid )
178202
179203 # NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
180204 if core :
181205 os .sched_setaffinity (cpu_proc .pid , {core })
182- cpu_procs .append (cpu_proc )
183-
184- # Start a thread to monitor the progress of each CPU benchmark process
185- monitor_thread = multiprocessing .Process (target = monitor_progress , args = (cpu_proc , cpu_queue ))
186- monitor_thread .start ()
187206
188207 if mem_threads > 0 :
189208 # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
190209 mem_proc = subprocess .Popen (mem_prog , preexec_fn = os .setsid )
191210 if core :
192211 os .sched_setaffinity (mem_proc .pid , {core })
193- mem_procs .append (mem_proc )
194212
195- return cpu_procs , mem_procs
213+ return cpu_proc , mem_proc
196214
197215
198216def io_read_benchmark_user_input_data_size (inputs ,
@@ -446,22 +464,22 @@ def main():
446464 log_debug (f"{ args .name } acquired core { core } " )
447465
448466 mem_threads = int (10 - 10 * args .percent_cpu )
449- cpu_procs , mem_procs = cpu_mem_benchmark (cpu_queue = cpu_queue ,
467+ cpu_proc , mem_proc = cpu_mem_benchmark (cpu_queue = cpu_queue ,
450468 cpu_threads = int (10 * args .percent_cpu ),
451469 mem_threads = mem_threads ,
452- cpu_work = sys . maxsize if args .time else int (args .cpu_work ),
470+ cpu_work = int32_max ** 2 if args .time else int (args .cpu_work ),
453471 core = core ,
454472 total_mem = mem_bytes )
455473
456- procs .extend ( cpu_procs )
474+ procs .append ( cpu_proc )
457475 if args .time :
458476 time .sleep (int (args .time ))
459477 for proc in procs :
460478 if isinstance (proc , multiprocessing .Process ):
461479 if proc .is_alive ():
462480 proc .terminate ()
463481 elif isinstance (proc , subprocess .Popen ):
464- proc . terminate ( )
482+ kill_process_and_children ( proc )
465483 else :
466484 for proc in procs :
467485 if isinstance (proc , subprocess .Popen ):
@@ -470,11 +488,10 @@ def main():
470488 # io_proc.terminate()
471489 io_proc .join ()
472490
473- for mem_proc in mem_procs :
474- try :
475- os .kill (mem_proc .pid , signal .SIGKILL ) # Force kill if SIGTERM fails
476- except subprocess .TimeoutExpired :
477- log_debug ("Memory process did not terminate; force-killing." )
491+ try :
492+ os .kill (mem_proc .pid , signal .SIGKILL ) # Force kill if SIGTERM fails
493+ except subprocess .TimeoutExpired :
494+ log_debug ("Memory process did not terminate; force-killing." )
478495 # As a fallback, use pkill if any remaining instances are stuck
479496 subprocess .Popen (["pkill" , "-f" , "stress-ng" ]).wait ()
480497
0 commit comments