-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathlauncher_robot_ros2_api.py
More file actions
77 lines (63 loc) · 2.35 KB
/
launcher_robot_ros2_api.py
File metadata and controls
77 lines (63 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
from typing import List, Any
import time
import stat
from .launcher_interface import (
ILauncher,
LauncherException,
)
from robotics_application_manager.manager.docker_thread import DockerThread
import subprocess
import logging
class LauncherRobotRos2Api(ILauncher):
type: str
module: str
launch_file: str
threads: List[Any] = []
def run(self, robot_pose, callback):
DRI_PATH = self.get_dri_path()
ACCELERATION_ENABLED = self.check_device(DRI_PATH)
logging.getLogger("roslaunch").setLevel(logging.CRITICAL)
xserver_cmd = f"/usr/bin/Xorg -quiet -noreset +extension GLX +extension RANDR +extension RENDER -logfile ./xdummy.log -config ./xorg.conf :0"
xserver_thread = DockerThread(xserver_cmd)
xserver_thread.start()
self.threads.append(xserver_thread)
ROBOT_POSE = f"ROBOT_X={robot_pose[0]} ROBOT_Y={robot_pose[1]} ROBOT_Z={robot_pose[2]} ROBOT_ROLL={robot_pose[3]} ROBOT_PITCH={robot_pose[4]} ROBOT_YAW={robot_pose[5]}"
if ACCELERATION_ENABLED:
exercise_launch_cmd = f"export VGL_DISPLAY={DRI_PATH}; vglrun {ROBOT_POSE} ros2 launch {self.launch_file}"
else:
exercise_launch_cmd = f"{ROBOT_POSE} ros2 launch {self.launch_file}"
exercise_launch_thread = DockerThread(exercise_launch_cmd)
exercise_launch_thread.start()
def terminate(self):
if self.threads is not None:
for thread in self.threads:
if thread.is_alive():
thread.terminate()
thread.join()
self.threads.remove(thread)
# kill_cmd = "pkill -9 -f "
# cmd = kill_cmd + "spawn_robot_warehouse.launch.py"
# subprocess.call(
# cmd,
# shell=True,
# stdout=subprocess.PIPE,
# bufsize=1024,
# universal_newlines=True,
# )
# cmd = kill_cmd + "/opt/ros/humble"
# subprocess.call(
# cmd,
# shell=True,
# stdout=subprocess.PIPE,
# bufsize=1024,
# universal_newlines=True,
# )
# cmd = kill_cmd + "robot_state_publisher"
# subprocess.call(
# cmd,
# shell=True,
# stdout=subprocess.PIPE,
# bufsize=1024,
# universal_newlines=True,
# )