Skip to content

Commit 90a6a49

Browse files
committed
Updated wfbench to make it callable as a module
1 parent 8bd49b5 commit 90a6a49

1 file changed

Lines changed: 62 additions & 48 deletions

File tree

bin/wfbench

Lines changed: 62 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ def get_parser() -> argparse.ArgumentParser:
363363
"computation throughout the execution (fewer chunks may be used "
364364
"if amounts of work and or input/output file sizes are too small).")
365365
parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.")
366-
parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete "
366+
parser.add_argument("--time-limit", default=None, help="Time limit (in seconds) to complete "
367367
"the computational portion of the benchmark (overrides CPU and GPU works). "
368368
"Is only approximate since I/O time may make the overall time longer.")
369369
parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).")
@@ -379,15 +379,15 @@ def get_parser() -> argparse.ArgumentParser:
379379
return parser
380380

381381

382-
def begin_flowcept(args):
382+
def begin_flowcept(workflow_id, name, used):
383383
log_debug("Running with Flowcept.")
384384
from flowcept import Flowcept, FlowceptTask
385385
# TODO: parametrize to allow storing individual tasks
386-
f = Flowcept(workflow_id=args.workflow_id,
387-
bundle_exec_id=args.workflow_id,
386+
f = Flowcept(workflow_id=workflow_id,
387+
bundle_exec_id=workflow_id,
388388
start_persistence=False, save_workflow=False)
389389
f.start()
390-
t = FlowceptTask(task_id=f"{args.workflow_id}_{args.name}", workflow_id=args.workflow_id, used={**args.__dict__})
390+
t = FlowceptTask(task_id=f"{workflow_id}_{name}", workflow_id=workflow_id, used=used)
391391
return f, t
392392

393393

@@ -396,22 +396,22 @@ def end_flowcept(flowcept, flowcept_task):
396396
flowcept.stop()
397397

398398

399-
def compute_num_chunks(args):
399+
def compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks):
400400
# Compute the (feasible number of chunks)
401401
min_chunk_size_time = 1.0 # At least 1 second per chunk, if we're doing time-based
402402
# TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine
403403
min_chunk_size_cpu_work = 3000000 * min_chunk_size_time # 1s on my MacBook Pro
404404
min_chunk_size_gpu_work = 30000000 * min_chunk_size_time # unknown.....
405405

406-
if args.time:
407-
num_chunks = min(int(args.num_chunks), int(float(args.time) / min_chunk_size_time))
406+
if time_limit:
407+
num_chunks = min(int(num_chunks), int(float(time_limit) / min_chunk_size_time))
408408
else:
409-
if args.cpu_work:
410-
num_chunks_cpu = min(int(args.num_chunks), int(float(args.cpu_work) / min_chunk_size_cpu_work))
409+
if cpu_work:
410+
num_chunks_cpu = min(int(num_chunks), int(float(cpu_work) / min_chunk_size_cpu_work))
411411
else:
412412
num_chunks_cpu = 1
413-
if args.gpu_work:
414-
num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work))
413+
if gpu_work:
414+
num_chunks_gpu = min(int(num_chunks), int(float(gpu_work) / min_chunk_size_gpu_work))
415415
else:
416416
num_chunks_gpu = 1
417417
num_chunks = min(num_chunks_cpu, num_chunks_gpu)
@@ -425,36 +425,38 @@ def kill_current_handles(handles: list[ProcessHandle]):
425425
handle.terminate_along_with_children()
426426

427427

428-
def main():
429-
"""Main program."""
430-
parser = get_parser()
431-
args = parser.parse_args()
432-
core = None
428+
def run(workflow_id, name, with_flowcept, silent, debug, rundir, path_lock, path_cores,
429+
time_limit, cpu_work, percent_cpu, mem, gpu_work, num_chunks,
430+
input_files, output_files):
431+
"""Main function."""
433432

434-
if args.with_flowcept:
435-
flowcept, flowcept_task = begin_flowcept(args)
433+
if with_flowcept:
434+
flowcept, flowcept_task = begin_flowcept(workflow_id, name, locals())
435+
else:
436+
flowcept = None
437+
flowcept_task = None
436438

437-
if args.silent:
439+
if silent:
438440
logging.getLogger().setLevel(logging.NOTSET)
439-
if args.debug:
441+
if debug:
440442
logging.getLogger().setLevel(logging.DEBUG)
441443

442-
if args.rundir:
443-
rundir = pathlib.Path(args.rundir)
444+
if rundir:
445+
rundir = pathlib.Path(rundir)
444446
else:
445447
rundir = pathlib.Path(os.getcwd())
446448

447-
if args.path_lock and args.path_cores:
448-
path_locked = pathlib.Path(args.path_lock)
449-
path_cores = pathlib.Path(args.path_cores)
449+
if path_lock and path_cores:
450+
path_locked = pathlib.Path(path_lock)
451+
path_cores = pathlib.Path(path_cores)
450452
core = lock_core(path_locked, path_cores)
451-
452-
if not args.time and (not args.cpu_work and not args.gpu_work):
453-
log_error("At least one of --time, --cpu-work, or --gpu-work should be provided.")
454-
sys.exit(1)
453+
else:
454+
path_locked = None
455+
path_cores = None
456+
core = None
455457

