|
8 | 8 | # the Free Software Foundation, either version 3 of the License, or |
9 | 9 | # (at your option) any later version. |
10 | 10 |
|
11 | | -from typing import Dict, Optional, Set |
12 | | - |
13 | | -from wfcommons.common.workflow import Workflow |
| 11 | +import json |
| 12 | +import pathlib |
14 | 13 |
|
15 | | -from wfcommons.generator.workflow.abstract_recipe import WorkflowRecipe |
16 | | -from wfcommons.wfchef.duplicate import duplicate |
| 14 | +from logging import Logger |
| 15 | +from typing import Dict, Optional, Set |
17 | 16 |
|
18 | | -import pathlib |
19 | | -import pickle |
20 | | -import networkx as nx |
21 | | -import numpy as np |
22 | | -import pandas as pd |
23 | | -import json |
| 17 | +from wfcommons.wfgen.abstract_recipe import WorkflowRecipe |
24 | 18 |
|
25 | 19 | this_dir = pathlib.Path(__file__).resolve().parent |
26 | 20 |
|
27 | 21 |
|
28 | 22 | class BlastRecipe(WorkflowRecipe): |
29 | 23 | """A Blast workflow recipe class for creating synthetic workflow traces. |
30 | 24 |
|
31 | | - :param num_pairs: The number of pair of signals to estimate earthquake STFs. |
32 | | - :type num_pairs: int |
33 | 25 | :param data_footprint: The upper bound for the workflow total data footprint (in bytes). |
34 | 26 | :type data_footprint: int |
35 | 27 | :param num_tasks: The upper bound for the total number of tasks in the workflow. |
36 | 28 | :type num_tasks: int |
| 29 | + :param exclude_graphs: Set of base-graph names to exclude when generating synthetic workflows. |
| 30 | + :type exclude_graphs: Set |
| 31 | + :param runtime_factor: The factor of which tasks runtime will be increased/decreased. |
| 32 | + :type runtime_factor: float |
| 33 | + :param input_file_size_factor: The factor of which tasks input files size will be increased/decreased. |
| 34 | + :type input_file_size_factor: float |
| 35 | + :param output_file_size_factor: The factor of which tasks output files size will be increased/decreased. |
| 36 | + :type output_file_size_factor: float |
| 37 | + :param logger: The logger where to log information/warning or errors (optional). |
| 38 | + :type logger: Logger |
37 | 39 | """ |
38 | 40 |
|
39 | 41 | def __init__(self, |
40 | 42 | data_footprint: Optional[int] = 0, |
41 | 43 | num_tasks: Optional[int] = 3, |
42 | 44 | exclude_graphs: Set[str] = set(), |
| 45 | + runtime_factor: Optional[float] = 1.0, |
| 46 | + input_file_size_factor: Optional[float] = 1.0, |
| 47 | + output_file_size_factor: Optional[float] = 1.0, |
| 48 | + logger: Optional[Logger] = None, |
43 | 49 | **kwargs) -> None: |
44 | | - super().__init__("Blast", data_footprint, num_tasks) |
45 | | - self.exclude_graphs = exclude_graphs |
46 | | - |
47 | | - def generate_nx_graph(self) -> nx.DiGraph: |
48 | | - summary_path = this_dir.joinpath("microstructures", "summary.json") |
49 | | - summary = json.loads(summary_path.read_text()) |
50 | | - |
51 | | - metric_path = this_dir.joinpath("microstructures", "metric", "err.csv") |
52 | | - df = pd.read_csv(str(metric_path), index_col=0) |
53 | | - df = df.drop(self.exclude_graphs, axis=0, errors="ignore") |
54 | | - df = df.drop(self.exclude_graphs, axis=1, errors="ignore") |
55 | | - for col in df.columns: |
56 | | - df.loc[col, col] = np.nan |
57 | | - |
58 | | - reference_orders = [summary["base_graphs"][col]["order"] for col in df.columns] |
59 | | - idx = np.argmin([abs(self.num_tasks - ref_num_tasks) for ref_num_tasks in reference_orders]) |
60 | | - reference = df.columns[idx] |
61 | | - |
62 | | - base = df.index[df[reference].argmin()] |
63 | | - graph = duplicate(this_dir.joinpath("microstructures"), base, self.num_tasks) |
64 | | - return graph |
| 50 | + super().__init__("Blast", data_footprint, num_tasks, exclude_graphs, runtime_factor, input_file_size_factor, |
| 51 | + output_file_size_factor, logger, this_dir) |
65 | 52 |
|
66 | 53 | @classmethod |
67 | | - def from_num_tasks(cls, num_tasks: int, exclude_graphs: Set[str] = set()) -> 'BlastRecipe': |
| 54 | + def from_num_tasks(cls, |
| 55 | + num_tasks: int, |
| 56 | + exclude_graphs: Set[str] = set(), |
| 57 | + runtime_factor: Optional[float] = 1.0, |
| 58 | + input_file_size_factor: Optional[float] = 1.0, |
| 59 | + output_file_size_factor: Optional[float] = 1.0 |
| 60 | + ) -> 'BlastRecipe': |
68 | 61 | """ |
69 | 62 | Instantiate a Blast workflow recipe that will generate synthetic workflows up to |
70 | 63 | the total number of tasks provided. |
71 | 64 |
|
72 | 65 | :param num_tasks: The upper bound for the total number of tasks in the workflow (at least 3). |
73 | 66 | :type num_tasks: int |
| 67 | + :param exclude_graphs: |
| 68 | + :type exclude_graphs: Set |
| 69 | + :param runtime_factor: The factor of which tasks runtime will be increased/decreased. |
| 70 | + :type runtime_factor: float |
| 71 | + :param input_file_size_factor: The factor of which tasks input files size will be increased/decreased. |
| 72 | + :type input_file_size_factor: float |
| 73 | + :param output_file_size_factor: The factor of which tasks output files size will be increased/decreased. |
| 74 | + :type output_file_size_factor: float |
74 | 75 |
|
75 | 76 | :return: A Blast workflow recipe object that will generate synthetic workflows up |
76 | 77 | to the total number of tasks provided. |
77 | 78 | :rtype: BlastRecipe |
78 | 79 | """ |
79 | | - return BlastRecipe(num_tasks=num_tasks, exclude_graphs=exclude_graphs) |
80 | | - |
81 | | - def _load_base_graph(self) -> nx.DiGraph: |
82 | | - return pickle.loads(this_dir.joinpath("base_graph.pickle").read_bytes()) |
83 | | - |
84 | | - def _load_microstructures(self) -> Dict: |
85 | | - return json.loads(this_dir.joinpath("microstructures.json").read_text()) |
86 | | - |
87 | | - def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: |
88 | | - """Generate a synthetic workflow trace of a Blast workflow. |
89 | | -
|
90 | | - :param workflow_name: The workflow name |
91 | | - :type workflow_name: int |
92 | | -
|
93 | | - :return: A synthetic workflow trace object. |
94 | | - :rtype: Workflow |
95 | | - """ |
96 | | - workflow = Workflow(name=self.name + "-synthetic-trace" if not workflow_name else workflow_name, makespan=None) |
97 | | - graph = self.generate_nx_graph() |
98 | | - |
99 | | - task_names = {} |
100 | | - for node in graph.nodes: |
101 | | - if node in ["SRC", "DST"]: |
102 | | - continue |
103 | | - node_type = graph.nodes[node]["type"] |
104 | | - task_name = self._generate_task_name(node_type) |
105 | | - task = self._generate_task(node_type, task_name) |
106 | | - workflow.add_node(task_name, task=task) |
107 | | - |
108 | | - task_names[node] = task_name |
109 | | - |
110 | | - for (src, dst) in graph.edges: |
111 | | - if src in ["SRC", "DST"] or dst in ["SRC", "DST"]: |
112 | | - continue |
113 | | - workflow.add_edge(task_names[src], task_names[dst]) |
114 | | - |
115 | | - workflow.nxgraph = graph |
116 | | - self.workflows.append(workflow) |
117 | | - return workflow |
| 80 | + return BlastRecipe(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor, |
| 81 | + input_file_size_factor=input_file_size_factor, |
| 82 | + output_file_size_factor=output_file_size_factor) |
118 | 83 |
|
119 | 84 | def _workflow_recipe(self) -> Dict: |
120 | 85 | """ |
|
0 commit comments