Skip to content

Commit 3895e9c

Browse files
Merge branch 'wfperf' of github.com:wfcommons/wfcommons into wfperf
2 parents a1e5b5c + 09acb70 commit 3895e9c

12 files changed

Lines changed: 137 additions & 55 deletions

File tree

README.md

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,11 @@ When citing WfCommons, please use the following paper. You should also actually
7575
that paper, as it provides a recent and general overview on the framework.
7676

7777
```
78-
@inproceedings{ferreiradasilva2020works,
79-
title = {WorkflowHub: Community Framework for Enabling Scientific Workflow Research and Development},
80-
author = {Ferreira da Silva, Rafael and Pottier, Lo\"ic and Coleman, Tain\~a and Deelman, Ewa and Casanova, Henri},
81-
booktitle = {2020 IEEE/ACM Workflows in Support of Large-Scale Science (WORKS)},
82-
year = {2020},
83-
pages = {49--56},
84-
doi = {10.1109/WORKS51914.2020.00012}
78+
@article{coleman2021wfcommons,
79+
title={WfCommons: A Framework for Enabling Scientific Workflow Research and Development},
80+
author={Coleman, Tain{\~a} and Casanova, Henri and Pottier, Lo\"ic and Kaushik, Manav and Deelman, Ewa and Ferreira da Silva, Rafael},
81+
journal={arXiv preprint arXiv:2105.14352},
82+
year={2021}
8583
}
8684
```
8785

docs/source/generating_workflows_recipe.rst

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,8 @@ package for an application in WfCommons: ::
7979

8080
The snippet below shows an example of how to import the recipes: ::
8181

82-
# creating an Epigenomics workflow recipe
8382
from wfcommons.wfchef.recipes import EpigenomicsRecipe
8483

85-
8684
To check which recipes are installed in a system and how to import them use: ::
8785
8886
$ wfchef ls

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
license='GPLv3',
2323
author='WfCommons team',
2424
author_email='support@wfcommons.org',
25-
description='Community Framework for Enabling Scientific Workflow Research and Education',
25+
description='A Framework for Enabling Scientific Workflow Research and Education',
2626
long_description=long_description,
2727
long_description_content_type='text/markdown',
2828
url='https://github.com/wfcommons/wfcommons',

wfcommons/common/task.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,15 @@ class Task:
3535
:param runtime: Task runtime in seconds.
3636
:type runtime: float
3737
:param cores: Number of cores required by the task.
38-
:type cores: int
38+
:type cores: float
39+
:param task_id: Job unique ID (e.g., ID0000001).
40+
:type task_id: str
41+
:param category: Job category (can be used, for example, to define jobs that use the same program).
42+
:type category: str
3943
:param machine: Machine on which is the task has been executed.
4044
:type machine: Machine
45+
:param program: Program name.
46+
:type program: str
4147
:param args: List of task arguments.
4248
:type args: List[str]
4349
:param avg_cpu: Average CPU utilization in %.
@@ -64,8 +70,11 @@ def __init__(self,
6470
name: str,
6571
task_type: TaskType,
6672
runtime: float,
67-
cores: int,
73+
cores: float = 1.0,
74+
task_id: Optional[str] = None,
75+
category: Optional[str] = None,
6876
machine: Optional[Machine] = None,
77+
program: Optional[str] = None,
6978
args: List[str] = [],
7079
avg_cpu: Optional[float] = None,
7180
bytes_read: Optional[int] = None,
@@ -82,7 +91,10 @@ def __init__(self,
8291
self.name: str = name
8392
self.type: TaskType = task_type
8493
self.runtime: float = runtime
85-
self.cores: Optional[int] = cores
94+
self.cores: Optional[float] = cores
95+
self.task_id: Optional[str] = task_id
96+
self.category: Optional[str] = category
97+
self.program: Optional[str] = program
8698
self.args: List[str] = args
8799
self.avg_cpu: Optional[float] = avg_cpu
88100
self.bytes_read: Optional[int] = bytes_read
@@ -112,12 +124,17 @@ def as_dict(self) -> Dict:
112124
'name': self.name,
113125
'type': self.type.value,
114126
'runtime': self.runtime,
127+
'command': {},
115128
'parents': [],
116129
'children': [],
117130
'files': task_files,
118131
}
119132
if self.cores:
120133
task_obj['cores'] = self.cores
134+
if self.task_id:
135+
task_obj['id'] = self.task_id
136+
if self.category:
137+
task_obj['category'] = self.category
121138
if self.avg_cpu:
122139
task_obj['avgCPU'] = self.avg_cpu
123140
if self.bytes_read:
@@ -132,8 +149,10 @@ def as_dict(self) -> Dict:
132149
task_obj['avgPower'] = self.avg_power
133150
if self.priority:
134151
task_obj['priority'] = self.priority
152+
if self.program:
153+
task_obj['command']['program'] = self.program
135154
if self.args:
136-
task_obj['arguments'] = self.args
155+
task_obj['command']['arguments'] = self.args
137156
if self.machine:
138157
task_obj['machine'] = self.machine.name
139158

wfcommons/common/workflow.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
from ..common.task import Task
1818
from ..version import __version__
1919

20-
from ..wfchef.utils import annotate, create_graph
20+
from ..wfchef.utils import create_graph
2121
import tempfile
2222

23+
2324
class Workflow(nx.DiGraph):
2425
"""
2526
Representation of a workflow. The workflow representation is an extension of the
@@ -53,7 +54,7 @@ def __init__(self,
5354
"""Create an object of a workflow representation."""
5455
self.description = description if description else 'Instance generated with WfCommons - https://wfcommons.org'
5556
self.created_at = str(datetime.utcnow().isoformat())
56-
self.schema_version = '1.0'
57+
self.schema_version = '1.2'
5758
self.wms_name = 'WfCommons' if not wms_name else wms_name
5859
self.wms_version = str(__version__) if not wms_version else wms_version
5960
self.wms_url = 'https://docs.wfcommons.org/en/v{}/'.format(__version__) if not wms_url else wms_url
@@ -137,4 +138,4 @@ def write_dot(self, dot_filename: str = None) -> None:
137138
def to_nx_digraph(self) -> nx.DiGraph:
138139
with tempfile.NamedTemporaryFile() as temp:
139140
self.write_json(temp.name)
140-
return create_graph(temp.name)
141+
return create_graph(temp.name)

wfcommons/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11-
__version__ = "0.6-dev"
11+
__version__ = "0.7-dev"

wfcommons/wfchef/skeletons/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,3 @@
77
# it under the terms of the GNU General Public License as published by
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
10-
11-

wfcommons/wfgen/abstract_recipe.py

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@ def __init__(self, name: str,
7575
self.input_file_size_factor = input_file_size_factor
7676
self.output_file_size_factor = output_file_size_factor
7777
self.workflows: List[Workflow] = []
78+
self.tasks_map = {}
7879
self.tasks_files: Dict[str, List[File]] = {}
80+
self.tasks_files_names: Dict[str, List[str]] = {}
7981
self.task_id_counter = 1
8082
self.this_dir = this_dir
83+
self.tasks_children = {}
84+
self.tasks_parents = {}
8185

8286
@abstractmethod
8387
def _workflow_recipe(self) -> Dict[str, Any]:
@@ -142,7 +146,8 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
142146
:return: A synthetic workflow instance object.
143147
:rtype: Workflow
144148
"""
145-
workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name, makespan=None)
149+
workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name,
150+
makespan=0)
146151
graph = self.generate_nx_graph()
147152

