1+ from fractions import Fraction
2+ from wfcommons .wfgen .abstract_recipe import WorkflowRecipe
3+ from wfcommons import WorkflowGenerator
4+ from typing import Dict , Union , List , Type , Tuple
5+ from numpy .random import choice
6+ from wfcommons .wfperf .data_gen import generate_sys_data , cleanup_sys_files
7+ import pathlib
8+ import json
9+ import subprocess
10+
11+ class WorkflowBenchmark ():
12+ def __init__ (self , Recipe : Type [WorkflowRecipe ], num_tasks : int ) -> None :
13+ self .Recipe = Recipe
14+ self .num_tasks = num_tasks
15+
16+ def create (self ,
17+ save_dir : pathlib .Path ,
18+ tasks : Dict [str , Tuple [int , int ]],
19+ mem_total_size : str = "1000T" ,
20+ block_size : str = "4096" ,
21+ verbose : bool = False ) -> Dict :
22+
23+
24+ if verbose :
25+ print ("Checking if the sysbench is installed." )
26+ self ._check_sysbench ()
27+ if verbose :
28+ print ("Creating directory." )
29+ save_dir = pathlib .Path (save_dir ).resolve ()
30+ save_dir .mkdir (exist_ok = True , parents = True )
31+
32+ if verbose :
33+ print ("Generating workflow" )
34+ generator = WorkflowGenerator (self .Recipe .from_num_tasks (self .num_tasks ))
35+ workflow = generator .build_workflow ()
36+ workflow .write_json (f'{ save_dir .joinpath (workflow .name )} .json' )
37+
38+ with open (f'{ save_dir .joinpath (workflow .name )} .json' ) as json_file :
39+ wf = json .load (json_file )
40+
41+ params = {
42+ job : [
43+ f"--memory-block-size={ block_size } " ,
44+ f"--memory-total-size={ mem_total_size } " ,
45+ f"--cpu-max-prime={ cpu_max_prime } " ,
46+ f"--percent_cpu={ percent_cpu } " ,
47+ f"--forced-shutdown=0" ,
48+ f"--time={ time } "
49+ ]
50+ for job , (cpu_max_prime , percent_cpu , time ) in tasks .items ()
51+ }
52+
53+ for job in wf ["workflow" ]["jobs" ]:
54+ job ["files" ] = []
55+ job .setdefault ("command" , {})
56+ job ["command" ]["program" ] = f"sys_test.py"
57+ job_name = job ["name" ].rsplit ("_" , 1 )[0 ]
58+ job ["command" ]["arguments" ] = params [job_name ]
59+
60+
61+ with open (f'{ save_dir .joinpath (workflow .name )} .json' , 'w' ) as fp :
62+ json .dump (wf , fp , indent = 4 )
63+
64+
65+ def _check_sysbench (self ,):
66+ proc = subprocess .Popen (["which" , "sysbench" ], stdout = subprocess .PIPE )
67+ out , _ = proc .communicate ()
68+ if not out :
69+ raise FileNotFoundError ("Sysbench not found. Please install sysbench: https://github.com/aakopytov/sysbench" )
0 commit comments