-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathlauncher_rviz.py
More file actions
111 lines (90 loc) · 3.18 KB
/
launcher_rviz.py
File metadata and controls
111 lines (90 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
RViz Visualization Launcher for ROS 2.
Handles the initialization and lifecycle of the ROS 2 Visualization tool (RViz).
Orchestrates the loading of .rviz configuration files and manages the X11
display environment to project sensor data (Lidar, Cameras, TF frames)
to the web frontend.
"""
from .launcher_interface import ILauncher
from robotics_application_manager.manager.docker_thread import DockerThread
from robotics_application_manager.manager.vnc import Vnc_server
from robotics_application_manager.libs import check_gpu_acceleration
import os
import stat
from typing import List, Any
class LauncherRviz(ILauncher):
display: str
internal_port: int
external_port: int
running: bool = False
acceptsMsgs: bool = False
threads: List[Any] = []
console_vnc: Any = Vnc_server()
def run(self, config_file, callback):
"""
Launches an RViz instance with a specific display configuration.
Args:
config_file (str): Path to the .rviz file defining the
displays and robot model.
callback (function): Lifecycle state-change callback.
"""
DRI_PATH = self.get_dri_path()
ACCELERATION_ENABLED = self.check_device(DRI_PATH)
config = "ros2 run rviz2 rviz2"
if config_file != None:
config = f"ros2 launch {config_file}"
print(config)
if ACCELERATION_ENABLED:
self.console_vnc.start_vnc_gpu(
self.display, self.internal_port, self.external_port, DRI_PATH
)
# Write display config and start the console
console_cmd = f"export VGL_DISPLAY={DRI_PATH}; export DISPLAY={self.display}; vglrun {config}"
else:
self.console_vnc.start_vnc(
self.display, self.internal_port, self.external_port
)
# Write display config and start the console
console_cmd = f"export DISPLAY={self.display};{config}"
console_thread = DockerThread(console_cmd)
console_thread.start()
self.threads.append(console_thread)
self.running = True
def pause(self):
pass
def unpause(self):
pass
def reset(self):
pass
def is_running(self):
return self.running
def terminate(self):
self.console_vnc.terminate()
for thread in self.threads:
if thread.is_alive():
thread.terminate()
thread.join()
self.threads.remove(thread)
self.running = False
def died(self):
pass
# rviz_node_full = Node(
# package="rviz2",
# executable="rviz2",
# name="rviz2",
# output="log",
# arguments=["-d", rviz_full_config],
# parameters=[
# robot_description,
# robot_description_semantic,
# kinematics_yaml,
# pilz_planning_pipeline_config,
# joint_limits,
# pilz_cartesian_limits,
# trajectory_execution,
# moveit_controllers,
# planning_scene_monitor_parameters,
# move_group_capabilities,
# {"use_sim_time": True},
# ]
# )