
Commit 3d8647f

Merge branch 'wfperf' into feature/wfchef
2 parents 95b7b24 + 5623be4 commit 3d8647f

1 file changed

Lines changed: 198 additions & 0 deletions

@@ -0,0 +1,198 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020-2021 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import json
import pickle
import random
import pathlib

import pandas as pd
import networkx as nx
import numpy as np

from enum import Enum
from logging import Logger
from typing import Any, Dict, List, Optional, Set, Union
from wfcommons.common.workflow import Workflow
from wfcommons.wfchef.duplicate import duplicate
from wfcommons.wfgen.abstract_recipe import WorkflowRecipe

class BaseMethod(Enum):
    ERROR_TABLE = 0
    SMALLEST = 1
    BIGGEST = 2
    RANDOM = 3


this_dir = pathlib.Path(__file__).resolve().parent

class WfChefWorkflowRecipe(WorkflowRecipe):
    """An abstract class of workflow recipes for creating synthetic workflow instances.

    :param name: The workflow recipe name.
    :type name: str
    :param data_footprint: The upper bound for the workflow total data footprint (in bytes).
    :type data_footprint: int
    :param num_tasks: The upper bound for the total number of tasks in the workflow.
    :type num_tasks: int
    :param exclude_graphs: Names of base graphs to exclude when choosing the workflow structure.
    :type exclude_graphs: Set[str]
    :param runtime_factor: The factor by which task runtimes will be increased/decreased.
    :type runtime_factor: float
    :param input_file_size_factor: The factor by which task input file sizes will be increased/decreased.
    :type input_file_size_factor: float
    :param output_file_size_factor: The factor by which task output file sizes will be increased/decreased.
    :type output_file_size_factor: float
    :param logger: The logger where to log information/warnings or errors (optional).
    :type logger: Logger
    :param this_dir: The directory containing the recipe's microstructure data files.
    :type this_dir: Union[str, pathlib.Path]
    :param base_method: The strategy used to pick the base graph (see :class:`BaseMethod`).
    :type base_method: BaseMethod
    """

    def __init__(self, name: str,
                 data_footprint: Optional[int],
                 num_tasks: Optional[int],
                 exclude_graphs: Set[str] = set(),
                 runtime_factor: Optional[float] = 1.0,
                 input_file_size_factor: Optional[float] = 1.0,
                 output_file_size_factor: Optional[float] = 1.0,
                 logger: Optional[Logger] = None,
                 this_dir: Optional[Union[str, pathlib.Path]] = None,
                 base_method: Optional[BaseMethod] = BaseMethod.ERROR_TABLE) -> None:
        """Create an object of the workflow recipe."""
        super().__init__(
            name=name,
            data_footprint=data_footprint,
            num_tasks=num_tasks,
            runtime_factor=runtime_factor,
            input_file_size_factor=input_file_size_factor,
            output_file_size_factor=output_file_size_factor,
            logger=logger
        )

        self.exclude_graphs = exclude_graphs
        self.workflows: List[Workflow] = []
        # Resolve with strict=True so a missing recipe directory fails fast here
        # rather than when the microstructure files are first read.
        self.this_dir = pathlib.Path(this_dir).resolve(strict=True)
        self.base_method = base_method

    def _workflow_recipe(self) -> Dict[str, Any]:
        """Recipe for generating synthetic instances for a workflow. Recipes can be
        generated by using the :class:`~wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer`.

        :return: A recipe in the form of a dictionary in which keys are task prefixes.
        :rtype: Dict[str, Any]
        """
        return json.loads(self.this_dir.joinpath("task_type_stats.json").read_text())

    @classmethod
    def from_num_tasks(cls,
                       num_tasks: int,
                       exclude_graphs: Set[str] = set(),
                       runtime_factor: Optional[float] = 1.0,
                       input_file_size_factor: Optional[float] = 1.0,
                       output_file_size_factor: Optional[float] = 1.0
                       ) -> 'WfChefWorkflowRecipe':
        """
        Instantiate a workflow recipe that will generate synthetic workflows up to the
        total number of tasks provided.

        :param num_tasks: The upper bound for the total number of tasks in the workflow.
        :type num_tasks: int
        :param exclude_graphs: Names of base graphs to exclude when choosing the workflow structure.
        :type exclude_graphs: Set[str]
        :param runtime_factor: The factor by which task runtimes will be increased/decreased.
        :type runtime_factor: float
        :param input_file_size_factor: The factor by which task input file sizes will be increased/decreased.
        :type input_file_size_factor: float
        :param output_file_size_factor: The factor by which task output file sizes will be increased/decreased.
        :type output_file_size_factor: float

        :return: A workflow recipe object that will generate synthetic workflows up to
            the total number of tasks provided.
        :rtype: WfChefWorkflowRecipe
        """
        # Note: concrete WfChef-generated recipe subclasses are expected to
        # provide defaults for name and data_footprint, which are not set here.
        return cls(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor,
                   input_file_size_factor=input_file_size_factor,
                   output_file_size_factor=output_file_size_factor)

    def generate_nx_graph(self) -> nx.DiGraph:
        summary_path = self.this_dir.joinpath("microstructures", "summary.json")
        summary = json.loads(summary_path.read_text())

        metric_path = self.this_dir.joinpath("microstructures", "metric", "err.csv")
        df = pd.read_csv(str(metric_path), index_col=0)
        df = df.drop(self.exclude_graphs, axis=0, errors="ignore")
        df = df.drop(self.exclude_graphs, axis=1, errors="ignore")
        for col in df.columns:
            df.loc[col, col] = np.nan

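        # err.csv is read as a square error table indexed by base-graph name;
        # the diagonal is blanked above so a graph is never scored against
        # itself. The reference below is the base graph whose order (task
        # count) is closest to the requested num_tasks.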
reference_orders = [summary["base_graphs"][col]["order"] for col in df.columns]
136+
idx = np.argmin([abs(self.num_tasks - ref_num_tasks) for ref_num_tasks in reference_orders])
137+
reference = df.columns[idx]
138+
139+
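        # Select the base graph according to the configured strategy:
        # ERROR_TABLE picks the graph with the lowest recorded error against
        # the reference; SMALLEST/BIGGEST pick by order (task count); RANDOM
        # picks any eligible graph no larger than num_tasks.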
        if self.base_method == BaseMethod.ERROR_TABLE:
            base = df.index[df[reference].argmin()]
        elif self.base_method == BaseMethod.SMALLEST:
            base = min(
                [k for k in summary["base_graphs"].keys() if k not in self.exclude_graphs],
                key=lambda k: summary["base_graphs"][k]["order"]
            )
        elif self.base_method == BaseMethod.BIGGEST:
            base = max(
                [k for k in summary["base_graphs"].keys()
                 if summary["base_graphs"][k]["order"] <= self.num_tasks and k not in self.exclude_graphs],
                key=lambda k: summary["base_graphs"][k]["order"]
            )
        else:
            base = random.choice(
                [k for k in summary["base_graphs"].keys()
                 if summary["base_graphs"][k]["order"] <= self.num_tasks and k not in self.exclude_graphs]
            )

        graph = duplicate(self.this_dir.joinpath("microstructures"), base, self.num_tasks)
        return graph

    def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
        """Generate a synthetic workflow instance.

        :param workflow_name: The workflow name.
        :type workflow_name: str

        :return: A synthetic workflow instance object.
        :rtype: Workflow
        """
        workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name,
                            makespan=None)
        graph = self.generate_nx_graph()

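        # "SRC" and "DST" appear to be the synthetic source/sink anchor nodes
        # that WfChef adds to the graph; they are skipped in both loops below
        # so they never become real tasks or edges.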
        task_names = {}
        for node in graph.nodes:
            if node in ["SRC", "DST"]:
                continue
            node_type = graph.nodes[node]["type"]
            task_name = self._generate_task_name(node_type)
            task = self._generate_task(node_type, task_name)
            workflow.add_node(task_name, task=task)

            task_names[node] = task_name

        for (src, dst) in graph.edges:
            if src in ["SRC", "DST"] or dst in ["SRC", "DST"]:
                continue
            workflow.add_edge(task_names[src], task_names[dst])

        workflow.nxgraph = graph
        self.workflows.append(workflow)
        return workflow

    def _load_base_graph(self) -> nx.DiGraph:
        return pickle.loads(self.this_dir.joinpath("base_graph.pickle").read_bytes())

    def _load_microstructures(self) -> Dict:
        return json.loads(self.this_dir.joinpath("microstructures.json").read_text())
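
For orientation, a minimal usage sketch (not part of this diff). It assumes a concrete WfChef-generated recipe subclass that supplies defaults for name and data_footprint and ships its microstructure files; the class name BlastRecipe and its import path are illustrative and may differ across WfCommons releases.

# Hypothetical usage sketch; BlastRecipe and its import path are assumptions.
from wfcommons.wfchef.recipes import BlastRecipe

# Target roughly 500 tasks; the recipe grows a base graph up to this bound.
recipe = BlastRecipe.from_num_tasks(num_tasks=500)
workflow = recipe.build_workflow()

# The result is a networkx-based Workflow graph, so standard inspection works.
print(len(workflow.nodes))

By default the base graph is chosen via the error table (BaseMethod.ERROR_TABLE); constructing the recipe with base_method=BaseMethod.BIGGEST instead grows the largest base graph whose order does not exceed num_tasks.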