456458
# Compute the (feasible) number of chunks based on the arguments
457-
num_chunks = compute_num_chunks(args)
459+
num_chunks = compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks)
458460
log_debug(f"Executing benchmark with {num_chunks} chunks.")
459461

460462
# At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2)
@@ -470,17 +472,17 @@ def main():
470472
N = num_chunks + 2
471473
steps = [{"io_read_benchmark": IOReadBenchmark(),
472474
"io_write_benchmark": IOWriteBenchmark(),
473-
"cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * args.percent_cpu),
474-
mem_threads=int(10 - 10 * args.percent_cpu),
475+
"cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * percent_cpu),
476+
mem_threads=int(10 - 10 * percent_cpu),
475477
core=core,
476-
total_mem=args.mem * 1000 * 1000 if args.mem else None),
478+
total_mem=mem * 1000 * 1000 if mem else None),
477479
"gpu_benchmark": GPUBenchmark()} for i in range(N)]
478480

479481
min_chunk_size_data = 1000 # 1KB per chunk at a minimum for each input / output file, otherwise the file
480482
# is read/written all at once at the beginning/end
481483

482484
# Augment I/O read benchmarks for each input file
483-
cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files)
485+
cleaned_input = "{}" if input_files is None else re.sub(r'\\+', '', input_files)
484486
try:
485487
input_files = json.loads(cleaned_input)
486488
except json.JSONDecodeError as e:
@@ -503,7 +505,7 @@ def main():
503505
steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes)
504506

505507
# Augment I/O write benchmarks for each output file
506-
cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files)
508+
cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files)
507509
try:
508510
output_files = json.loads(cleaned_output)
509511
except json.JSONDecodeError as e:
@@ -526,25 +528,25 @@ def main():
526528
steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes)
527529

528530
# Augment CPU benchmark with computation (if need be)
529-
if args.cpu_work:
530-
if args.time:
531+
if cpu_work:
532+
if time_limit:
531533
for step in range(1, N-1):
532534
steps[step]["cpu_benchmark"].set_infinite_work()
533535
else:
534536
for step in range(1, N-1):
535-
chunk_work = int(args.cpu_work) // num_chunks + (int(args.cpu_work) % num_chunks > step - 1)
537+
chunk_work = int(cpu_work) // num_chunks + (int(cpu_work) % num_chunks > step - 1)
536538
steps[step]["cpu_benchmark"].set_work(chunk_work)
537539

538540
# Augment GPU benchmark with computation (if need be)
539-
if args.gpu_work:
540-
if args.time:
541+
if gpu_work:
542+
if time_limit:
541543
for step in range(1, N - 1):
542544
steps[step]["gpu_benchmark"].set_device()
543-
steps[step]["gpu_benchmark"].set_work(int(args.gpu_work))
544-
steps[step]["gpu_benchmark"].set_time(float(args.time))
545+
steps[step]["gpu_benchmark"].set_work(int(gpu_work))
546+
steps[step]["gpu_benchmark"].set_time(float(time_limit))
545547
else:
546548
for step in range(1, N - 1):
547-
chunk_work = int(args.gpu_work) // num_chunks + (int(args.gpu_work) % num_chunks > step - 1)
549+
chunk_work = int(gpu_work) // num_chunks + (int(gpu_work) % num_chunks > step - 1)
548550
steps[step]["gpu_benchmark"].set_device()
549551
steps[step]["gpu_benchmark"].set_work(chunk_work)
550552

@@ -565,9 +567,9 @@ def main():
565567
current_proc_handles[:] = [io_read_process, cpu_benchmark_process, memory_benchmark_process, gpu_benchmark_process]
566568

567569
# If time based, sleep the required amount of time and kill the process
568-
if args.time:
570+
if time_limit:
569571
if cpu_benchmark_process is not None or gpu_benchmark_process is not None:
570-
time.sleep(float(args.time) / num_chunks)
572+
time.sleep(float(time_limit) / num_chunks)
571573
if cpu_benchmark_process is not None:
572574
cpu_benchmark_process.terminate_along_with_children()
573575
if gpu_benchmark_process is not None:
@@ -603,10 +605,22 @@ def main():
603605
if core:
604606
unlock_core(path_locked, path_cores, core)
605607

606-
if args.with_flowcept:
608+
if with_flowcept:
607609
end_flowcept(flowcept, flowcept_task)
608610

609-
log_info(f"{args.name} benchmark completed")
611+
log_info(f"{name} benchmark completed")
612+
613+
def main():
614+
# Parse command-line argument
615+
parser = get_parser()
616+
args = parser.parse_args()
617+
618+
# Sanity checks
619+
if not args.time_limit and (not args.cpu_work and not args.gpu_work):
620+
log_error("At least one of --time-limit, --cpu-work, or --gpu-work should be provided.")
621+
sys.exit(1)
610622

623+
run(**vars(args))
624+
611625
if __name__ == "__main__":
612626
main()

0 commit comments

Comments
 (0)