Skip to content

Commit 93c21d0

Browse files
authored
Merge pull request #20 from wfcommons/feature/wfchef
Feature/wfchef
2 parents 7e8b103 + d97883d commit 93c21d0

18 files changed

Lines changed: 1028 additions & 598 deletions

File tree

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@
6060
python_requires='>=3.6',
6161
entry_points={
6262
'console_scripts': [
63-
'wfchef=wfcommons.wfchef.chef:main',
63+
'wfchef=wfcommons.wfchef.chef:main'
64+
6465
],
6566
'workflow_recipes': [
6667
'epigenomics_recipe = wfcommons.wfchef.recipes:EpigenomicsRecipe',

wfcommons-schema.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"$schema": "http://json-schema.org/schema#", "type": "object", "title": "WfFormat", "properties": {"name": {"description": "Workflow instance name", "type": "string", "minLength": 1}, "description": {"description": "General description of the instance", "type": "string", "minLength": 1}, "createdAt": {"description": "Instance creation timestamp", "type": "string", "minLength": 1, "format": "date-time"}, "schemaVersion": {"description": "Version of the schema used", "type": "string", "enum": ["1.0", "1.1"]}, "wms": {"type": "object", "properties": {"name": {"description": "WMS name", "type": "string", "minLength": 1}, "version": {"description": "WMS version", "type": "string", "minLength": 1}, "url": {"description": "WMS url", "type": "string", "minLength": 1, "format": "uri"}}, "required": ["name", "version"]}, "author": {"type": "object", "properties": {"name": {"description": "Author name", "type": "string", "minLength": 1}, "email": {"description": "Author email", "type": "string", "minLength": 1, "format": "email"}, "institution": {"description": "Author institution", "type": "string", "minLength": 1}, "country": {"description": "Author country", "type": "string", "minLength": 1}}, "required": ["name", "email"]}, "workflow": {"type": "object", "properties": {"makespan": {"description": "Workflow makespan in seconds", "type": "number"}, "executedAt": {"description": "Workflow start execution timestamp", "type": "string", "minLength": 1}, "machines": {"description": "Sets of machines used for workflow jobs", "type": "array", "minItems": 1, "items": {"description": "Description of the machine (node) that ran the job", "type": "object", "properties": {"system": {"description": "Machine system", "type": "string", "enum": ["linux", "macos", "windows"]}, "architecture": {"description": "Machine architecture", "type": "string", "minLength": 1}, "nodeName": {"description": "Machine node name", "type": "string", "format": "hostname", "minLength": 1}, "release": 
{"description": "Machine release", "type": "string", "minLength": 1}, "memory": {"description": "Total machine's RAM memory in KB", "type": "integer", "minimum": 1}, "cpu": {"description": "Machine's CPU information", "type": "object", "properties": {"count": {"description": "Number of CPU cores", "type": "integer", "minimum": 1}, "speed": {"description": "CPU speed in MHz", "type": "integer", "minimum": 1}, "vendor": {"description": "CPU vendor", "type": "string", "minLength": 1}}}}, "required": ["nodeName"]}}, "jobs": {"description": "Sets of workflow jobs", "type": "array", "minItems": 1, "items": {"type": "object", "properties": {"name": {"description": "Job name", "type": "string", "minLength": 1}, "type": {"description": "Job type (whether it is a compute or an auxiliary job)", "type": "string", "enum": ["compute", "transfer", "auxiliary"]}, "arguments": {"description": "Sets of job arguments", "type": "array", "items": {"description": "A job argument", "type": "string", "minLength": 1}}, "parents": {"description": "Sets of parent jobs", "type": "array", "items": {"type": "string", "description": "Id of the parent job", "minLength": 1, "pattern": "^[0-9a-zA-Z-_]*$"}}, "files": {"description": "Sets of input/output data", "type": "array", "items": {"type": "object", "properties": {"name": {"description": "A human-readable name for the file", "type": "string", "minLength": 1}, "size": {"description": "File size in KB", "type": "integer", "minimum": 0}, "link": {"description": "Whether it is an input or output data", "type": "string", "enum": ["input", "output"]}}, "required": ["name", "size", "link"]}}, "runtime": {"description": "Job runtime in seconds", "type": "number"}, "cores": {"description": "Number of cores required by the job", "type": "integer", "minimum": 1}, "avgCPU": {"description": "Average CPU utilization in %", "type": "number"}, "bytesRead": {"description": "Total bytes read in KB", "type": "number"}, "bytesWritten": {"description": "Total 
bytes written in KB", "type": "number"}, "memory": {"description": "Memory (resident set) size of the process in KB", "type": "number"}, "energy": {"description": "Total energy consumption in kWh", "type": "number"}, "avgPower": {"description": "Average power consumption in W", "type": "number"}, "priority": {"description": "Job priority", "type": "number"}, "machine": {"description": "Machine name used", "type": "string", "minLength": 1}}, "required": ["name", "type", "runtime"]}}}, "required": ["makespan", "executedAt", "jobs"]}}, "required": ["name", "schemaVersion", "workflow"]}

wfcommons/wfchef/chef.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11+
import pathlib
1112
import argparse
13+
from email.mime import base
1214
import json
13-
import math
15+
import traceback
16+
from typing import Dict, Optional, Union
17+
import argparse
1418
import networkx as nx
1519
import numpy as np
1620
import pandas as pd
@@ -19,16 +23,15 @@
1923
import pkg_resources
2024
import subprocess
2125
import traceback
22-
2326
from typing import Dict, Optional, Union
2427
from stringcase import capitalcase
25-
2628
from .duplicate import duplicate, NoMicrostructuresError
2729
from .find_microstructures import save_microstructures
2830
from .utils import create_graph
2931
from ..wfgen.abstract_recipe import WorkflowRecipe
3032
from ..wfinstances.instance import Instance
3133
from ..wfinstances.instance_analyzer import InstanceAnalyzer
34+
import math
3235

3336
this_dir = pathlib.Path(__file__).resolve().parent
3437
skeleton_path = this_dir.joinpath("skeletons")
@@ -170,12 +173,13 @@ def analyzer_summary(path_to_instances: pathlib.Path) -> Dict:
170173

171174
return stats_dict
172175

176+
def get_recipe(recipe: str) -> Optional[type]:
    """
    Look up an installed workflow recipe by the attribute name of its entry point.

    Iterates over the entry points registered under the ``workflow_recipes`` group and
    loads the first one whose leading attribute equals ``recipe``.

    :param recipe: attribute name the entry point refers to (e.g., ``EpigenomicsRecipe``).
    :type recipe: str

    :return: the object loaded from the matching entry point (presumably a recipe
             class -- confirm against the registered entry points), or ``None`` when
             no installed entry point matches.
    :rtype: Optional[type]
    """
    for entry_point in pkg_resources.iter_entry_points('workflow_recipes'):
        # Entry points declared as ``name = module`` carry an empty ``attrs`` tuple;
        # skip them instead of raising IndexError before later entries are inspected.
        if entry_point.attrs and entry_point.attrs[0] == recipe:
            return entry_point.load()
    return None
173181

174-
def ls_recipe():
175-
"""
176-
Inspired by UNIX `ls` command, it lists the recipes already installed into the system and
177-
how to import it to use.
178-
"""
182+
def get_recipes() -> pd.DataFrame:
179183
rows = []
180184
for entry_point in pkg_resources.iter_entry_points('workflow_recipes'):
181185
try:
@@ -185,8 +189,14 @@ def ls_recipe():
185189
except Exception as e:
186190
traceback.print_exc()
187191
print(f"Could not load {entry_point.module_name}")
188-
df = pd.DataFrame(rows, columns=["name", "module", "import command"])
189-
print(df.to_string(index=None))
192+
return pd.DataFrame(rows, columns=["name", "module", "import command"])
193+
194+
def ls_recipe():
    """
    Inspired by the UNIX `ls` command, list the recipes already installed in the system
    and how to import them.

    The table returned by ``get_recipes()`` is printed to standard output.
    """
    # Render with to_string() rather than the default DataFrame repr: the repr adds a
    # meaningless row index and may elide wide cells/columns depending on the pandas
    # display options, which would hide part of the listing.
    print(get_recipes().to_string(index=False))
190200

191201

192202
def uninstall_recipe(module_name: str):
@@ -209,7 +219,7 @@ def create_recipe(path_to_instances: Union[str, pathlib.Path],
209219
wf_name: str,
210220
cutoff: int = 4000,
211221
verbose: bool = False,
212-
runs: int = 1) -> WorkflowRecipe:
222+
runs: int = 1) -> "WorkflowRecipe":
213223
"""
214224
Creates a recipe for a workflow application by automatically replacing custom information
215225
from the recipe skeleton.
@@ -218,7 +228,7 @@ def create_recipe(path_to_instances: Union[str, pathlib.Path],
218228
:type path_to_instances: str or pathlib.Path
219229
:param savedir: path to save the recipe.
220230
:type savedir: pathlib.Path
221-
:param wf_name: name of the workflow apllication.
231+
:param wf_name: name of the workflow application.
222232
:type wf_name: str
223233
:param cutoff: when set, only consider instances of smaller or equal sizes.
224234
:type cutoff: int
@@ -242,8 +252,8 @@ def create_recipe(path_to_instances: Union[str, pathlib.Path],
242252
err_savepath = microstructures_path.joinpath("metric", "err.csv")
243253
err_savepath.parent.mkdir(exist_ok=True, parents=True)
244254
df = find_err(microstructures_path, runs=runs)
245-
246255
err_savepath.write_text(df.to_csv())
256+
247257
# Recipe
248258
with skeleton_path.joinpath("recipe.py").open() as fp:
249259
skeleton_str = fp.read()

wfcommons/wfchef/duplicate.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,10 @@
1212
import json
1313
import pickle
1414
import networkx as nx
15-
from typing import Set, Optional, List, Union, Dict
15+
from typing import Set, List, Union, Dict
1616
from uuid import uuid4
17-
1817
import numpy as np
19-
from .utils import draw
2018
import random
21-
import argparse
22-
import pandas as pd
23-
from functools import partial
2419

2520
this_dir = pathlib.Path(__file__).resolve().parent
2621

@@ -73,23 +68,24 @@ def duplicate(path: pathlib.Path,
7368
:param path: path to the summary JSON file.
7469
:type path: pathlib.Path.
7570
:param base: name (for samples available in WfCommons) or path to the specific
76-
graph to be used as base (if not set WfChef chooses the best fitting one).
71+
graph to be used as base (if not set WfChef chooses the best fitting one).
7772
:type base: str or pathlib.Path.
7873
:param num_nodes: total amount of nodes desired in the synthetic instance.
7974
:type num_nodes: int.
8075
81-
8276
:return: graph with the desired number of tasks.
8377
:rtype: networkX DiGraph.
8478
"""
8579
summary = json.loads(path.joinpath("summary.json").read_text())
80+
8681
if base:
8782
base_path = pathlib.Path(base)
8883
if not base_path.is_absolute():
8984
base_path = path.joinpath(base_path)
9085
else:
9186
base_path = path.joinpath(min(summary["base_graphs"].keys(), key=lambda k: summary["base_graphs"][k]["order"]))
92-
87+
88+
9389
graph = pickle.loads(base_path.joinpath("base_graph.pickle").read_bytes())
9490
if num_nodes < graph.order():
9591
raise ValueError(

wfcommons/wfchef/recipes/blast/recipe.py

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,16 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11-
import json
1211
import pathlib
13-
1412
from logging import Logger
15-
from typing import Dict, Optional, Set
16-
17-
from wfcommons.wfgen.abstract_recipe import WorkflowRecipe
13+
from typing import Optional, Set
14+
import pathlib
15+
from wfcommons.wfchef.wfchef_abstract_recipe import BaseMethod, WfChefWorkflowRecipe
1816

1917
this_dir = pathlib.Path(__file__).resolve().parent
2018

2119

22-
class BlastRecipe(WorkflowRecipe):
20+
class BlastRecipe(WfChefWorkflowRecipe):
2321
"""A Blast workflow recipe class for creating synthetic workflow instances.
2422
2523
:param data_footprint: The upper bound for the workflow total data footprint (in bytes).
@@ -46,47 +44,18 @@ def __init__(self,
4644
input_file_size_factor: Optional[float] = 1.0,
4745
output_file_size_factor: Optional[float] = 1.0,
4846
logger: Optional[Logger] = None,
47+
base_method: BaseMethod = BaseMethod.ERROR_TABLE,
4948
**kwargs) -> None:
50-
super().__init__("Blast", data_footprint, num_tasks, exclude_graphs, runtime_factor, input_file_size_factor,
51-
output_file_size_factor, logger, this_dir)
52-
53-
@classmethod
54-
def from_num_tasks(cls,
55-
num_tasks: int,
56-
exclude_graphs: Set[str] = set(),
57-
runtime_factor: Optional[float] = 1.0,
58-
input_file_size_factor: Optional[float] = 1.0,
59-
output_file_size_factor: Optional[float] = 1.0
60-
) -> 'BlastRecipe':
61-
"""
62-
Instantiate a Blast workflow recipe that will generate synthetic workflows up to
63-
the total number of tasks provided.
64-
65-
:param num_tasks: The upper bound for the total number of tasks in the workflow (at least 3).
66-
:type num_tasks: int
67-
:param exclude_graphs:
68-
:type exclude_graphs: Set
69-
:param runtime_factor: The factor of which tasks runtime will be increased/decreased.
70-
:type runtime_factor: float
71-
:param input_file_size_factor: The factor of which tasks input files size will be increased/decreased.
72-
:type input_file_size_factor: float
73-
:param output_file_size_factor: The factor of which tasks output files size will be increased/decreased.
74-
:type output_file_size_factor: float
75-
76-
:return: A Blast workflow recipe object that will generate synthetic workflows up
77-
to the total number of tasks provided.
78-
:rtype: BlastRecipe
79-
"""
80-
return BlastRecipe(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor,
81-
input_file_size_factor=input_file_size_factor,
82-
output_file_size_factor=output_file_size_factor)
83-
84-
def _workflow_recipe(self) -> Dict:
85-
"""
86-
Recipe for generating synthetic instances of the Blast workflow. Recipes can be
87-
generated by using the :class:`~wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer`.
88-
89-
:return: A recipe in the form of a dictionary in which keys are task prefixes.
90-
:rtype: Dict[str, Any]
91-
"""
92-
return json.loads(this_dir.joinpath("task_type_stats.json").read_text())
49+
super().__init__(
50+
name="Blast",
51+
data_footprint=data_footprint,
52+
num_tasks=num_tasks,
53+
exclude_graphs=exclude_graphs,
54+
runtime_factor=runtime_factor,
55+
input_file_size_factor=input_file_size_factor,
56+
output_file_size_factor=output_file_size_factor,
57+
logger=logger,
58+
this_dir=this_dir,
59+
base_method=base_method,
60+
**kwargs
61+
)

wfcommons/wfchef/recipes/bwa/recipe.py

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,16 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11-
import json
1211
import pathlib
13-
1412
from logging import Logger
15-
from typing import Dict, Optional, Set
16-
17-
from wfcommons.wfgen.abstract_recipe import WorkflowRecipe
13+
from typing import Optional, Set
14+
import pathlib
15+
from wfcommons.wfchef.wfchef_abstract_recipe import BaseMethod, WfChefWorkflowRecipe
1816

1917
this_dir = pathlib.Path(__file__).resolve().parent
2018

2119

22-
class BwaRecipe(WorkflowRecipe):
20+
class BwaRecipe(WfChefWorkflowRecipe):
2321
"""A Bwa workflow recipe class for creating synthetic workflow instances.
2422
2523
:param data_footprint: The upper bound for the workflow total data footprint (in bytes).
@@ -46,47 +44,18 @@ def __init__(self,
4644
input_file_size_factor: Optional[float] = 1.0,
4745
output_file_size_factor: Optional[float] = 1.0,
4846
logger: Optional[Logger] = None,
47+
base_method: BaseMethod = BaseMethod.ERROR_TABLE,
4948
**kwargs) -> None:
50-
super().__init__("Bwa", data_footprint, num_tasks, exclude_graphs, runtime_factor, input_file_size_factor,
51-
output_file_size_factor, logger, this_dir)
52-
53-
@classmethod
54-
def from_num_tasks(cls,
55-
num_tasks: int,
56-
exclude_graphs: Set[str] = set(),
57-
runtime_factor: Optional[float] = 1.0,
58-
input_file_size_factor: Optional[float] = 1.0,
59-
output_file_size_factor: Optional[float] = 1.0
60-
) -> 'BwaRecipe':
61-
"""
62-
Instantiate a Bwa workflow recipe that will generate synthetic workflows up to
63-
the total number of tasks provided.
64-
65-
:param num_tasks: The upper bound for the total number of tasks in the workflow (at least 3).
66-
:type num_tasks: int
67-
:param exclude_graphs:
68-
:type exclude_graphs: Set
69-
:param runtime_factor: The factor of which tasks runtime will be increased/decreased.
70-
:type runtime_factor: float
71-
:param input_file_size_factor: The factor of which tasks input files size will be increased/decreased.
72-
:type input_file_size_factor: float
73-
:param output_file_size_factor: The factor of which tasks output files size will be increased/decreased.
74-
:type output_file_size_factor: float
75-
76-
:return: A Bwa workflow recipe object that will generate synthetic workflows up
77-
to the total number of tasks provided.
78-
:rtype: BwaRecipe
79-
"""
80-
return BwaRecipe(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor,
81-
input_file_size_factor=input_file_size_factor,
82-
output_file_size_factor=output_file_size_factor)
83-
84-
def _workflow_recipe(self) -> Dict:
85-
"""
86-
Recipe for generating synthetic instances of the Bwa workflow. Recipes can be
87-
generated by using the :class:`~wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer`.
88-
89-
:return: A recipe in the form of a dictionary in which keys are task prefixes.
90-
:rtype: Dict[str, Any]
91-
"""
92-
return json.loads(this_dir.joinpath("task_type_stats.json").read_text())
49+
super().__init__(
50+
name="Bwa",
51+
data_footprint=data_footprint,
52+
num_tasks=num_tasks,
53+
exclude_graphs=exclude_graphs,
54+
runtime_factor=runtime_factor,
55+
input_file_size_factor=input_file_size_factor,
56+
output_file_size_factor=output_file_size_factor,
57+
logger=logger,
58+
this_dir=this_dir,
59+
base_method=base_method,
60+
**kwargs
61+
)

0 commit comments

Comments
 (0)