11#!/usr/bin/env python
22# -*- coding: utf-8 -*-
33#
4- # Copyright (c) 2020 The WorkflowHub Team.
4+ # Copyright (c) 2020-2021 The WorkflowHub Team.
55#
66# This program is free software: you can redistribute it and/or modify
77# it under the terms of the GNU General Public License as published by
@@ -31,17 +31,39 @@ class WorkflowRecipe(ABC):
3131 :type data_footprint: int
3232 :param num_tasks: The upper bound for the total number of tasks in the workflow.
3333 :type num_tasks: int
34+ :param runtime_factor: The factor by which task runtimes will be increased/decreased.
35+ :type runtime_factor: float
36+ :param input_file_size_factor: The factor by which the size of task input files will be increased/decreased.
37+ :type input_file_size_factor: float
38+ :param output_file_size_factor: The factor by which the size of task output files will be increased/decreased.
39+ :type output_file_size_factor: float
3440 :param logger: The logger where to log information/warning or errors (optional).
3541 :type logger: Logger
3642 """
3743
38- def __init__ (self , name : str , data_footprint : Optional [int ], num_tasks : Optional [int ],
44+ def __init__ (self , name : str ,
45+ data_footprint : Optional [int ],
46+ num_tasks : Optional [int ],
47+ runtime_factor : Optional [float ] = 1.0 ,
48+ input_file_size_factor : Optional [float ] = 1.0 ,
49+ output_file_size_factor : Optional [float ] = 1.0 ,
3950 logger : Optional [Logger ] = None ) -> None :
4051 """Create an object of the workflow recipe."""
52+ # sanity checks
53+ if runtime_factor <= 0.0 :
54+ raise ValueError ("The runtime factor should be a number higher than 0.0." )
55+ if input_file_size_factor <= 0.0 :
56+ raise ValueError ("The input file size factor should be a number higher than 0.0." )
57+ if output_file_size_factor <= 0.0 :
58+ raise ValueError ("The output file size factor should be a number higher than 0.0." )
59+
4160 self .logger = logging .getLogger (__name__ ) if logger is None else logger
4261 self .name = name
4362 self .data_footprint = data_footprint
4463 self .num_tasks = num_tasks
64+ self .runtime_factor = runtime_factor
65+ self .input_file_size_factor = input_file_size_factor
66+ self .output_file_size_factor = output_file_size_factor
4567 self .workflows : List [Workflow ] = []
4668 self .tasks_files : Dict [str , List [File ]] = {}
4769 self .task_id_counter = 1
@@ -57,13 +79,24 @@ def _workflow_recipe(self) -> Dict[str, Any]:
5779
5880 @classmethod
5981 @abstractmethod
60- def from_num_tasks (cls , num_tasks : int ) -> 'WorkflowRecipe' :
82+ def from_num_tasks (cls ,
83+ num_tasks : int ,
84+ runtime_factor : Optional [float ] = 1.0 ,
85+ input_file_size_factor : Optional [float ] = 1.0 ,
86+ output_file_size_factor : Optional [float ] = 1.0
87+ ) -> 'WorkflowRecipe' :
6188 """
6289 Instantiate a workflow recipe that will generate synthetic workflows up to the
6390 total number of tasks provided.
6491
6592 :param num_tasks: The upper bound for the total number of tasks in the workflow.
6693 :type num_tasks: int
94+ :param runtime_factor: The factor by which task runtimes will be increased/decreased.
95+ :type runtime_factor: float
96+ :param input_file_size_factor: The factor by which the size of task input files will be increased/decreased.
97+ :type input_file_size_factor: float
98+ :param output_file_size_factor: The factor by which the size of task output files will be increased/decreased.
99+ :type output_file_size_factor: float
67100
68101 :return: A workflow recipe object that will generate synthetic workflows up to
69102 the total number of tasks provided.
@@ -100,9 +133,10 @@ def _generate_task(self, task_name: str, task_id: str, input_files: Optional[Lis
100133 task_recipe = self ._workflow_recipe ()[task_name ]
101134
102135 # runtime
103- runtime : float = float (format (generate_rvs (task_recipe ['runtime' ]['distribution' ],
104- task_recipe ['runtime' ]['min' ],
105- task_recipe ['runtime' ]['max' ]), '.3f' ))
136+ runtime : float = float (format (
137+ self .runtime_factor * generate_rvs (task_recipe ['runtime' ]['distribution' ],
138+ task_recipe ['runtime' ]['min' ],
139+ task_recipe ['runtime' ]['max' ]), '.3f' ))
106140
107141 # linking previous generated output files as input files
108142 self .tasks_files [task_id ] = []
@@ -184,11 +218,13 @@ def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink)
184218 :return: The generated file.
185219 :rtype: File
186220 """
221+ size = int ((self .input_file_size_factor if link == FileLink .INPUT
222+ else self .output_file_size_factor ) * generate_rvs (recipe [extension ]['distribution' ],
223+ recipe [extension ]['min' ],
224+ recipe [extension ]['max' ]))
187225 return File (name = str (uuid .uuid4 ()) + extension ,
188226 link = link ,
189- size = int (generate_rvs (recipe [extension ]['distribution' ],
190- recipe [extension ]['min' ],
191- recipe [extension ]['max' ])))
227+ size = size )
192228
193229 def _get_files_by_task_and_link (self , task_id : str , link : FileLink ) -> List [File ]:
194230 """Get the list of files for a task ID and link type.
0 commit comments