-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathlauncher_world.py
More file actions
102 lines (91 loc) · 2.81 KB
/
launcher_world.py
File metadata and controls
102 lines (91 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from typing import Optional
from pydantic import BaseModel
from robotics_application_manager.libs import (
get_class,
class_from_module,
get_ros_version,
)
from robotics_application_manager import LogManager
from .launcher_interface import ILauncher
# Launcher configurations for every supported world type.
#
# Layout: world type -> ROS version (as a string) -> list of launcher
# configuration dicts. In each configuration, "module" names the launcher
# implementation that LauncherWorld resolves and instantiates, and
# "launch_file" is an empty placeholder for which LauncherWorld supplies the
# actual launch file path when it runs.
worlds = {
    "gazebo": {
        "2": [
            {"type": "gazebo", "module": "ros2_api",
             "parameters": [], "launch_file": []},
        ],
    },
    "gz": {
        "2": [
            {"type": "gz", "module": "ros2_api",
             "parameters": [], "launch_file": []},
        ],
    },
    # The o3de entry carries extra size/display/port settings in addition to
    # the common keys (presumably consumed by its launcher -- confirm there).
    "o3de": {
        "2": [
            {
                "width": 1024, "height": 768,
                "display": ":2",
                "external_port": 6080, "internal_port": 5900,
                "type": "o3de", "module": "o3de_api",
                "parameters": [], "launch_file": [],
            },
        ],
    },
    # Note: the "physical" world is registered with type "real".
    "physical": {
        "2": [
            {"type": "real", "module": "ros2_api",
             "parameters": [], "launch_file": []},
        ],
    },
}
class LauncherWorld(BaseModel):
    """Start and stop the launcher(s) configured for one world type.

    Fields:
        type: Key into the module-level ``worlds`` table (e.g. ``"gazebo"``).
        launch_file_path: Value placed under ``"launch_file"`` in every
            launcher configuration before it is launched.
        module: Dotted package path under which ``launcher_<module>`` modules
            are looked up; defaults to the package containing this module.
        ros_version: Selects the configuration list inside the world entry;
            defaults to ``get_ros_version()``, evaluated once when the class
            is defined.
        launchers: Launchers started by ``run`` and not yet terminated.
    """

    type: str
    launch_file_path: str
    module: str = ".".join(__name__.split(".")[:-1])
    ros_version: int = get_ros_version()
    # NOTE(review): annotated as a single optional ILauncher, yet used as a
    # list of launchers everywhere in this class. Left unchanged so field
    # validation stays as it was -- confirm and tighten separately.
    launchers: Optional[ILauncher] = []

    def run(self):
        """Launch every configuration registered for this world.

        Raises:
            KeyError: If ``worlds`` has no entry for ``self.type`` or for
                ``str(self.ros_version)``.
        """
        for entry in worlds[self.type][str(self.ros_version)]:
            # Build a per-run copy instead of writing into the shared
            # module-level table, so one instance's launch file path never
            # leaks into the global configuration seen by other instances.
            configuration = {**entry, "launch_file": self.launch_file_path}
            self.launchers.append(self.launch_module(configuration))

    def terminate(self):
        """Terminate all tracked launchers and reset the tracking list."""
        LogManager.logger.info("Terminating world launcher")
        for launcher in self.launchers or ():
            launcher.terminate()
        self.launchers = []

    def launch_module(self, configuration):
        """Resolve, build and start the launcher described by ``configuration``.

        Args:
            configuration: Launcher configuration dict; its ``"module"`` value
                determines which launcher class is loaded.

        Returns:
            The launcher instance, after its ``run`` has been called.
        """

        def process_terminated(name, exit_code):
            LogManager.logger.info(
                f"LauncherEngine: {name} exited with code {exit_code}"
            )
            # ``terminated_callback`` is not a declared field of this model,
            # so read it defensively: a bare attribute access would raise
            # AttributeError whenever a launched process exits.
            callback = getattr(self, "terminated_callback", None)
            if callback is not None:
                callback(name, exit_code)

        module_name = configuration["module"]
        class_name = f"Launcher{class_from_module(module_name)}"
        class_path = f"{self.module}.launcher_{module_name}.{class_name}"
        launcher_class = get_class(class_path)
        # The class is passed explicitly as the first argument, exactly as the
        # existing call convention for ``from_config`` does.
        launcher = launcher_class.from_config(launcher_class, configuration)
        launcher.run(process_terminated)
        return launcher

    def launch_command(self, configuration):
        """Placeholder; currently does nothing with ``configuration``."""
        pass
class LauncherWorldException(Exception):
    """Exception carrying a message for world-launcher errors."""

    def __init__(self, message):
        """Store ``message`` as the exception's sole argument."""
        super().__init__(message)