1+ import os
2+ from typing import List , Any
3+ import time
4+ import stat
5+
6+ from src .manager .manager .launcher .launcher_interface import ILauncher , LauncherException
7+ from src .manager .manager .docker_thread .docker_thread import DockerThread
8+ import subprocess
9+
10+ import logging
11+
12+ class LauncherRos2ApiPhy (ILauncher ):
13+ exercise_id : str
14+ type : str
15+ module : str
16+ resource_folders : List [str ]
17+ model_folders : List [str ]
18+ plugin_folders : List [str ]
19+ parameters : List [str ]
20+ launch_file : str
21+ running = False
22+ threads = []
23+
24+ def run (self ,callback ):
25+ DRI_PATH = os .path .join ("/dev/dri" , os .environ .get ("DRI_NAME" , "card0" ))
26+ ACCELERATION_ENABLED = self .check_device (DRI_PATH )
27+
28+ logging .getLogger ("roslaunch" ).setLevel (logging .CRITICAL )
29+
30+ # expand variables in configuration paths
31+ launch_file = os .path .expandvars (self .launch_file )
32+
33+ print ("\n \n \n Launch file: " + str (launch_file ))
34+
35+ if (ACCELERATION_ENABLED ):
36+ exercise_launch_cmd = f"export VGL_DISPLAY={ DRI_PATH } ; vglrun ros2 launch { launch_file } "
37+ else :
38+ exercise_launch_cmd = f"ros2 launch { launch_file } "
39+
40+ exercise_launch_thread = DockerThread (exercise_launch_cmd )
41+ exercise_launch_thread .start ()
42+ self .threads .append (exercise_launch_thread )
43+
44+ self .running = True
45+
46+ def is_running (self ):
47+ return self .running
48+
49+ def check_device (self , device_path ):
50+ try :
51+ return stat .S_ISCHR (os .lstat (device_path )[stat .ST_MODE ])
52+ except :
53+ return False
54+
55+ def terminate (self ):
56+ for thread in self .threads :
57+ thread .terminate ()
58+ thread .join ()
59+ self .running = False
60+
61+ def died (self ):
62+ pass
0 commit comments