-
Notifications
You must be signed in to change notification settings - Fork 213
Expand file tree
/
Copy pathtest_cli_duplicate_node_names.py
More file actions
125 lines (113 loc) · 4.62 KB
/
test_cli_duplicate_node_names.py
File metadata and controls
125 lines (113 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import os
import sys
import unittest
from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools
import pytest
from rclpy.utilities import get_available_rmw_implementations
from ros2node.api import INFO_NONUNIQUE_WARNING_TEMPLATE
@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation):
    """
    Build the launch description used by the duplicate-node-name tests.

    The description runs ``ros2 daemon stop``; its ``on_exit`` actions run
    ``ros2 daemon start``, whose own ``on_exit`` actions launch three copies
    of the ``fixtures/complex_node.py`` script (two named ``complex_node``
    and one named ``complex_node_2``) followed by ``ReadyToTest``.

    :param rmw_implementation: value exported as ``RMW_IMPLEMENTATION`` to
        the daemon-start process and to every fixture node.
    :return: the assembled ``LaunchDescription``.
    """
    fixture_script = os.path.join(
        os.path.dirname(__file__), 'fixtures', 'complex_node.py'
    )
    rmw_env = {'RMW_IMPLEMENTATION': rmw_implementation}

    # Add test fixture actions: two of the three nodes share the same name.
    fixture_nodes = [
        Node(
            executable=sys.executable,
            arguments=[fixture_script],
            name=node_name,
            additional_env=rmw_env,
        )
        for node_name in ('complex_node', 'complex_node', 'complex_node_2')
    ]

    start_daemon = ExecuteProcess(
        cmd=['ros2', 'daemon', 'start'],
        name='daemon-start',
        on_exit=[*fixture_nodes, launch_testing.actions.ReadyToTest()],
        additional_env=rmw_env
    )

    # Always restart daemon to isolate tests.
    stop_daemon = ExecuteProcess(
        cmd=['ros2', 'daemon', 'stop'],
        name='daemon-stop',
        on_exit=[start_daemon]
    )

    return LaunchDescription([stop_daemon])
class TestROS2NodeCLIWithDuplicateNodeNames(unittest.TestCase):
    """Exercise the ``ros2 node`` CLI while two fixture nodes share a name."""

    @classmethod
    def setUpClass(
        cls,
        launch_service,
        proc_info,
        proc_output,
        rmw_implementation
    ):
        """
        Install ``launch_node_command`` on the test class.

        The helper is a context manager that runs ``ros2 node <arguments>``
        through ``launch_testing.tools.launch_process`` and yields the
        resulting process handle. It closes over the launch objects passed
        to this method.
        """

        @contextlib.contextmanager
        def launch_node_command(self, arguments):
            cli_env = {
                'RMW_IMPLEMENTATION': rmw_implementation,
                'PYTHONUNBUFFERED': '1'
            }
            cli_action = ExecuteProcess(
                cmd=['ros2', 'node', *arguments],
                additional_env=cli_env,
                name='ros2node-cli',
                output='screen'
            )
            # ignore launch_ros and ros2cli daemon nodes
            cli_output_filter = launch_testing_ros.tools.basic_output_filter(
                filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'],
                filtered_rmw_implementation=rmw_implementation
            )
            with launch_testing.tools.launch_process(
                launch_service, cli_action, proc_info, proc_output,
                output_filter=cli_output_filter
            ) as running_command:
                yield running_command

        cls.launch_node_command = launch_node_command

    @launch_testing.markers.retry_on_failure(times=5, delay=1)
    def test_info_warning(self):
        # `ros2 node info /complex_node` must exit cleanly and its output must
        # contain the non-unique-name warning formatted for two nodes.
        with self.launch_node_command(arguments=['info', '/complex_node']) as node_command:
            assert node_command.wait_for_shutdown(timeout=20)
        assert node_command.exit_code == launch_testing.asserts.EXIT_OK

        expected_warning = INFO_NONUNIQUE_WARNING_TEMPLATE.format(
            num_nodes='2', node_name='/complex_node'
        )
        output_matches = launch_testing.tools.expect_output(
            expected_lines=[expected_warning],
            text=node_command.output,
            strict=False
        )
        assert output_matches, 'Output does not match:\n' + node_command.output