11import atexit
2+ import os
23import shutil
4+ import signal
35import subprocess
6+ import sys
47import tempfile
58from pathlib import Path
69
@@ -25,27 +28,59 @@ def run(cfg: DictConfig) -> None:
2528 # there were strange interactions when trying to spin the nodes by manually calling spin with threading or mp, but
2629 # this workaround seems to work with dynamic nodes.
2730 dataflow_path = write_tmp (cfg .dataflow , tmp_dir , "tmp_dataflow" )
28- process = subprocess .Popen (f"dora up && dora start { dataflow_path } " , shell = True )
31+ dora = subprocess .Popen (f"dora up && dora start { dataflow_path } " , shell = True , start_new_session = True )
2932
3033 yaml_paths = []
34+ node_processes = []
3135 for node_cfg in cfg .node_definitions .values ():
3236 # allow skipping of node definitions by setting to null when overriding configs
3337 if node_cfg is None :
3438 continue
3539
3640 node_name = f"tmp_{ node_cfg .node_id } "
3741 yaml_path = write_tmp (node_cfg , tmp_dir , node_name )
38- subprocess .Popen (f"python { Path (__file__ ).parent } /_launch_node.py -cp { tmp_dir } -cn { node_name } " , shell = True )
42+ p = subprocess .Popen (
43+ f"python { Path (__file__ ).parent } /_launch_node.py -cp { tmp_dir } -cn { node_name } " ,
44+ shell = True ,
45+ start_new_session = True ,
46+ )
3947 yaml_paths .append (yaml_path )
48+ node_processes .append (p )
4049
41- # register a cleanup function to remove the temp yaml files when the script exits
42- def _cleanup_tmp_dir ():
43- try :
44- shutil .rmtree (tmp_dir )
45- except Exception :
46- pass
50+ # register callback to tear down dora cleanly and terminate all node processes
51+ cleaned = False
52+ def _cleanup ():
53+ nonlocal cleaned
54+ if cleaned :
55+ return # idempotent cleanup
56+ cleaned = True
4757
48- atexit .register (_cleanup_tmp_dir )
58+ # tear down dora daemon and coordinator
59+ subprocess .run (["dora" , "destroy" ], check = True )
60+
61+ # terminate all node processes if they are still running
62+ for p in node_processes :
63+ try :
64+ if p .poll () is None :
65+ os .killpg (p .pid , signal .SIGINT )
66+ p .wait (timeout = 5 )
67+ except Exception :
68+ pass
69+
70+ # remove temporary yaml files
71+ shutil .rmtree (tmp_dir , ignore_errors = True )
72+
73+ atexit .register (_cleanup )
4974
5075 # don't terminate the script until the main dora process does
51- process .wait ()
76+ caught_keyboard_interrupt = False
77+ try :
78+ dora .wait ()
79+ except KeyboardInterrupt :
80+ caught_keyboard_interrupt = True
81+ pass
82+ finally :
83+ _cleanup ()
84+ if caught_keyboard_interrupt :
85+ raise KeyboardInterrupt ("Caught KeyboardInterrupt, cleaning up and exiting." )
86+ sys .exit (0 )
0 commit comments