Skip to content

Commit a932769

Browse files
committed
tests: add tests for to transition
1 parent d4eda2c commit a932769

2 files changed

Lines changed: 236 additions & 2 deletions

File tree

manager/manager/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def on_launch_world(self, event):
317317

318318
# Launch world
319319
try:
320-
if world_cfg["world"] == None:
320+
if world_cfg["world"] is None:
321321
self.world_launcher = None
322322
LogManager.logger.info("Launch transition finished")
323323
return
@@ -339,7 +339,7 @@ def on_launch_world(self, event):
339339

340340
# Launch robot
341341
try:
342-
if robot_cfg["world"] == None:
342+
if robot_cfg["world"] is None:
343343
self.robot_launcher = None
344344
LogManager.logger.info("Launch transition finished")
345345
return
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
"""Tests for transitioning Manager from 'connected' to 'world_ready' state."""
2+
3+
import pytest
4+
5+
from manager.manager.manager import Manager
6+
7+
8+
class DummyConsumer:
9+
"""A dummy consumer to capture messages sent by the Manager."""
10+
11+
def __init__(self):
12+
"""
13+
Initialize the DummyConsumer with empty message storage.
14+
15+
This constructor sets up the messages list and last_message attribute.
16+
"""
17+
self.messages = []
18+
self.last_message = None
19+
20+
def send_message(self, *args, **kwargs):
21+
"""
22+
Capture and store a message sent by the Manager.
23+
24+
Stores the message arguments and updates the last_message attribute.
25+
"""
26+
self.messages.append((args, kwargs))
27+
self.last_message = (args, kwargs)
28+
29+
30+
@pytest.fixture
31+
def manager(monkeypatch):
32+
"""Fixture to provide a Manager instance with patched dependencies for testing."""
33+
34+
# Patch subprocess.check_output for ROS_DISTRO and IMAGE_TAG
35+
def fake_check_output(cmd, *a, **k):
36+
if "ROS_DISTRO" in cmd[-1]:
37+
return b"humble"
38+
if "IMAGE_TAG" in cmd[-1]:
39+
return b"test_image_tag"
40+
return b""
41+
42+
monkeypatch.setattr("subprocess.check_output", fake_check_output)
43+
44+
# Patch check_gpu_acceleration where it is used
45+
monkeypatch.setattr(
46+
"manager.manager.manager.check_gpu_acceleration", lambda x=None: "OFF"
47+
)
48+
49+
# Patch os.makedirs and os.path.isdir to avoid real FS operations
50+
monkeypatch.setattr("os.makedirs", lambda path, exist_ok=False: None)
51+
monkeypatch.setattr("os.path.isdir", lambda path: True)
52+
53+
# Patch LauncherWorld to avoid launching real processes
54+
class DummyLauncherWorld:
55+
def __init__(self, *a, **k):
56+
self.launched = False
57+
58+
def launch(self):
59+
self.launched = True
60+
61+
def run(self):
62+
self.launched = True
63+
# Simulate running the world
64+
return
65+
66+
def terminate(self):
67+
pass
68+
69+
monkeypatch.setattr("manager.manager.manager.LauncherWorld", DummyLauncherWorld)
70+
71+
# Setup Manager with dummy consumer
72+
m = Manager(host="localhost", port=12345)
73+
m.consumer = DummyConsumer()
74+
# Move to 'connected' state first
75+
m.trigger("connect", event=None)
76+
return m
77+
78+
79+
def test_connected_to_world_ready(manager):
80+
"""Test transitioning Manager from 'connected' to 'world_ready' state."""
81+
# Initial state should be 'connected'
82+
assert manager.state == "connected"
83+
84+
# Use ConfigurationModel for valid world config
85+
from manager.libs.launch_world_model import ConfigurationModel
86+
87+
valid_world_cfg = ConfigurationModel(
88+
world="test_world", launch_file_path="/path/to/launch_file.launch"
89+
).model_dump()
90+
event_data = {
91+
"world": valid_world_cfg,
92+
"robot": {
93+
"world": None, # No robot specified
94+
"robot_config": {"name": "test_robot", "type": "simple"},
95+
},
96+
}
97+
manager.trigger("launch_world", data=event_data)
98+
99+
# State should now be 'world_ready'
100+
assert manager.state == "world_ready"
101+
102+
# Check that the consumer received the expected state change message
103+
msgs = manager.consumer.messages
104+
state_change_msgs = [
105+
msg for msg in msgs if msg[1].get("command") == "state-changed"
106+
]
107+
assert state_change_msgs
108+
assert state_change_msgs[-1][0][0]["state"] == "world_ready"
109+
110+
111+
def test_launch_world_with_invalid_world_config(manager, monkeypatch):
112+
"""Test that launching world with invalid world config logs error."""
113+
114+
# Patch ConfigurationManager.validate to simulate a failed validation
115+
# but still return a dummy config
116+
class DummyConfig:
117+
def model_dump(self):
118+
return {}
119+
120+
def fake_validate(cfg):
121+
# Simulate logging error, but return a dummy config to avoid UnboundLocalError
122+
return DummyConfig()
123+
124+
monkeypatch.setattr(
125+
"manager.libs.launch_world_model.ConfigurationManager.validate", fake_validate
126+
)
127+
128+
invalid_world_cfg = {"world": "bad_world"} # missing launch_file_path
129+
event_data = {
130+
"world": invalid_world_cfg,
131+
"robot": {
132+
"world": None,
133+
"robot_config": {"name": "test_robot", "type": "simple"},
134+
},
135+
}
136+
manager.trigger("launch_world", data=event_data)
137+
# Assert that world_launcher is created but has no useful config
138+
assert manager.world_launcher is not None
139+
assert (
140+
getattr(manager.world_launcher, "world", None) is None
141+
or manager.world_launcher.world == ""
142+
)
143+
144+
145+
def test_launch_world_with_invalid_robot_config(manager, monkeypatch):
146+
"""Test that launching world with invalid robot config logs error."""
147+
148+
# Patch ConfigurationManager.validate to simulate a failed validation
149+
# but still return a dummy config
150+
class DummyConfig:
151+
def model_dump(self):
152+
return {}
153+
154+
def fake_validate(cfg):
155+
# Simulate logging error, but return a dummy config to avoid UnboundLocalError
156+
return DummyConfig()
157+
158+
monkeypatch.setattr(
159+
"manager.libs.launch_world_model.ConfigurationManager.validate", fake_validate
160+
)
161+
162+
valid_world_cfg = {
163+
"world": "test_world",
164+
"launch_file_path": "/path/to/launch_file.launch",
165+
}
166+
invalid_robot_cfg = {"name": "", "type": ""} # Invalid robot config
167+
event_data = {
168+
"world": valid_world_cfg,
169+
"robot": {
170+
"world": valid_world_cfg,
171+
"robot_config": invalid_robot_cfg,
172+
},
173+
}
174+
175+
with pytest.raises(ValueError):
176+
# This should raise an error due to invalid robot config
177+
manager.trigger("launch_world", data=event_data)
178+
179+
# Assert that robot_launcher is not created
180+
assert manager.robot_launcher is None
181+
assert (
182+
getattr(manager.robot_launcher, "robot_config", None) is None
183+
or manager.robot_launcher.robot_config == {}
184+
)
185+
186+
187+
def test_launch_world_with_no_world_config(manager):
188+
"""Test that launching world with no world config does not raise an error."""
189+
# Initial state should be 'connected'
190+
assert manager.state == "connected"
191+
192+
# Use ConfigurationModel for valid robot config
193+
from manager.libs.launch_world_model import ConfigurationModel
194+
195+
valid_robot_cfg = ConfigurationModel(
196+
world="test_world", # No world specified
197+
launch_file_path="/path/to/robot_launch_file.launch",
198+
).model_dump()
199+
event_data = {
200+
"world": {
201+
"world": None, # No world specified
202+
"launch_file_path": None, # No launch file specified
203+
}, # No world specified
204+
"robot": valid_robot_cfg,
205+
}
206+
207+
manager.trigger("launch_world", data=event_data)
208+
209+
# State should now be 'world_ready'
210+
assert manager.state == "world_ready"
211+
assert manager.world_launcher is None
212+
213+
214+
def test_launch_world_with_no_robot_config(manager):
215+
"""Test that launching world with no robot config does not raise an error."""
216+
# Initial state should be 'connected'
217+
assert manager.state == "connected"
218+
219+
# Use ConfigurationModel for valid world config
220+
from manager.libs.launch_world_model import ConfigurationModel
221+
222+
valid_world_cfg = ConfigurationModel(
223+
world="test_world", launch_file_path="/path/to/launch_file.launch"
224+
).model_dump()
225+
226+
event_data = {
227+
"world": valid_world_cfg,
228+
"robot": {"world": None, "robot_config": None}, # No robot specified
229+
}
230+
manager.trigger("launch_world", data=event_data)
231+
232+
# State should now be 'world_ready'
233+
assert manager.state == "world_ready"
234+
assert manager.robot_launcher is None

0 commit comments

Comments
 (0)