-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathexecutorlib.py
More file actions
78 lines (65 loc) · 2.25 KB
/
executorlib.py
File metadata and controls
78 lines (65 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from concurrent.futures import Executor
from importlib import import_module
from inspect import isfunction
from python_workflow_definition.models import PythonWorkflowDefinitionWorkflow
from python_workflow_definition.shared import (
get_dict,
get_list,
get_kwargs,
get_source_handles,
convert_nodes_list_to_dict,
remove_result,
NODES_LABEL,
EDGES_LABEL,
SOURCE_LABEL,
SOURCE_PORT_LABEL,
)
from python_workflow_definition.purepython import resort_total_lst, group_edges
def get_item(obj, key):
    """Return the element of *obj* stored under *key*.

    This is a plain function wrapper around the subscription ``obj[key]``
    so that the lookup can be passed to an executor's ``submit`` call.

    Args:
        obj: Any object supporting subscription (``__getitem__``).
        key: The key or index to look up in *obj*.

    Returns:
        Whatever ``obj[key]`` evaluates to.
    """
    item = obj[key]
    return item
def _get_value(result_dict: dict, nodes_new_dict: dict, link_dict: dict, exe: Executor):
    """Resolve the input value described by a single workflow edge.

    The edge's source node id is looked up first in ``result_dict`` and, if
    absent there, in ``nodes_new_dict``. When the edge names a source port,
    the selection of that port from the source value is submitted to the
    executor instead of being evaluated directly.

    Args:
        result_dict: Values already produced for nodes, keyed by node id.
        nodes_new_dict: Workflow nodes keyed by node id.
        link_dict: Edge description; it is read via ``SOURCE_LABEL`` (source
            node id) and ``SOURCE_PORT_LABEL`` (port on the source, or
            ``None``).
        exe: Executor used to submit the ``get_item`` lookup when a source
            port is given.

    Returns:
        The stored source value when the source port is ``None``; otherwise
        the object returned by ``exe.submit`` for the ``get_item`` call.

    Raises:
        KeyError: If the source node id is in neither ``result_dict`` nor
            ``nodes_new_dict`` (or ``link_dict`` lacks one of the labels).
    """
    source, source_handle = link_dict[SOURCE_LABEL], link_dict[SOURCE_PORT_LABEL]
    # Already computed results take precedence over the static node table.
    if source in result_dict:
        result = result_dict[source]
    elif source in nodes_new_dict:
        result = nodes_new_dict[source]
    else:
        # Name the missing node so a malformed workflow can be diagnosed.
        raise KeyError(
            f"source node {source!r} not found in results or workflow nodes"
        )
    if source_handle is None:
        return result
    # The lookup is submitted rather than done inline; NOTE(review): this
    # presumably relies on the executor accepting not-yet-finished results
    # as arguments - confirm against the executor in use.
    return exe.submit(get_item, obj=result, key=source_handle)
def load_workflow_json(file_name: str, exe: Executor):
    """Load a workflow from a JSON file and submit its function nodes.

    The file is read through ``PythonWorkflowDefinitionWorkflow`` and passed
    through ``remove_result``. String node values containing a ``"."`` are
    treated as dotted import paths and replaced by the imported attribute;
    all other node values are kept unchanged. The edges are grouped and
    ordered with ``group_edges`` / ``resort_total_lst``, and every node that
    is a plain Python function is submitted to ``exe`` with its keyword
    arguments resolved by ``_get_value``.

    Args:
        file_name: Path of the workflow JSON file.
        exe: Executor to which the function nodes are submitted.

    Returns:
        The object returned by ``exe.submit`` for the last function node
        that was submitted.

    Raises:
        KeyError: If no function node was submitted (the final lookup then
            uses ``None`` as key), or if an edge refers to an unknown node.
    """
    workflow = PythonWorkflowDefinitionWorkflow.load_json_file(file_name=file_name)
    content = remove_result(workflow_dict=workflow)
    edges_new_lst = content[EDGES_LABEL]

    # Build the node table with integer ids, importing dotted paths.
    nodes_new_dict = {}
    raw_nodes = convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL])
    for node_id, value in raw_nodes.items():
        if not (isinstance(value, str) and "." in value):
            nodes_new_dict[int(node_id)] = value
            continue
        module_path, attr_name = value.rsplit(".", 1)
        module = import_module(module_path)
        nodes_new_dict[int(node_id)] = getattr(module, attr_name)

    grouped = group_edges(edges_new_lst)
    ordered = resort_total_lst(total_lst=grouped, nodes_dict=nodes_new_dict)

    submitted = {}
    final_id = None
    for entry in ordered:
        target_id = entry[0]
        func = nodes_new_dict[target_id]
        # Only plain Python functions are executed; other nodes are skipped.
        if not isfunction(func):
            continue
        inputs = {}
        for arg_name, link in entry[1].items():
            inputs[arg_name] = _get_value(
                result_dict=submitted,
                nodes_new_dict=nodes_new_dict,
                link_dict=link,
                exe=exe,
            )
        submitted[target_id] = exe.submit(func, **inputs)
        final_id = target_id

    return submitted[final_id]