Skip to content

Commit 7e8b103

Browse files
committed
fixing data dependency issue (closes #17)
1 parent e90e5ca commit 7e8b103

1 file changed

Lines changed: 76 additions & 28 deletions

File tree

wfcommons/wfgen/abstract_recipe.py

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@ def __init__(self, name: str,
7575
self.input_file_size_factor = input_file_size_factor
7676
self.output_file_size_factor = output_file_size_factor
7777
self.workflows: List[Workflow] = []
78+
self.tasks_map = {}
7879
self.tasks_files: Dict[str, List[File]] = {}
80+
self.tasks_files_names: Dict[str, List[str]] = {}
7981
self.task_id_counter = 1
8082
self.this_dir = this_dir
83+
self.tasks_children = {}
84+
self.tasks_parents = {}
8185

8286
@abstractmethod
8387
def _workflow_recipe(self) -> Dict[str, Any]:
@@ -157,11 +161,30 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
157161

158162
task_names[node] = task_name
159163

164+
# tasks dependencies
160165
for (src, dst) in graph.edges:
161166
if src in ["SRC", "DST"] or dst in ["SRC", "DST"]:
162167
continue
163168
workflow.add_edge(task_names[src], task_names[dst])
164169

170+
if task_names[src] not in self.tasks_children:
171+
self.tasks_children[task_names[src]] = []
172+
if task_names[dst] not in self.tasks_parents:
173+
self.tasks_parents[task_names[dst]] = []
174+
175+
self.tasks_children[task_names[src]].append(task_names[dst])
176+
self.tasks_parents[task_names[dst]].append(task_names[src])
177+
178+
# find leaf tasks
179+
leaf_tasks = []
180+
for node_name in workflow.nodes:
181+
task: Task = workflow.nodes[node_name]['task']
182+
if task.name not in self.tasks_children:
183+
leaf_tasks.append(task)
184+
185+
for task in leaf_tasks:
186+
self._generate_task_files(task)
187+
165188
workflow.nxgraph = graph
166189
self.workflows.append(workflow)
167190
return workflow
@@ -172,21 +195,13 @@ def _load_base_graph(self) -> nx.DiGraph:
172195
def _load_microstructures(self) -> Dict:
173196
return json.loads(self.this_dir.joinpath("microstructures.json").read_text())
174197

175-
def _generate_task(self, task_name: str,
176-
task_id: str,
177-
input_files: Optional[List[File]] = None,
178-
files_recipe: Optional[Dict[FileLink, Dict[str, int]]] = None
179-
) -> Task:
198+
def _generate_task(self, task_name: str, task_id: str) -> Task:
180199
"""Generate a synthetic task.
181200
182201
:param task_name: task name.
183202
:type task_name: str
184203
:param task_id: task ID.
185204
:type task_id: str
186-
:param input_files: List of input files to be included.
187-
:type input_files: List[File]
188-
:param files_recipe: Recipe for generating task files.
189-
:type files_recipe: Dict[FileLink, Dict[str, int]]
190205
191206
:return: A task object.
192207
:rtype: task
@@ -198,18 +213,10 @@ def _generate_task(self, task_name: str,
198213
task_recipe['runtime']['min'],
199214
task_recipe['runtime']['max']), '.3f'))
200215

201-
# linking previous generated output files as input files
216+
# # linking previous generated output files as input files
202217
self.tasks_files[task_id] = []
203-
if input_files:
204-
for f in input_files:
205-
if f.link == FileLink.OUTPUT:
206-
self.tasks_files[task_id].append(File(name=f.name, size=f.size, link=FileLink.INPUT))
207-
208-
# generate additional in/output files
209-
self._generate_files(task_id, task_recipe['input'], FileLink.INPUT, files_recipe)
210-
self._generate_files(task_id, task_recipe['output'], FileLink.OUTPUT, files_recipe)
211-
212-
return Task(
218+
self.tasks_files_names[task_id] = []
219+
task = Task(
213220
name=task_id,
214221
task_id='0{}'.format(task_id.split('_0')[1]),
215222
category=task_name,
@@ -226,8 +233,11 @@ def _generate_task(self, task_name: str,
226233
energy=None,
227234
avg_power=None,
228235
priority=None,
229-
files=self.tasks_files[task_id]
236+
files=[]
230237
)
238+
self.tasks_map[task_id] = task
239+
240+
return task
231241

232242
def _generate_task_name(self, prefix: str) -> str:
233243
"""Generate a task name from a prefix appended with an ID.
@@ -242,8 +252,40 @@ def _generate_task_name(self, prefix: str) -> str:
242252
self.task_id_counter += 1
243253
return task_name
244254

245-
def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink,
246-
files_recipe: Optional[Dict[FileLink, Dict[str, int]]] = None) -> None:
255+
def _generate_task_files(self, task: Task) -> List[File]:
256+
"""Generate input and output files for a task.
257+
258+
:param task: task object.
259+
:type task: Task
260+
261+
:return: List of files output files.
262+
:rtype: List[File]
263+
"""
264+
task_recipe = self._workflow_recipe()[task.category]
265+
266+
# generate output files
267+
output_files_list = self._generate_files(task.name, task_recipe['output'], FileLink.OUTPUT)
268+
task.files = self.tasks_files[task.name]
269+
270+
# obtain input files from parents
271+
input_files = []
272+
if task.name in self.tasks_parents.keys():
273+
for parent_task_name in self.tasks_parents[task.name]:
274+
input_files.extend(self._generate_task_files(self.tasks_map[parent_task_name]))
275+
276+
for input_file in input_files:
277+
if input_file.name not in self.tasks_files_names[task.name]:
278+
self.tasks_files[task.name].append(File(name=input_file.name,
279+
link=FileLink.INPUT,
280+
size=input_file.size))
281+
self.tasks_files_names[task.name].append(input_file.name)
282+
283+
# generate additional input files
284+
self._generate_files(task.name, task_recipe['input'], FileLink.INPUT)
285+
286+
return output_files_list
287+
288+
def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink) -> List[File]:
247289
"""Generate files for a specific task ID.
248290
249291
:param task_id: task ID.
@@ -252,21 +294,27 @@ def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink,
252294
:type recipe: Dict[str, Any]
253295
:param link: Type of file link.
254296
:type link: FileLink
255-
:param files_recipe: Recipe for generating task files.
256-
:type files_recipe: Dict[FileLink, Dict[str, int]]
297+
298+
:return: List of files.
299+
:rtype: List[File]
257300
"""
301+
files_list = []
258302
extension_list: List[str] = []
259303
for f in self.tasks_files[task_id]:
260304
if f.link == link:
305+
files_list.append(f)
261306
extension_list.append(path.splitext(f.name)[1] if '.' in f.name else f.name)
262307

263308
for extension in recipe:
264309
if extension not in extension_list:
265310
num_files = 1
266-
if files_recipe and link in files_recipe and extension in files_recipe[link]:
267-
num_files = files_recipe[link][extension]
268311
for _ in range(0, num_files):
269-
self.tasks_files[task_id].append(self._generate_file(extension, recipe, link))
312+
file = self._generate_file(extension, recipe, link)
313+
files_list.append(file)
314+
self.tasks_files[task_id].append(file)
315+
self.tasks_files_names[task_id].append(file.name)
316+
317+
return files_list
270318

271319
def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink) -> File:
272320
"""Generate a file according to a file recipe.

0 commit comments

Comments
 (0)