148153
task_names = {}
@@ -156,11 +161,30 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
156161

157162
task_names[node] = task_name
158163

164+
# tasks dependencies
159165
for (src, dst) in graph.edges:
160166
if src in ["SRC", "DST"] or dst in ["SRC", "DST"]:
161167
continue
162168
workflow.add_edge(task_names[src], task_names[dst])
163169

170+
if task_names[src] not in self.tasks_children:
171+
self.tasks_children[task_names[src]] = []
172+
if task_names[dst] not in self.tasks_parents:
173+
self.tasks_parents[task_names[dst]] = []
174+
175+
self.tasks_children[task_names[src]].append(task_names[dst])
176+
self.tasks_parents[task_names[dst]].append(task_names[src])
177+
178+
# find leaf tasks
179+
leaf_tasks = []
180+
for node_name in workflow.nodes:
181+
task: Task = workflow.nodes[node_name]['task']
182+
if task.name not in self.tasks_children:
183+
leaf_tasks.append(task)
184+
185+
for task in leaf_tasks:
186+
self._generate_task_files(task)
187+
164188
workflow.nxgraph = graph
165189
self.workflows.append(workflow)
166190
return workflow
@@ -171,21 +195,13 @@ def _load_base_graph(self) -> nx.DiGraph:
171195
def _load_microstructures(self) -> Dict:
172196
return json.loads(self.this_dir.joinpath("microstructures.json").read_text())
173197

