Skip to content

Commit 54c212e

Browse files
committed
Insane race-condition bug fix in wfbench.py (having to deal with killing
child processes that are becoming orphans). Some API updates to specify num_chunks at the user level
1 parent 4da3f66 commit 54c212e

3 files changed

Lines changed: 29 additions & 315 deletions

File tree

bin/wfbench

Lines changed: 23 additions & 311 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import os
1212
import pathlib
1313
import subprocess
1414
import time
15+
import signal
1516
import sys
1617
import argparse
1718
import re
@@ -80,26 +81,26 @@ class ProcessHandle:
8081
self._proc.terminate()
8182

8283
def terminate_along_with_children(self):
83-
# If it's a multiprocessing, just kill the parent and return
8484
if isinstance(self._proc, multiprocessing.Process):
8585
self._proc.terminate()
8686
return
87-
# If it's a Popen, then do the brute-force thing
8887
try:
89-
parent = psutil.Process(self._proc.pid)
90-
children = parent.children(recursive=True)
91-
for child in children:
92-
try:
93-
child.kill()
94-
except psutil.NoSuchProcess:
95-
pass # Process is already dead'
88+
pgid = os.getpgid(self._proc.pid)
89+
os.killpg(pgid, signal.SIGKILL)
90+
except ProcessLookupError:
91+
pass # group leader already gone, try children directly
92+
except PermissionError:
93+
pass
94+
finally:
95+
# Catch any re-parented children (ppid=1) that psutil can still see
9696
try:
97-
parent.kill()
97+
for child in psutil.Process(self._proc.pid).children(recursive=True):
98+
try:
99+
child.kill()
100+
except psutil.NoSuchProcess:
101+
pass
98102
except psutil.NoSuchProcess:
99-
pass # Nevermind
100-
except subprocess.TimeoutExpired:
101-
log_debug("Process did not terminate; force-killing.")
102-
subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()
103+
pass
103104

