Skip to content

Commit 1ec84b4

Browse files
committed
improving Dask translator
1 parent 0a2eedb commit 1ec84b4

5 files changed

Lines changed: 244 additions & 3 deletions

File tree

wfcommons/wfbench/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
# (at your option) any later version.
1010

1111
from .bench import WorkflowBenchmark
12-
from .translator import PegasusTranslator, SwiftTTranslator
12+
from .translator import DaskTranslator, PegasusTranslator, SwiftTTranslator
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021-2022 The WfCommons Team.
4+
# Copyright (c) 2021-2023 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11+
from .dask import DaskTranslator
1112
from .pegasus import PegasusTranslator
12-
from.swift_t import SwiftTTranslator
13+
from .swift_t import SwiftTTranslator
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2023 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import pathlib
12+
13+
from logging import Logger
14+
from typing import Optional, Union
15+
16+
from .abstract_translator import Translator
17+
from ...common import FileLink, Workflow
18+
19+
this_dir = pathlib.Path(__file__).resolve().parent
20+
21+
22+
class DaskTranslator(Translator):
23+
"""
24+
A WfFormat parser for creating Dask workflow applications.
25+
26+
:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
27+
:type workflow: Union[Workflow, pathlib.Path],
28+
:param logger: The logger where to log information/warning or errors (optional).
29+
:type logger: Logger
30+
"""
31+
32+
def __init__(self,
33+
workflow: Union[Workflow, pathlib.Path],
34+
logger: Optional[Logger] = None) -> None:
35+
"""Create an object of the translator."""
36+
super().__init__(workflow, logger)
37+
self.parsed_tasks = []
38+
self.tasks_futures = {}
39+
self.task_id = 0
40+
41+
def translate(self, output_file_name: pathlib.Path) -> None:
42+
"""
43+
Translate a workflow benchmark description (WfFormat) into a Dask workflow application.
44+
45+
:param output_file_name: The name of the output file (e.g., workflow.py).
46+
:type output_file_name: pathlib.Path
47+
"""
48+
noindent_python_codelines = self._dask_wftasks_codelines("randomizer")
49+
50+
for task_name in self.root_task_names:
51+
noindent_python_codelines.extend(self._parse_tasks(task_name))
52+
53+
# generate results
54+
while self.task_id > 0:
55+
self.task_id -= 1
56+
noindent_python_codelines.append(f"TASKS['{self.parsed_tasks[self.task_id]}'] = fut_dv_{self.task_id}.result()")
57+
58+
# generate code
59+
INDENT = " "
60+
wf_codelines = "\n".join(["%s%s" % (INDENT, codeline) for codeline in noindent_python_codelines])
61+
with open(this_dir.joinpath("templates/dask_template.py")) as fp:
62+
run_workflow_code = fp.read()
63+
run_workflow_code = run_workflow_code.replace("# Generated code goes here", wf_codelines)
64+
with open("dask_workflow.py", "w") as fp:
65+
fp.write(run_workflow_code)
66+
67+
def _dask_wftasks_codelines(self,
68+
randomizer_varname: str,
69+
simulate_minimum_execution_time: float = 0.1,
70+
simulate_maximum_execution_time: float = 1.1) -> list[str]:
71+
"""
72+
Build the code definining all tasks in the workflow, i.e. WorkflowTask instances.
73+
74+
:param randomizer_varname: The name of the randomizer.
75+
:type randomizer_varname: str
76+
77+
:return: The non-indented Python lines of code used to instantiate the WorkflowTask instances.
78+
:rtype: list[str]
79+
"""
80+
codelines = ["randomizer = random.Random(seed)",
81+
"TASKS = {}"]
82+
for task in self.tasks.values():
83+
input_files = [f.name for f in task.files if f.link == FileLink.INPUT]
84+
output_files = [f.name for f in task.files if f.link == FileLink.OUTPUT]
85+
code = [f"WorkflowTask(dag_id = '{task.name}',",
86+
f" name = '{task.name}',",
87+
f" command_arguments = {[task.program] + task.args},",
88+
f" inputs = {input_files},",
89+
f" outputs = {output_files},",
90+
" simulate = simulate,",
91+
f" randomizer = {randomizer_varname},",
92+
f" simulate_minimum_execution_time = {simulate_minimum_execution_time},",
93+
f" simulate_maximum_execution_time = {simulate_maximum_execution_time},",
94+
" )"]
95+
codelines.append(f"TASKS['{task.name}'] = {code[0]}")
96+
codelines.extend([codeline for codeline in code[1:]])
97+
return codelines
98+
99+
def _parse_tasks(self, task_name: str) -> list[str]:
100+
"""
101+
Recursively iterates over workflow tasks to generate submit command.
102+
103+
:param task_name: The name of a task.
104+
:type task_name: str
105+
106+
:return: The
107+
:rtype: list[str]
108+
"""
109+
if task_name not in self.parsed_tasks:
110+
# check for dependencies
111+
for parent in self.task_parents[task_name]:
112+
if parent not in self.parsed_tasks:
113+
return []
114+
115+
self.parsed_tasks.append(task_name)
116+
self.tasks_futures[task_name] = f"fut_dv_{self.task_id}"
117+
self.task_id += 1
118+
noindent_python_codelines = [f"{self.tasks_futures[task_name]} = client.submit(execute_task, TASKS['{task_name}'], [])"]
119+
120+
# parse children
121+
for child in self.task_children[task_name]:
122+
noindent_python_codelines.extend(self._parse_tasks(child))
123+
124+
return noindent_python_codelines
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2023 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import argparse
12+
import json
13+
import logging
14+
import os
15+
import pathlib
16+
import random
17+
import sys
18+
import time
19+
20+
from dask.distributed import Client
21+
22+
23+
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
24+
logger = logging.getLogger(__name__)
25+
26+
27+
def build_dask_client():
28+
"""
29+
Feel free to modify this to target your local dask configuration
30+
31+
Lots of info there:
32+
https://docs.dask.org/en/stable/configuration.html
33+
https://dask.pydata.org/en/latest/scheduling.html
34+
"""
35+
cpu_count = 2
36+
threads_per_cpu = 2
37+
return Client(n_workers=cpu_count, threads_per_worker=threads_per_cpu)
38+
39+
40+
class WorkflowTask:
41+
def __init__(self,
42+
dag_id: str = None,
43+
name: str = None,
44+
command_arguments: list[str] = None,
45+
inputs: list[str] = None,
46+
outputs: list[str] = None,
47+
simulate: bool = False,
48+
randomizer: random.Random = random.Random(),
49+
simulate_minimum_execution_time: float = 0.1,
50+
simulate_maximum_execution_time: float = 1.1,
51+
execution_time: float = None, # This is an execution output
52+
):
53+
self.dag_id = dag_id
54+
self.name = name
55+
self.command_arguments = command_arguments
56+
self.inputs = inputs
57+
self.outputs = outputs
58+
self.simulate = simulate
59+
self.randomizer = randomizer
60+
self.simulate_minimum_execution_time = simulate_minimum_execution_time
61+
self.simulate_maximum_execution_time = simulate_maximum_execution_time
62+
self.execution_time = execution_time
63+
64+
def simulate_execution(self):
65+
time.sleep(self.randomizer.uniform(self.simulate_minimum_execution_time,
66+
self.simulate_maximum_execution_time))
67+
68+
69+
def execute_task(task: WorkflowTask, fut_inputs_list) -> WorkflowTask:
70+
"""
71+
:param task: The task to be executed (it holds all relevant information)
72+
:param fut_inputs_list: Unused here but necessary for dask to build its own DAG
73+
:return:
74+
"""
75+
logger.info("Executing task %s/%s: %s / in=%s / out=%s" % (task.name, task.dag_id, task.command_arguments, task.inputs, task.outputs))
76+
start = time.time()
77+
if task.simulate or task.command_arguments is None or len(task.command_arguments) == 0:
78+
logger.info("Simulating execution of task %s" % task.name)
79+
# Pretend we do something/Wait some time
80+
task.simulate_execution()
81+
for output in task.outputs:
82+
logger.debug("Simulating %s => %s" % (task.command_arguments, output))
83+
pathlib.Path(output).touch()
84+
else:
85+
command = " ".join(task.command_arguments)
86+
logger.info("Running command for task %s/%s: %s" % (task.name, task.dag_id, command))
87+
os.system(command) # TODO Use subprocess?
88+
task.execution_time = time.time()-start
89+
logger.info("End of task %s/%s (%f)" % (task.name, task.dag_id, task.execution_time))
90+
return task
91+
92+
93+
def run_workflow(client, simulate: bool, seed: int=42) -> list[WorkflowTask]:
94+
# Generated code goes here
95+
return TASKS
96+
97+
98+
def process_arguments():
99+
parser = argparse.ArgumentParser(prog=sys.argv[0],
100+
description='Runs a workflow through dask') # TODO
101+
parser.add_argument("-nosim", "--do-not-simulate",
102+
help="Do not simulate all tasks (default: do simulate all tasks)", action="store_false")
103+
parser.add_argument("-s", "--seed", help="Randomizer seed (used when simulating)")
104+
return parser.parse_args()
105+
106+
107+
def to_json(obj):
108+
return json.dumps(obj, indent=2, default=lambda o: o.__dict__)
109+
110+
111+
if __name__ == '__main__':
112+
args = process_arguments()
113+
with build_dask_client() as client:
114+
tasks = run_workflow(client, args.do_not_simulate, seed=int(args.seed))
115+
with open("run.json", "w") as fp:
116+
fp.write(to_json(tasks))

wfcommons/wfbench/wfbench.py

100644100755
File mode changed.

0 commit comments

Comments
 (0)