174-
def _generate_task(self, task_name: str,
175-
task_id: str,
176-
input_files: Optional[List[File]] = None,
177-
files_recipe: Optional[Dict[FileLink, Dict[str, int]]] = None
178-
) -> Task:
198+
def _generate_task(self, task_name: str, task_id: str) -> Task:
179199
"""Generate a synthetic task.
180200
181201
:param task_name: task name.
182202
:type task_name: str
183203
:param task_id: task ID.
184204
:type task_id: str
185-
:param input_files: List of input files to be included.
186-
:type input_files: List[File]
187-
:param files_recipe: Recipe for generating task files.
188-
:type files_recipe: Dict[FileLink, Dict[str, int]]
189205
190206
:return: A task object.
191207
:rtype: task
@@ -197,22 +213,17 @@ def _generate_task(self, task_name: str,
197213
task_recipe['runtime']['min'],
198214
task_recipe['runtime']['max']), '.3f'))
199215

200-
# linking previous generated output files as input files
216+
# # linking previous generated output files as input files
201217
self.tasks_files[task_id] = []
202-
if input_files:
203-
for f in input_files:
204-
if f.link == FileLink.OUTPUT:
205-
self.tasks_files[task_id].append(File(name=f.name, size=f.size, link=FileLink.INPUT))
206-
207-
# generate additional in/output files
208-
self._generate_files(task_id, task_recipe['input'], FileLink.INPUT, files_recipe)
209-
self._generate_files(task_id, task_recipe['output'], FileLink.OUTPUT, files_recipe)
210-
211-
return Task(
218+
self.tasks_files_names[task_id] = []
219+
task = Task(
212220
name=task_id,
221+
task_id='0{}'.format(task_id.split('_0')[1]),
222+
category=task_name,
213223
task_type=TaskType.COMPUTE,
214224
runtime=runtime,
215225
machine=None,
226+
program=task_name,
216227
args=[],
217228
cores=1,
218229
avg_cpu=None,
@@ -222,8 +233,11 @@ def _generate_task(self, task_name: str,
222233
energy=None,
223234
avg_power=None,
224235
priority=None,
225-
files=self.tasks_files[task_id]
236+
files=[]
226237
)
238+
self.tasks_map[task_id] = task
239+
240+
return task
227241

228242
def _generate_task_name(self, prefix: str) -> str:
229243
"""Generate a task name from a prefix appended with an ID.
@@ -238,8 +252,40 @@ def _generate_task_name(self, prefix: str) -> str:
238252
self.task_id_counter += 1
239253
return task_name
240254

241-
def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink,
242-
files_recipe: Optional[Dict[FileLink, Dict[str, int]]] = None) -> None:
255+
def _generate_task_files(self, task: Task) -> List[File]:
256+
"""Generate input and output files for a task.
257+
258+
:param task: task object.
259+
:type task: Task
260+
261+
:return: List of files output files.
262+
:rtype: List[File]
263+
"""
264+
task_recipe = self._workflow_recipe()[task.category]
265+
266+
# generate output files
267+
output_files_list = self._generate_files(task.name, task_recipe['output'], FileLink.OUTPUT)
268+
task.files = self.tasks_files[task.name]
269+
270+
# obtain input files from parents
271+
input_files = []
272+
if task.name in self.tasks_parents.keys():
273+
for parent_task_name in self.tasks_parents[task.name]:
274+
input_files.extend(self._generate_task_files(self.tasks_map[parent_task_name]))
275+
276+
for input_file in input_files:
277+
if input_file.name not in self.tasks_files_names[task.name]:
278+
self.tasks_files[task.name].append(File(name=input_file.name,
279+
link=FileLink.INPUT,
280+
size=input_file.size))
281+
self.tasks_files_names[task.name].append(input_file.name)
282+
283+
# generate additional input files
284+
self._generate_files(task.name, task_recipe['input'], FileLink.INPUT)
285+
286+
return output_files_list
287+
288+
def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink) -> List[File]:
243289
"""Generate files for a specific task ID.
244290
245291
:param task_id: task ID.
@@ -248,21 +294,27 @@ def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink,
248294
:type recipe: Dict[str, Any]
249295
:param link: Type of file link.
250296
:type link: FileLink
251-
:param files_recipe: Recipe for generating task files.
252-
:type files_recipe: Dict[FileLink, Dict[str, int]]
297+
298+
:return: List of files.
299+
:rtype: List[File]
253300
"""
301+
files_list = []
254302
extension_list: List[str] = []
255303
for f in self.tasks_files[task_id]:
256304
if f.link == link:
305+
files_list.append(f)
257306
extension_list.append(path.splitext(f.name)[1] if '.' in f.name else f.name)
258307

259308
for extension in recipe:
260309
if extension not in extension_list:
261310
num_files = 1
262-
if files_recipe and link in files_recipe and extension in files_recipe[link]:
263-
num_files = files_recipe[link][extension]
264311
for _ in range(0, num_files):
265-
self.tasks_files[task_id].append(self._generate_file(extension, recipe, link))
312+
file = self._generate_file(extension, recipe, link)
313+
files_list.append(file)
314+
self.tasks_files[task_id].append(file)
315+
self.tasks_files_names[task_id].append(file.name)
316+
317+
return files_list
266318

267319
def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink) -> File:
268320
"""Generate a file according to a file recipe.

wfcommons/wfinstances/instance.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,20 @@ def __init__(self, input_instance: str, schema_file: Optional[str] = None, logge
117117
machine = task.get('machine', None)
118118
machine = None if machine is None else self.machines[machine]
119119

120+
# Fetch the command associated to this task
121+
command = task.get('command', None)
122+
120123
self.workflow.add_node(
121124
task['name'],
122125
task=Task(
123126
name=task['name'],
127+
task_id=task.get('id', None),
128+
category=task.get('category', None),
124129
task_type=TaskType(task['type']),
125130
runtime=task['runtime'],
126131
machine=machine,
127-
args=task.get('arguments', None),
132+
program=command.get('program', None) if command else None,
133+
args=command.get('arguments', None) if command else None,
128134
cores=task.get('cores', None),
129135
avg_cpu=task.get('avgCPU', None),
130136
bytes_read=task.get('bytesRead', None),

0 commit comments

Comments
 (0)