104105
def wait(self):
105106
if isinstance(self._proc, multiprocessing.Process):
@@ -348,186 +349,6 @@ def unlock_core(path_locked: pathlib.Path,
348349
finally:
349350
lock.release()
350351

351-
# def monitor_progress(proc, cpu_queue):
352-
# """Monitor progress from the CPU benchmark process."""
353-
# for line in iter(proc.stdout.readline, ""): # No decode needed
354-
# line = line.strip()
355-
# if line.startswith("Progress:"):
356-
# try:
357-
# progress = float(line.split()[1].strip('%'))
358-
# cpu_queue.put(progress)
359-
# except (ValueError, IndexError):
360-
# pass
361-
#
362-
# def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
363-
# cpu_threads: Optional[int] = 5,
364-
# mem_threads: Optional[int] = 5,
365-
# cpu_work: Optional[int] = 100,
366-
# core: Optional[int] = None,
367-
# total_mem: Optional[int] = None) -> List:
368-
# """
369-
# Run CPU and memory benchmark.
370-
#
371-
# :param cpu_queue: Queue to push CPU benchmark progress as a float.
372-
# :type cpu_queue: multiprocessing.Queue
373-
# :param cpu_threads: Number of threads for CPU benchmark.
374-
# :type cpu_threads: Optional[int]
375-
# :param mem_threads: Number of threads for memory benchmark.
376-
# :type mem_threads: Optional[int]
377-
# :param cpu_work: Total work units for CPU benchmark.
378-
# :type cpu_work: Optional[int]
379-
# :param core: Core to pin the benchmark processes to.
380-
# :type core: Optional[int]
381-
# :param total_mem: Total memory to use for memory benchmark.
382-
# :type total_mem: Optional[float]
383-
#
384-
# :return: Lists of CPU and memory subprocesses.
385-
# :rtype: List
386-
# """
387-
# total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%"
388-
# cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2
389-
# cpu_samples = min(cpu_work_per_thread, int32_max)
390-
# cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max
391-
# if cpu_ops > int32_max:
392-
# log_info("Exceeded maximum allowed value of cpu work.")
393-
# cpu_ops = int32_max
394-
#
395-
# cpu_proc = None
396-
# mem_proc = None
397-
#
398-
# cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}",
399-
# "--monte-carlo-method", "pi",
400-
# "--monte-carlo-rand", "lcg",
401-
# "--monte-carlo-samples", f"{cpu_samples}",
402-
# "--monte-carlo-ops", f"{cpu_ops}",
403-
# "--quiet"]
404-
# mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
405-
# "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"]
406-
#
407-
# if cpu_threads > 0:
408-
# cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid)
409-
#
410-
# # NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
411-
# if core:
412-
# os.sched_setaffinity(cpu_proc.pid, {core})
413-
#
414-
# if mem_threads > 0:
415-
# # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
416-
# mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
417-
# if core:
418-
# os.sched_setaffinity(mem_proc.pid, {core})
419-
#
420-
# return [cpu_proc, mem_proc]
421-
#
422-
#
423-
# def io_read_benchmark_user_input_data_size(inputs,
424-
# rundir=None,
425-
# memory_limit=None):
426-
# if memory_limit is None:
427-
# memory_limit = -1
428-
# memory_limit = int(memory_limit)
429-
# log_debug("Starting IO Read Benchmark...")
430-
# for file, size in inputs.items():
431-
# with open(rundir.joinpath(file), "rb") as fp:
432-
# log_debug(f"Reading '{file}'")
433-
# chunk_size = min(size, memory_limit)
434-
# while fp.read(chunk_size):
435-
# pass
436-
# log_debug("Completed IO Read Benchmark!")
437-
#
438-
#
439-
# def io_write_benchmark_user_input_data_size(outputs,
440-
# rundir=None,
441-
# memory_limit=None):
442-
# if memory_limit is None:
443-
# memory_limit = sys.maxsize
444-
# memory_limit = int(memory_limit)
445-
# for file_name, file_size in outputs.items():
446-
# log_debug(f"Writing output file '{file_name}'")
447-
# file_size_todo = file_size
448-
# while file_size_todo > 0:
449-
# with open(rundir.joinpath(file_name), "ab") as fp:
450-
# chunk_size = min(file_size_todo, memory_limit)
451-
# file_size_todo -= fp.write(os.urandom(int(chunk_size)))
452-
#
453-
#
454-
# def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None):
455-
# """Alternate between reading and writing to a file, ensuring read only happens after write."""
456-
#
457-
# if memory_limit is None:
458-
# memory_limit = 10 * 1024 * 1024 # sys.maxsize
459-
# memory_limit = int(memory_limit)
460-
#
461-
# # queue will have messages in the form (cpu_percent_completed)
462-
# # Get the last message and trash the rest
463-
#
464-
# # Create empty files
465-
# for name in outputs:
466-
# open(rundir.joinpath(name), "wb").close()
467-
#
468-
# io_completed = 1
469-
# bytes_read = {
470-
# name: 0
471-
# for name in inputs
472-
# }
473-
# bytes_written = {
474-
# name: 0
475-
# for name in outputs
476-
# }
477-
#
478-
# # get size of inputs
479-
# inputs = {
480-
# name: os.path.getsize(rundir.joinpath(name))
481-
# for name in inputs
482-
# }
483-
#
484-
# while io_completed < 100:
485-
# #cpu_percent = max(io_completed, cpu_queue.get())
486-
# #while True: # Get the last message
487-
# # try:
488-
# # cpu_percent = max(io_completed, cpu_queue.get_nowait())
489-
# # except queue.Empty:
490-
# # break
491-
#
492-
# log_debug(f"IO Percent: {io_completed}")
493-
# if True: #cpu_percent:
494-
# bytes_to_read = {
495-
# name: int(size * (io_completed / 100) - bytes_read[name])
496-
# for name, size in inputs.items()
497-
# }
498-
# bytes_to_write = {
499-
# name: int(size * (io_completed / 100) - bytes_written[name])
500-
# for name, size in outputs.items()
501-
# }
502-
# io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit)
503-
# io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit)
504-
#
505-
# bytes_read = {
506-
# name: bytes_read[name] + bytes_to_read[name]
507-
# for name in bytes_to_read
508-
# }
509-
# bytes_written = {
510-
# name: bytes_written[name] + bytes_to_write[name]
511-
# for name in bytes_to_write
512-
# }
513-
#
514-
# log_debug(f"Bytes Read: {bytes_read}")
515-
# log_debug(f"Bytes Written: {bytes_written}")
516-
#
517-
# io_completed = io_completed + 1
518-
#
519-
# if io_completed >= 100:
520-
# break
521-
522-
523-
# def gpu_benchmark(time: int = 100,
524-
# work: int = 100,
525-
# device: int = 0): #work, device
526-
#
527-
# gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"]
528-
# log_debug(f"Running GPU Benchmark: {gpu_prog}")
529-
# subprocess.Popen(gpu_prog, shell=True)
530-
531352

532353
def get_parser() -> argparse.ArgumentParser:
533354
parser = argparse.ArgumentParser()
@@ -550,7 +371,8 @@ def get_parser() -> argparse.ArgumentParser:
550371
"(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).")
551372
parser.add_argument("--input-files", help="Input files names as a JSON array "
552373
"(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).")
553-
parser.add_argument("--debug", action="store_true", help="Enable debug messages.")
374+
parser.add_argument("--silent", action="store_true", help="Disable all log messages.")
375+
parser.add_argument("--debug", action="store_true", help="Enable debug log messages.")
554376
parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.")
555377
parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.")
556378

@@ -612,6 +434,8 @@ def main():
612434
if args.with_flowcept:
613435
flowcept, flowcept_task = begin_flowcept(args)
614436

437+
if args.silent:
438+
logging.getLogger().setLevel(logging.NOTSET)
615439
if args.debug:
616440
logging.getLogger().setLevel(logging.DEBUG)
617441

@@ -625,9 +449,9 @@ def main():
625449
path_cores = pathlib.Path(args.path_cores)
626450
core = lock_core(path_locked, path_cores)
627451

