Skip to content

Commit 1d9843a

Browse files
committed
improving Dask translator
1 parent 4ad4c86 commit 1d9843a

5 files changed

Lines changed: 87 additions & 29 deletions

File tree

wfcommons/common/workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(self,
6060
self.wms_name: Optional[str] = "WfCommons" if not wms_name else wms_name
6161
self.wms_version: Optional[str] = str(__version__) if not wms_version else wms_version
6262
self.wms_url: Optional[str] = f"https://docs.wfcommons.org/en/v{__version__}/" if not wms_url else wms_url
63-
self.executed_at: Optional[str] = datetime.now().astimezone().isoformat()) if not executed_at else executed_at
63+
self.executed_at: Optional[str] = datetime.now().astimezone().isoformat() if not executed_at else executed_at
6464
self.makespan: Optional[int] = makespan
6565
self.tasks = {}
6666
self.tasks_parents = {}

wfcommons/wfbench/bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def create_benchmark(self,
152152

153153
task.runtime = 0
154154
task.files = []
155-
task.program = f"{this_dir.joinpath('wfbench.py')}"
155+
task.program = "wfbench.py"
156156
task.args = [task.name]
157157
task.args.extend(params)
158158

wfcommons/wfbench/translator/abstract_translator.py

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021-2022 The WfCommons Team.
4+
# Copyright (c) 2021-2023 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

1111
import logging
12+
import os
1213
import pathlib
14+
import shutil
1315

1416
from abc import ABC, abstractmethod
15-
from typing import Dict, List, Optional, Union
17+
from typing import Optional, Union
1618

17-
from ...common import Task, Workflow
19+
from ...common import FileLink, Task, Workflow
1820
from ...wfinstances.instance import Instance
1921

2022

23+
this_dir = pathlib.Path(__file__).resolve().parent
24+
2125
class Translator(ABC):
2226
"""
2327
An abstract class of WfFormat parser for creating workflow benchmark applications.
@@ -69,13 +73,43 @@ def __init__(self,
6973
self.task_children[task['name']].append(child)
7074

7175
@abstractmethod
72-
def translate(self, output_file_path: pathlib.Path) -> None:
76+
def translate(self, output_folder: pathlib.Path) -> None:
7377
"""
7478
Translate a workflow benchmark description (WfFormat) into an actual workflow application.
7579
76-
:param output_file_path: The path of the output file.
77-
:type output_file_path: pathlib.Path
80+
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
81+
:type output_folder: pathlib.Path
82+
"""
83+
84+
def _copy_binary_files(self, output_folder: pathlib.Path) -> None:
    """
    Copy binary files to workflow benchmark's bin folder.

    The ``wfbench.py`` script shipped next to the translator package and the
    ``cpu-benchmark`` executable found on the ``PATH`` are copied into
    ``<output_folder>/bin``.

    :param output_folder: The path to the folder in which the workflow benchmark will be generated.
    :type output_folder: pathlib.Path

    :raises FileNotFoundError: If the ``cpu-benchmark`` executable cannot be located on the ``PATH``.
    """
    bin_folder = output_folder.joinpath("bin")
    # Do not fail when the folder is already there (e.g., benchmark re-generation).
    bin_folder.mkdir(parents=True, exist_ok=True)
    shutil.copy(this_dir.parent.joinpath("wfbench.py"), bin_folder)

    # shutil.which() returns None when the executable is not on the PATH;
    # passing None to shutil.copy() would fail with an unhelpful TypeError.
    cpu_benchmark = shutil.which("cpu-benchmark")
    if cpu_benchmark is None:
        raise FileNotFoundError(
            "Unable to locate the 'cpu-benchmark' executable on the PATH."
        )
    shutil.copy(cpu_benchmark, bin_folder)
95+
96+
def _generate_input_files(self, output_folder: pathlib.Path) -> None:
    """
    Generate workflow input files into workflow benchmark's data folder.

    For every root task, each file marked as an input is created once in
    ``<output_folder>/data`` and filled with ``file.size`` random bytes.

    :param output_folder: The path to the folder in which the workflow benchmark will be generated.
    :type output_folder: pathlib.Path
    """
    chunk_size = 1 << 20  # write at most 1 MiB at a time to bound memory usage
    generated_files = set()  # names already written (constant-time membership test)
    data_folder = output_folder.joinpath("data")
    # Do not fail when the folder is already there (e.g., benchmark re-generation).
    data_folder.mkdir(parents=True, exist_ok=True)

    for task_name in self.root_task_names:
        task = self.tasks[task_name]
        for file in task.files:
            if file.link != FileLink.INPUT or file.name in generated_files:
                continue
            generated_files.add(file.name)

            remaining = int(file.size)
            with open(data_folder.joinpath(file.name), "wb") as fp:
                # Stream the random payload instead of allocating the whole file in memory.
                while remaining > 0:
                    length = min(remaining, chunk_size)
                    fp.write(os.urandom(length))
                    remaining -= length
79113

80114
def _write_output_file(self, contents: str, output_file_path: pathlib.Path) -> None:
81115
"""
@@ -91,7 +125,7 @@ def _write_output_file(self, contents: str, output_file_path: pathlib.Path) -> N
91125
out.write(contents)
92126
self.logger.info(f"Translated content written to '{output_file_path}'")
93127

94-
def _find_children(self, task_name: str) -> List[Task]:
128+
def _find_children(self, task_name: str) -> list[Task]:
95129
"""
96130
Find the children for a specific task.
97131
@@ -108,7 +142,7 @@ def _find_children(self, task_name: str) -> List[Task]:
108142

109143
return children
110144

111-
def _find_parents(self, task_name: str) -> List[Task]:
145+
def _find_parents(self, task_name: str) -> list[Task]:
112146
"""
113147
Find the parents for a specific task.
114148

wfcommons/wfbench/translator/dask.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ def __init__(self,
3838
self.tasks_futures = {}
3939
self.task_id = 0
4040

41-
def translate(self, output_file_name: pathlib.Path) -> None:
41+
def translate(self, output_folder: pathlib.Path) -> None:
4242
"""
43-
Translate a workflow benchmark description (WfFormat) into a Dask workflow application.
43+
Translate a workflow benchmark description (WfFormat) into an actual workflow application.
4444
45-
:param output_file_name: The name of the output file (e.g., workflow.py).
46-
:type output_file_name: pathlib.Path
45+
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
46+
:type output_folder: pathlib.Path
4747
"""
48-
noindent_python_codelines = self._dask_wftasks_codelines("randomizer")
48+
noindent_python_codelines = self._dask_wftasks_codelines("randomizer", output_folder)
4949

5050
for task_name in self.root_task_names:
5151
noindent_python_codelines.extend(self._parse_tasks(task_name))
@@ -61,30 +61,53 @@ def translate(self, output_file_name: pathlib.Path) -> None:
6161
with open(this_dir.joinpath("templates/dask_template.py")) as fp:
6262
run_workflow_code = fp.read()
6363
run_workflow_code = run_workflow_code.replace("# Generated code goes here", wf_codelines)
64-
with open("dask_workflow.py", "w") as fp:
64+
65+
# write benchmark files
66+
output_folder.mkdir(parents=True)
67+
with open(output_folder.joinpath("dask_workflow.py"), "w") as fp:
6568
fp.write(run_workflow_code)
69+
70+
# additional files
71+
self._copy_binary_files(output_folder)
72+
self._generate_input_files(output_folder)
6673

6774
def _dask_wftasks_codelines(self,
6875
randomizer_varname: str,
76+
output_folder: pathlib.Path,
6977
simulate_minimum_execution_time: float = 0.1,
7078
simulate_maximum_execution_time: float = 1.1) -> list[str]:
7179
"""
7280
Build the code defining all tasks in the workflow, i.e. WorkflowTask instances.
7381
7482
:param randomizer_varname: The name of the randomizer.
7583
:type randomizer_varname: str
84+
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
85+
:type output_folder: pathlib.Path
7686
7787
:return: The non-indented Python lines of code used to instantiate the WorkflowTask instances.
7888
:rtype: list[str]
7989
"""
8090
codelines = ["randomizer = random.Random(seed)",
8191
"TASKS = {}"]
8292
for task in self.tasks.values():
83-
input_files = [f.name for f in task.files if f.link == FileLink.INPUT]
84-
output_files = [f.name for f in task.files if f.link == FileLink.OUTPUT]
93+
input_files = [str(output_folder.joinpath(f"data/{f.name}")) for f in task.files if f.link == FileLink.INPUT]
94+
output_files = [str(output_folder.joinpath(f"data/{f.name}")) for f in task.files if f.link == FileLink.OUTPUT]
95+
program = output_folder.joinpath(f'bin/{task.program}')
96+
args = []
97+
print(task.args)
98+
for a in task.args:
99+
if "--out" in a:
100+
a = a.replace("{", "\"{").replace("}", "}\"").replace(".txt'", ".txt\\\\\"").replace("'", "\\\\\"" + str(output_folder.joinpath("data")) + "/").replace(": ", ":")
101+
elif "--" not in a:
102+
a = str(output_folder.joinpath("data", a))
103+
else:
104+
a = a.replace("'", "\"")
105+
args.append(a)
106+
print(args)
107+
print("")
85108
code = [f"WorkflowTask(dag_id = '{task.name}',",
86109
f" name = '{task.name}',",
87-
f" command_arguments = {[task.program] + task.args},",
110+
f" command_arguments = {[str(program)] + args},",
88111
f" inputs = {input_files},",
89112
f" outputs = {output_files},",
90113
" simulate = simulate,",
@@ -94,6 +117,7 @@ def _dask_wftasks_codelines(self,
94117
" )"]
95118
codelines.append(f"TASKS['{task.name}'] = {code[0]}")
96119
codelines.extend([codeline for codeline in code[1:]])
120+
# exit(1)
97121
return codelines
98122

99123
def _parse_tasks(self, task_name: str) -> list[str]:
@@ -115,7 +139,7 @@ def _parse_tasks(self, task_name: str) -> list[str]:
115139
self.parsed_tasks.append(task_name)
116140
self.tasks_futures[task_name] = f"fut_dv_{self.task_id}"
117141
self.task_id += 1
118-
noindent_python_codelines = [f"{self.tasks_futures[task_name]} = client.submit(execute_task, TASKS['{task_name}'], [])"]
142+
noindent_python_codelines = [f"{self.tasks_futures[task_name]} = client.submit(execute_task, TASKS['{task_name}'], {self.task_parents[task_name]})"]
119143

120144
# parse children
121145
for child in self.task_children[task_name]:

wfcommons/wfbench/wfbench.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ def cpu_mem_benchmark(cpu_threads: Optional[int] = 5,
118118
os.sched_setaffinity(cpu_proc.pid, {core})
119119
cpu_procs.append(cpu_proc)
120120

121-
mem_proc = subprocess.Popen(mem_prog)
122-
if core:
123-
os.sched_setaffinity(mem_proc.pid, {core})
121+
# mem_proc = subprocess.Popen(mem_prog)
122+
# if core:
123+
# os.sched_setaffinity(mem_proc.pid, {core})
124124

125125
return cpu_procs
126126

@@ -148,19 +148,19 @@ def get_parser() -> argparse.ArgumentParser:
148148
return parser
149149

150150

151-
def io_read_benchmark_user_input_data_size(other):
151+
def io_read_benchmark_user_input_data_size(inputs):
    """
    Run the IO read benchmark by reading every given file from start to end.

    :param inputs: Paths of the files to be read.
    :type inputs: iterable of path-like objects
    """
    chunk_size = 1 << 20  # read at most 1 MiB at a time

    print("[WfBench] Starting IO Read Benchmark...")
    for file in inputs:
        with open(file, "rb") as fp:
            print(f"[WfBench] Reading '{file}'")
            # The data itself is irrelevant: read and discard fixed-size chunks so the
            # whole file is read without ever being held in memory at once
            # (readlines() would build a list containing the entire binary file).
            while fp.read(chunk_size):
                pass
    print("[WfBench] Completed IO Read Benchmark!\n")
158158

159159

160160
def io_write_benchmark_user_input_data_size(outputs):
    """
    Run the IO write benchmark by creating each output file with random content.

    :param outputs: Mapping of output file path to the number of bytes to write.
    :type outputs: dict
    """
    chunk_size = 1 << 20  # write at most 1 MiB at a time

    for file_name, file_size in outputs.items():
        print(f"[WfBench] Writing output file '{file_name}'\n")
        remaining = int(file_size)
        with open(file_name, "wb") as fp:
            # Stream the random payload instead of allocating the whole file in memory.
            while remaining > 0:
                length = min(remaining, chunk_size)
                fp.write(os.urandom(length))
                remaining -= length
165165

166166

0 commit comments

Comments
 (0)