Skip to content

Commit 95e5a42

Browse files
authored
Add waitForMessage utility for one-shot message reception (#1444)
Adds `waitForMessage()`, the rclnodejs equivalent of rclpy's `wait_for_message(msg_type, node, topic, time_to_wait)`. This utility creates a temporary subscription, waits for the first message to arrive on a topic, and returns it as a Promise. The temporary subscription is always cleaned up — on success, timeout, or error. Supports an optional timeout and QoS profile. The node must be spinning before calling this function. **New files:** - `lib/wait_for_message.js` — Core implementation. Creates a temporary subscription with a one-shot callback that resolves the Promise on first message. Uses a `settled` guard to prevent double-resolution. Cleanup runs unconditionally via the `settle()` helper. Accepts optional `{ timeout, qos }` options. - `test/test-wait-for-message.js` — 7 tests covering: successful message reception, timeout rejection, first-message-only semantics, different message types (String, Int32), indefinite wait without timeout, subscription cleanup after receiving, and subscription cleanup on timeout. **Modified files:** - `index.js` — Added `require('./lib/wait_for_message.js')` and re-exported as `waitForMessage` property on the module. - `types/index.d.ts` — Added `WaitForMessageOptions` interface and generic `waitForMessage<T>(typeClass, node, topic, options?)` function declaration. Fix: #1443
1 parent aa6f1d3 commit 95e5a42

4 files changed

Lines changed: 344 additions & 0 deletions

File tree

index.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ const ParameterClient = require('./lib/parameter_client.js');
6464
const errors = require('./lib/errors.js');
6565
const ParameterWatcher = require('./lib/parameter_watcher.js');
6666
const ParameterEventHandler = require('./lib/parameter_event_handler.js');
67+
const waitForMessage = require('./lib/wait_for_message.js');
6768
const MessageIntrospector = require('./lib/message_introspector.js');
6869
const MessageInfo = require('./lib/message_info.js');
6970
const ObservableSubscription = require('./lib/observable_subscription.js');
@@ -466,6 +467,26 @@ let rcl = {
466467
node.spinOnce(timeout);
467468
},
468469

470+
/**
471+
* Wait for a single message on a topic.
472+
*
473+
* Creates a temporary subscription, waits for the first message to arrive,
474+
* and returns it. The temporary subscription is always cleaned up, even on
475+
* timeout or error. The node must be spinning before calling this function.
476+
*
477+
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
478+
*
479+
* @param {function|string|object} typeClass - The ROS message type class.
480+
* @param {Node} node - The node to create the temporary subscription on.
481+
* @param {string} topic - The topic name to listen on.
482+
* @param {object} [options] - Options.
483+
* @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely.
484+
* @param {object} [options.qos] - QoS profile for the subscription.
485+
* @returns {Promise<object>} - Resolves with the received message.
486+
* @throws {Error} If timeout expires before a message arrives.
487+
*/
488+
waitForMessage: waitForMessage,
489+
469490
/**
470491
* Shutdown an RCL environment identified by a context. The shutdown process will
471492
* destroy all nodes and related resources in the context. If no context is

lib/wait_for_message.js

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) 2026, The Robot Web Tools Contributors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
'use strict';
16+
17+
const { TimeoutError } = require('./errors.js');
18+
19+
/**
20+
* Wait for a single message on a topic.
21+
*
22+
* Creates a temporary subscription, waits for the first message to arrive,
23+
* and returns it. The temporary subscription is always cleaned up, even on
24+
* timeout or error. The node must be spinning before calling this function.
25+
*
26+
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
27+
*
28+
* @param {function|string|object} typeClass - The ROS message type class.
29+
* @param {Node} node - The node to create the temporary subscription on.
30+
* @param {string} topic - The topic name to listen on.
31+
* @param {object} [options] - Options.
32+
* @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely.
33+
* @param {object} [options.qos] - QoS profile for the subscription.
34+
* @returns {Promise<object>} - Resolves with the received message.
35+
* @throws {Error} If timeout expires before a message arrives.
36+
*
37+
* @example
38+
* node.spin();
39+
* const msg = await waitForMessage(
40+
* 'std_msgs/msg/String',
41+
* node,
42+
* '/my_topic',
43+
* { timeout: 5000 }
44+
* );
45+
* console.log('Received:', msg.data);
46+
*/
47+
function waitForMessage(typeClass, node, topic, options = {}) {
48+
return new Promise((resolve, reject) => {
49+
let subscription = null;
50+
let timer = null;
51+
let settled = false;
52+
53+
const cleanup = () => {
54+
if (timer) {
55+
clearTimeout(timer);
56+
timer = null;
57+
}
58+
if (subscription) {
59+
try {
60+
node.destroySubscription(subscription);
61+
} catch {
62+
// Subscription may already be destroyed if node is shutting down
63+
}
64+
subscription = null;
65+
}
66+
};
67+
68+
const settle = (err, msg) => {
69+
if (settled) return;
70+
settled = true;
71+
cleanup();
72+
if (err) {
73+
reject(err);
74+
} else {
75+
resolve(msg);
76+
}
77+
};
78+
79+
try {
80+
const subOptions = {};
81+
if (options.qos) {
82+
subOptions.qos = options.qos;
83+
}
84+
85+
subscription = node.createSubscription(
86+
typeClass,
87+
topic,
88+
subOptions,
89+
(msg) => {
90+
settle(null, msg);
91+
}
92+
);
93+
94+
if (options.timeout != null && options.timeout >= 0) {
95+
timer = setTimeout(() => {
96+
settle(
97+
new TimeoutError(
98+
`waitForMessage timed out after ${options.timeout}ms on topic '${topic}'`,
99+
{ entityType: 'topic', entityName: topic }
100+
)
101+
);
102+
}, options.timeout);
103+
}
104+
} catch (err) {
105+
cleanup();
106+
reject(err);
107+
}
108+
});
109+
}
110+
111+
module.exports = waitForMessage;

test/test-wait-for-message.js

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright (c) 2026, The Robot Web Tools Contributors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
'use strict';
16+
17+
const assert = require('assert');
18+
const rclnodejs = require('../index.js');
19+
20+
describe('waitForMessage tests', function () {
21+
this.timeout(60 * 1000);
22+
23+
let node;
24+
25+
before(function () {
26+
return rclnodejs.init();
27+
});
28+
29+
after(function () {
30+
rclnodejs.shutdown();
31+
});
32+
33+
beforeEach(function () {
34+
node = rclnodejs.createNode('wait_for_message_test_node');
35+
node.spin();
36+
});
37+
38+
afterEach(function () {
39+
node.stop();
40+
node.destroy();
41+
});
42+
43+
it('should receive a message', async function () {
44+
const publisher = node.createPublisher(
45+
'std_msgs/msg/String',
46+
'wfm_test_topic_1'
47+
);
48+
49+
// Publish after a short delay
50+
setTimeout(() => {
51+
publisher.publish('hello waitForMessage');
52+
}, 200);
53+
54+
const msg = await rclnodejs.waitForMessage(
55+
'std_msgs/msg/String',
56+
node,
57+
'wfm_test_topic_1',
58+
{ timeout: 5000 }
59+
);
60+
61+
assert.strictEqual(msg.data, 'hello waitForMessage');
62+
});
63+
64+
it('should timeout when no message arrives', async function () {
65+
await assert.rejects(
66+
() =>
67+
rclnodejs.waitForMessage(
68+
'std_msgs/msg/String',
69+
node,
70+
'wfm_nonexistent_topic',
71+
{ timeout: 500 }
72+
),
73+
(error) => {
74+
assert.strictEqual(error.name, 'TimeoutError');
75+
return true;
76+
}
77+
);
78+
});
79+
80+
it('should receive only the first message', async function () {
81+
const publisher = node.createPublisher(
82+
'std_msgs/msg/String',
83+
'wfm_test_topic_2'
84+
);
85+
86+
setTimeout(() => {
87+
publisher.publish('first');
88+
publisher.publish('second');
89+
publisher.publish('third');
90+
}, 200);
91+
92+
const msg = await rclnodejs.waitForMessage(
93+
'std_msgs/msg/String',
94+
node,
95+
'wfm_test_topic_2',
96+
{ timeout: 5000 }
97+
);
98+
99+
assert.strictEqual(msg.data, 'first');
100+
});
101+
102+
it('should work with different message types', async function () {
103+
const publisher = node.createPublisher(
104+
'std_msgs/msg/Int32',
105+
'wfm_test_topic_3'
106+
);
107+
108+
setTimeout(() => {
109+
publisher.publish({ data: 42 });
110+
}, 200);
111+
112+
const msg = await rclnodejs.waitForMessage(
113+
'std_msgs/msg/Int32',
114+
node,
115+
'wfm_test_topic_3',
116+
{ timeout: 5000 }
117+
);
118+
119+
assert.strictEqual(msg.data, 42);
120+
});
121+
122+
it('should wait indefinitely when no timeout is specified', async function () {
123+
const publisher = node.createPublisher(
124+
'std_msgs/msg/String',
125+
'wfm_test_topic_4'
126+
);
127+
128+
// Publish after a delay — should still be caught without timeout
129+
setTimeout(() => {
130+
publisher.publish('delayed message');
131+
}, 500);
132+
133+
const msg = await rclnodejs.waitForMessage(
134+
'std_msgs/msg/String',
135+
node,
136+
'wfm_test_topic_4'
137+
);
138+
139+
assert.strictEqual(msg.data, 'delayed message');
140+
});
141+
142+
it('should clean up subscription after receiving', async function () {
143+
const publisher = node.createPublisher(
144+
'std_msgs/msg/String',
145+
'wfm_test_topic_5'
146+
);
147+
148+
const subCountBefore = node._subscriptions.length;
149+
150+
setTimeout(() => {
151+
publisher.publish('cleanup test');
152+
}, 200);
153+
154+
await rclnodejs.waitForMessage(
155+
'std_msgs/msg/String',
156+
node,
157+
'wfm_test_topic_5',
158+
{ timeout: 5000 }
159+
);
160+
161+
// Subscription should be cleaned up
162+
assert.strictEqual(node._subscriptions.length, subCountBefore);
163+
});
164+
165+
it('should clean up subscription on timeout', async function () {
166+
const subCountBefore = node._subscriptions.length;
167+
168+
await assert.rejects(() =>
169+
rclnodejs.waitForMessage(
170+
'std_msgs/msg/String',
171+
node,
172+
'wfm_timeout_cleanup_topic',
173+
{ timeout: 300 }
174+
)
175+
);
176+
177+
// Subscription should be cleaned up even on timeout
178+
assert.strictEqual(node._subscriptions.length, subCountBefore);
179+
});
180+
});

types/index.d.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,38 @@ declare module 'rclnodejs' {
8787
* @deprecated since 0.18.0, Use Node.spinOnce(timeout)*/
8888
function spinOnce(node: Node, timeout?: number): void;
8989

90+
/**
91+
* Options for waitForMessage.
92+
*/
93+
interface WaitForMessageOptions {
94+
/** Timeout in milliseconds. If omitted, waits indefinitely. */
95+
timeout?: number;
96+
/** QoS profile for the temporary subscription. */
97+
qos?: QoS;
98+
}
99+
100+
/**
101+
* Wait for a single message on a topic.
102+
*
103+
* Creates a temporary subscription, waits for the first message to arrive,
104+
* and returns it. The node must be spinning before calling this function.
105+
*
106+
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
107+
*
108+
* @param typeClass - The ROS message type class.
109+
* @param node - The node to create the temporary subscription on.
110+
* @param topic - The topic name to listen on.
111+
* @param options - Options including timeout and QoS.
112+
* @returns Resolves with the received message.
113+
* @throws Error if timeout expires before a message arrives.
114+
*/
115+
function waitForMessage<T extends TypeClass<MessageTypeClassName>>(
116+
typeClass: T,
117+
node: Node,
118+
topic: string,
119+
options?: WaitForMessageOptions
120+
): Promise<MessageType<T>>;
121+
90122
/**
91123
* Stop all activity, destroy all nodes and node components.
92124
*

0 commit comments

Comments
 (0)