-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathlauncher_robot.py
More file actions
99 lines (83 loc) · 3.07 KB
/
launcher_robot.py
File metadata and controls
99 lines (83 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""LauncherRobot module for managing robot launchers in different simulation worlds."""
from typing import Optional
from pydantic import BaseModel
from robotics_application_manager.libs import (
get_class,
class_from_module,
get_ros_version,
)
from robotics_application_manager import LogManager
from .launcher_interface import ILauncher
worlds = {
"gazebo": {
"2": [
{
"type": "gazebo",
"module": "robot_ros2_api",
"parameters": [],
"launch_file": [],
}
],
},
"gz": {
"2": [
{
"type": "gz",
"module": "robot_ros2_api",
"parameters": [],
"launch_file": [],
}
],
},
"o3de": {},
"physical": {},
}
class LauncherRobot(BaseModel):
"""Class for managing robot launchers in different simulation worlds."""
type: str
launch_file_path: str
module: str = ".".join(__name__.split(".")[:-1])
ros_version: int = get_ros_version()
launchers: Optional[ILauncher] = []
start_pose: Optional[list] = []
def run(self, start_pose=None):
"""Run the robot launcher with an optional start pose."""
if start_pose is not None:
self.start_pose = start_pose
for module in worlds[self.type][str(self.ros_version)]:
module["launch_file"] = self.launch_file_path
launcher = self.launch_module(module)
self.launchers.append(launcher)
LogManager.logger.info(self.launchers)
def terminate(self):
"""Terminate all robot launchers and clear the launchers list."""
LogManager.logger.info("Terminating robot launcher")
if self.launchers:
for launcher in self.launchers:
launcher.terminate()
self.launchers = []
def launch_module(self, configuration):
"""Launch a robot module based on the provided configuration."""
def process_terminated(name, exit_code):
LogManager.logger.info(
f"LauncherEngine: {name} exited with code {exit_code}"
)
if self.terminated_callback is not None:
self.terminated_callback(name, exit_code)
launcher_module_name = configuration["module"]
launcher_module = (
f"{self.module}.launcher_{launcher_module_name}."
f"Launcher{class_from_module(launcher_module_name)}"
)
launcher_class = get_class(launcher_module)
launcher = launcher_class.from_config(launcher_class, configuration)
launcher.run(self.start_pose, process_terminated)
return launcher
def launch_command(self, configuration):
"""Launch a robot command based on the provided configuration."""
pass
class LauncherRobotException(Exception):
"""Exception class for errors related to LauncherRobot."""
def __init__(self, message):
"""Initialize the LauncherRobotException with a message."""
super(LauncherRobotException, self).__init__(message)