628-
# if args.time and (not args.cpu_work and not args.gpu_work):
629-
# log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.")
630-
# sys.exit(1)
452+
if not args.time and (not args.cpu_work and not args.gpu_work):
453+
log_error("At least one of --time, --cpu-work, or --gpu-work should be provided.")
454+
sys.exit(1)
631455

632456
# Compute the (feasible) number of chunks based on the arguments
633457
num_chunks = compute_num_chunks(args)
@@ -782,119 +606,7 @@ def main():
782606
if args.with_flowcept:
783607
end_flowcept(flowcept, flowcept_task)
784608

785-
log_debug(f"{args.name} Benchmark Completed")
786-
787-
# OLD CODE BELOW:
788-
#
789-
# procs = []
790-
# io_proc = None
791-
# outputs_dict = {}
792-
#
793-
# cpu_queue = multiprocessing.Queue()
794-
#
795-
# log_debug(f"Working directory: {os.getcwd()}")
796-
#
797-
# if cleaned_input or cleaned_output:
798-
# log_debug("Starting IO benchmark...")
799-
#
800-
# # Attempt to parse the cleaned string
801-
# try:
802-
# outputs_dict = json.loads(cleaned_output)
803-
# except json.JSONDecodeError as e:
804-
# log_error(f"Failed to decode --output-files JSON string argument: {e}")
805-
# sys.exit(1)
806-
#
807-
# try:
808-
# inputs_array = json.loads(cleaned_input)
809-
# except json.JSONDecodeError as e:
810-
# log_error(f"Failed to decode --input-files JSON string argument: {e}")
811-
# sys.exit(1)
812-
#
813-
# # print("OUTPUT", outputs_dict)
814-
# # print("INPUTS", inputs_array)
815-
#
816-
# # Create a multiprocessing event that in the first run is set to True
817-
# write_done_event = multiprocessing.Event()
818-
# # Set this to True to allow the first read to happen
819-
# write_done_event.set()
820-
# # Print the value of the event
821-
# # print("Event Value:", write_done_event.is_set())
822-
#
823-
# io_proc = multiprocessing.Process(
824-
# target=io_alternate,
825-
# args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event)
826-
# )
827-
# io_proc.start()
828-
# procs.append(io_proc)
829-
#
830-
# if args.gpu_work:
831-
# log_info(f"Starting GPU Benchmark for {args.name}...")
832-
# available_gpus = get_available_gpus() #checking for available GPUs
833-
#
834-
# if not available_gpus:
835-
# log_error("No GPU available")
836-
# sys.exit(1)
837-
# else:
838-
# device = available_gpus[0]
839-
# log_debug(f"Running on GPU {device}")
840-
#
841-
# if args.time:
842-
# log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}")
843-
# gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device)
844-
# else:
845-
# gpu_benchmark(work=int(args.gpu_work), device=device)
846-
#
847-
# if args.cpu_work:
848-
# log_info(f"Starting CPU and Memory Benchmarks for {args.name}...")
849-
# if core:
850-
# log_debug(f"{args.name} acquired core {core}")
851-
#
852-
# mem_threads=int(10 - 10 * args.percent_cpu)
853-
# [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue,
854-
# cpu_threads=int(10 * args.percent_cpu),
855-
# mem_threads=mem_threads,
856-
# cpu_work=int32_max**2 if args.time else int(args.cpu_work),
857-
# core=core,
858-
# total_mem=mem_bytes)
859-
# procs.append(cpu_proc)
860-
# if args.time:
861-
# time.sleep(int(args.time))
862-
# for proc in procs:
863-
# if isinstance(proc, multiprocessing.Process):
864-
# if proc.is_alive():
865-
# proc.terminate()
866-
# elif isinstance(proc, subprocess.Popen):
867-
# kill_process_and_children(proc)
868-
# else:
869-
# for proc in procs:
870-
# if isinstance(proc, subprocess.Popen):
871-
# proc.wait()
872-
# if io_proc is not None and io_proc.is_alive():
873-
# #io_proc.terminate()
874-
# io_proc.join()
875-
#
876-
# try:
877-
# kill_process_and_children(mem_proc)
878-
# except subprocess.TimeoutExpired:
879-
# log_debug("Memory process did not terminate; force-killing.")
880-
# # As a fallback, use pkill if any remaining instances are stuck
881-
# subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()
882-
#
883-
# log_debug("Completed CPU and Memory Benchmarks!")
884-
#
885-
# NOTE: If you would like to run only IO add time.sleep(2)
886-
# Check if all procs are done, if not, kill them
887-
# log_debug("Checking if all processes are done...")
888-
# for proc in procs:
889-
# if isinstance(proc, multiprocessing.Process):
890-
# if proc.is_alive():
891-
# proc.terminate()
892-
# proc.join()
893-
# if isinstance(proc, subprocess.Popen):
894-
# proc.wait()
895-
#
896-
#
897-
# log_info(f"Benchmark {args.name} completed!")
609+
log_info(f"{args.name} benchmark completed")
898610

899611
if __name__ == "__main__":
900612
main()

0 commit comments

Comments (0)