Skip to content

Commit 3c300f9

Browse files
Merged/Added wfc-1.3_to_dask
1 parent e116826 commit 3c300f9

16 files changed

Lines changed: 749 additions & 0 deletions

File tree

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,9 @@ venv
4343

4444
# Workflow Schema
4545
workflow-schema.json
46+
47+
# wfc-1.3_to_dask specifics
48+
/wfcommons/wfc-1.3_to_dask/__pycache__/
49+
/wfcommons/wfc-1.3_to_dask/.coverage
50+
/wfcommons/wfc-1.3_to_dask/out
51+
/wfcommons/wfc-1.3_to_dask/htmlcov
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Objective
2+
wfc2dask.py is a tool that attempts to translate workflows specified using the WFCommons workflow format (version 1.3;
3+
see https://wfcommons.org/format or https://github.com/wfcommons/wfformat) to a workflow actionable through dask.
4+
5+
# TL;DR
6+
* You need to install dask (see https://docs.dask.org/en/stable/install.html that will tell you to run `conda install
7+
dask` or `pip install 'dask[distributed]'`). //TODO Check if this works if dask is not installed//
8+
* `python wfc2dask.py -h` to get help
9+
* `python wfc2dask.py <workflow_filename>` creates a directory (named out by default) where the Python code needed
10+
to execute the workflow is stored
11+
* `cd out` and then `python application.py` to run the workflow in a local dask
12+
13+
# The Gory Details
14+
## Configuration
15+
* The dask client is defined in `/out/dask_client.py`. Feel free to modify to your local configuration.
16+
* The workflow tasks and dask DAG are defined in `/out/run_workflow.py`. Feel free to modify as well but take into
17+
account the comments of the `__doc__` of that file
18+
* That's it for the configuration. You don't need to change the other files
19+
20+
## Implementation
21+
`/wfc2dask.py` contains the main. The WFCommons JSON document is digested using `/wfc2dask/wfctask.py` and its tasks
22+
are grouped in a `WFDag` defined in `/wfc2dask/wfdag.py`. Once all tasks have been ingested, the DAG is built and Python
23+
code is written to the output directory.
24+
25+
## Contents of the 'samples' directory
26+
### Contents of the 'samples/unittests' directory
27+
28+
Files necessary for the proper execution of unit tests
29+
30+
* hello-world-sequence.json:
31+
The first task (hello) creates a file whose contents are used in the second task (world).
32+
The DAG is the following:
33+
```
34+
(init) -> hello -> world -> (end)
35+
```
36+
37+
* hello-world-join.json:
38+
Three tasks in this workflow. The 'hello' and 'world' tasks each create a different file. Those files
39+
are then used in the third 'join' task
40+
The DAG is the following:
41+
```
42+
/-> hello -\
43+
(init) -> join -> (end)
44+
\-> world -/
45+
```
46+
47+
## Unit tests/Coverage
48+
### Unit tests
49+
```commandline
50+
python -m unittest discover -s tests
51+
```
52+
53+
### Coverage
54+
```
55+
conda install coverage # or pip install coverage
56+
coverage run --source=. --omit='code_templates/*' -m unittest discover -s tests/ && coverage html
57+
```
58+
Open htmlcov/index.html
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""
2+
Looks like this is required
3+
"""
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""
2+
You don't need to modify this unless you know what you're doing
3+
4+
This is what you need to run with Python to execute the dask workflow
5+
application, main, driver, or custom name, feel free to rename this file.
6+
"""
7+
from dask_client import build_dask_client
8+
from run_workflow import run_workflow
9+
import json
10+
11+
12+
def to_json(obj):
    """
    Serialize an object to a JSON string indented by two spaces.

    :param obj: The value to serialize
    :return: The JSON text; values the json module cannot encode natively are
             replaced by their instance ``__dict__``
    """
    def _attributes_of(value):
        # Fallback encoder: expose the instance attributes of unknown objects
        return value.__dict__

    return json.dumps(obj, default=_attributes_of, indent=2)
14+
15+
16+
def process_arguments():
    """
    Parse the command line of the generated dask application.

    Recognized options:
      * ``-nosim`` / ``--do-not-simulate``: declared with ``store_false``, so the
        ``do_not_simulate`` attribute is True by default and becomes False when
        the flag is given. Despite its name, the value therefore reads as
        "simulate the tasks".
      * ``-s`` / ``--seed``: randomizer seed, converted to ``int`` by argparse
        (an invalid value is reported as a usage error); None when absent.

    :return: The ``argparse.Namespace`` holding ``do_not_simulate`` and ``seed``
    """
    import argparse
    import sys
    parser = argparse.ArgumentParser(prog=sys.argv[0],
                                     description='Runs a workflow through dask')  # TODO
    parser.add_argument("-nosim", "--do-not-simulate",
                        help="Do not simulate all tasks (default: do simulate all tasks)", action="store_false")
    # type=int lets argparse validate the seed instead of leaving a raw string to the caller
    parser.add_argument("-s", "--seed", type=int, help="Randomizer seed (used when simulating)")
    return parser.parse_args()
25+
26+
27+
if __name__ == '__main__':
    # Entry point: run the workflow through dask and dump the resulting tasks to run.json
    args = process_arguments()
    # The seed option is optional: calling int(None) would raise a TypeError, so the
    # seed is only forwarded when given and run_workflow keeps its own default otherwise
    workflow_options = {} if args.seed is None else {"seed": int(args.seed)}
    with build_dask_client() as client:
        # args.do_not_simulate is True unless -nosim was passed (store_false option)
        tasks = run_workflow(client, args.do_not_simulate, **workflow_options)
        with open("run.json", "w") as fp:
            fp.write(to_json(tasks))
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"""
2+
Feel free to modify this to target your local dask configuration
3+
4+
Lots of info there:
5+
https://docs.dask.org/en/stable/configuration.html
6+
https://dask.pydata.org/en/latest/scheduling.html
7+
"""
8+
from dask.distributed import Client
9+
10+
11+
def build_dask_client(n_workers: int = 8, threads_per_worker: int = 4):
    """
    Build the dask distributed client used to execute the workflow.

    The defaults reproduce the values that were previously hard-coded here; the
    original author noted that dask picked 4 for both settings on their machine.

    :param n_workers: Forwarded to ``Client`` as ``n_workers``
    :param threads_per_worker: Forwarded to ``Client`` as ``threads_per_worker``
    :return: The ``dask.distributed.Client`` instance
    """
    return Client(n_workers=n_workers, threads_per_worker=threads_per_worker)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
This is the method that should allow the execution of a task
3+
4+
"""
5+
# Those imports are required when pretending to run the commands
6+
import os
7+
import pathlib
8+
import time
9+
import logging
10+
from workflow_task import WorkflowTask
11+
12+
13+
# Logging is configured as a side effect of importing this module: DEBUG level and a
# "time - logger name - level - message" layout
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
14+
# Module-level logger, named after this module, used by execute_task
logger = logging.getLogger(__name__)
15+
16+
17+
def execute_task(task: WorkflowTask, fut_inputs_list) -> WorkflowTask:
    """
    Execute (or simulate the execution of) one workflow task.

    The task is simulated when ``task.simulate`` is set or when it has no command
    arguments: ``task.simulate_execution()`` is called and every declared output
    file is touched. Otherwise the command arguments are joined with spaces and
    run through ``os.system``.

    :param task: The task to be executed (it holds all relevant information)
    :param fut_inputs_list: Unused here but necessary for dask to build its own DAG
    :return: The same task, with ``execution_time`` set to the elapsed seconds
    """
    logger.info("Executing task %s/%s: %s / in=%s / out=%s",
                task.name, task.dag_id, task.command_arguments, task.inputs, task.outputs)
    # perf_counter is not affected by system clock adjustments, unlike time.time
    start = time.perf_counter()
    if task.simulate or not task.command_arguments:
        logger.info("Simulating execution of task %s", task.name)
        # Pretend we do something/Wait some time
        task.simulate_execution()
        # outputs may be None when the task declares no output file
        for output in task.outputs or []:
            logger.debug("Simulating %s => %s", task.command_arguments, output)
            pathlib.Path(output).touch()
    else:
        command = " ".join(task.command_arguments)
        logger.info("Running command for task %s/%s: %s", task.name, task.dag_id, command)
        # TODO Use subprocess? (os.system hands the joined string to the shell as is)
        status = os.system(command)
        if status != 0:
            # Report failures instead of dropping them; the workflow itself keeps going
            logger.warning("Command for task %s/%s returned status %s", task.name, task.dag_id, status)
    task.execution_time = time.perf_counter() - start
    logger.info("End of task %s/%s (%f)", task.name, task.dag_id, task.execution_time)
    return task
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"""
2+
The contents of this file have been largely generated
3+
4+
You may want to modify this to tune to your taste.
5+
It consists of two parts:
6+
+ The first part is the workflow tasks definition. Feel free to change the values if you need
7+
+ The second part is the dask code to execute the workflow. I wouldn't touch it too much if I were you (it's likely
8+
better to modify the WFCommons JSON workflow definition in my opinion but you hold the chainsaw eventually).
9+
"""
10+
from helpers import execute_task
11+
import random
12+
from workflow_task import WorkflowTask
13+
14+
15+
def run_workflow(client, simulate: bool, seed: int=42) -> list[WorkflowTask]:
    """
    Template of the function that runs the workflow; its body is meant to be generated.

    As it stands, TASKS is not defined anywhere in this template: it is presumably
    created by the code that replaces the placeholder comment below (TODO confirm
    against the generator).

    :param client: The dask client the workflow runs on (unused until code is generated)
    :param simulate: Whether tasks are simulated; presumably consumed by the generated code
    :param seed: Randomizer seed (default 42); presumably consumed by the generated code
    :return: The list of workflow tasks
    """
    # Generated code goes here
    return TASKS
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""
2+
3+
A class used to define and help execute a task in a dask workflow
4+
5+
You don't need to modify this unless you know what you're doing
6+
7+
TODO The JSON should actually be a WFCTask
8+
"""
9+
import random
10+
import time
11+
12+
13+
class WorkflowTask:
    """
    Description of one task of a dask workflow, plus the helpers needed to
    simulate its execution and to write it back as Python source code.
    """

    def __init__(self,
                 dag_id: str = None,
                 name: str = None,
                 command_arguments: list[str] = None,
                 inputs: list[str] = None,
                 outputs: list[str] = None,
                 simulate: bool = False,
                 randomizer: random.Random = None,
                 simulate_minimum_execution_time: float = 0.1,
                 simulate_maximum_execution_time: float = 1.1,
                 execution_time: float = None,  # This is an execution output
                 ):
        """
        :param dag_id: Identifier of the task in the DAG
        :param name: Name of the task
        :param command_arguments: Command line of the task, one item per argument
        :param inputs: Names of the files the task reads
        :param outputs: Names of the files the task creates
        :param simulate: Whether the execution of the task is simulated
        :param randomizer: Source of the simulated durations; a new
                           ``random.Random()`` is created when omitted
        :param simulate_minimum_execution_time: Lower bound of a simulated duration, in seconds
        :param simulate_maximum_execution_time: Upper bound of a simulated duration, in seconds
        :param execution_time: Duration of the execution; this is an execution output
        """
        self.dag_id = dag_id
        self.name = name
        self.command_arguments = command_arguments
        self.inputs = inputs
        self.outputs = outputs
        self.simulate = simulate
        # A default built in the signature is evaluated once and shared by every task,
        # hence the None sentinel and the per-instance construction
        self.randomizer = random.Random() if randomizer is None else randomizer
        self.simulate_minimum_execution_time = simulate_minimum_execution_time
        self.simulate_maximum_execution_time = simulate_maximum_execution_time
        self.execution_time = execution_time

    def simulate_execution(self):
        """
        Sleep for a duration drawn uniformly between the minimum and the maximum
        simulated execution times.
        """
        time.sleep(self.randomizer.uniform(self.simulate_minimum_execution_time,
                                           self.simulate_maximum_execution_time))

    def pythonize(self, randomizer_varname: str = "randomizer") -> list[str]:
        """
        Build the Python source of a ``WorkflowTask(...)`` call recreating this task.

        The ``simulate`` argument is emitted as the bare name ``simulate`` and the
        randomizer as ``randomizer_varname``: both names must exist where the
        returned code is inserted. ``execution_time`` is not emitted.

        :param randomizer_varname: Name of the variable holding the randomizer in the emitted code
        :return: The lines of code, one string per line
        """
        # repr() yields a valid literal even when the text contains quotes or
        # backslashes, and emits None (not the string 'None') for an unset value
        codelines = ["WorkflowTask(dag_id = %r," % (self.dag_id,),
                     " name = %r," % (self.name,),
                     " command_arguments = %s," % self.command_arguments,
                     " inputs = %s," % self.inputs,
                     " outputs = %s," % self.outputs,
                     " simulate = simulate,",
                     " randomizer = %s," % randomizer_varname,
                     " simulate_minimum_execution_time = %s," % self.simulate_minimum_execution_time,
                     " simulate_maximum_execution_time = %s," % self.simulate_maximum_execution_time,
                     " )"]
        return codelines
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"name": "hello-world-join",
3+
"description": "See README.md",
4+
"workflow": {
5+
"tasks": [
6+
{
7+
"name": "hello",
8+
"files": [
9+
{
10+
"link": "output",
11+
"name": "hello.dat"
12+
}
13+
]
14+
},
15+
{
16+
"name": "world",
17+
"files": [
18+
{
19+
"link": "output",
20+
"name": "world.dat"
21+
}
22+
]
23+
},
24+
{
25+
"name": "hello-world",
26+
"files": [
27+
{
28+
"link": "input",
29+
"name": "hello.dat"
30+
},
31+
{
32+
"link": "input",
33+
"name": "world.dat"
34+
},
35+
{
36+
"link": "output",
37+
"name": "helloworld.dat"
38+
}
39+
]
40+
}
41+
]
42+
}
43+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "hello-world-sequence",
3+
"description": "See README.md",
4+
"workflow": {
5+
"tasks": [
6+
{
7+
"name": "hello",
8+
"files": [
9+
{
10+
"link": "output",
11+
"name": "hello.dat"
12+
}
13+
]
14+
},
15+
{
16+
"name": "world",
17+
"parents": ["hello"],
18+
"files": [
19+
{
20+
"link": "input",
21+
"name": "hello.dat"
22+
},
23+
{
24+
"link": "output",
25+
"name": "world.dat"
26+
}
27+
]
28+
}
29+
]
30+
}
31+
}

0 commit comments

Comments
 (0)