Skip to content

Commit aa6f1d3

Browse files
authored
Expose MessageInfo metadata on subscription callbacks (#1440)
Exposes ROS 2 message metadata (timestamps, sequence numbers, publisher GID) to subscription callbacks, matching rclpy's `MessageInfo` pattern. When a subscription callback declares two parameters, it automatically receives a `MessageInfo` object as the second argument containing metadata from the middleware layer. Single-parameter callbacks are unchanged — no performance overhead, full backward compatibility. **New files:** - `lib/message_info.js` — `MessageInfo` class wrapping native `rmw_message_info_t` fields: `sourceTimestamp` (bigint), `receivedTimestamp` (bigint), `publicationSequenceNumber` (bigint), `receptionSequenceNumber` (bigint), `publisherGid` (Buffer). Includes `toPlainObject()` helper. - `types/message_info.d.ts` — Full TypeScript declarations for `MessageInfo`. - `test/test-message-info.js` — 6 tests covering: MessageInfo delivery with 2-param callbacks, non-delivery with 1-param callbacks, timestamp validity, timestamp ordering, `toPlainObject()`, and export verification. **Modified files:** - `src/rcl_subscription_bindings.cpp` — Added `RclTakeWithInfo()` native function that calls `rcl_take()` with `rmw_message_info_t` (instead of `nullptr`) and returns a JS object with all metadata fields as BigInts and a GID Buffer. Registered as `rclTakeWithInfo` binding. Added `#include <rmw/types.h>`. - `lib/subscription.js` — Added `_wantsMessageInfo` flag detected via `callback.length >= 2`. Added `wantsMessageInfo` readonly getter. Updated `processResponse()` to accept optional `messageInfo` second parameter and forward it to the callback when the flag is set. - `lib/node.js` — Updated subscription spin loop: when `subscription.wantsMessageInfo` is true, uses `rclTakeWithInfo` and wraps the result in `MessageInfo`; otherwise uses the existing `rclTake` path (zero overhead for 1-param callbacks). Added `MessageInfo` require. - `index.js` — Exported `MessageInfo` class. - `types/index.d.ts` — Added `/// <reference>` for `message_info.d.ts`. - `types/subscription.d.ts` — Updated `SubscriptionCallback` and `SubscriptionWithRawMessageCallback` type aliases to accept optional second `MessageInfo` parameter via union types. Fix: #1439
1 parent 607e03a commit aa6f1d3

10 files changed

Lines changed: 463 additions & 9 deletions

index.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const errors = require('./lib/errors.js');
6565
const ParameterWatcher = require('./lib/parameter_watcher.js');
6666
const ParameterEventHandler = require('./lib/parameter_event_handler.js');
6767
const MessageIntrospector = require('./lib/message_introspector.js');
68+
const MessageInfo = require('./lib/message_info.js');
6869
const ObservableSubscription = require('./lib/observable_subscription.js');
6970
const { spawn } = require('child_process');
7071
const {
@@ -247,6 +248,9 @@ let rcl = {
247248
/** {@link ParameterEventHandler} class */
248249
ParameterEventHandler: ParameterEventHandler,
249250

251+
/** {@link MessageInfo} class */
252+
MessageInfo: MessageInfo,
253+
250254
/** {@link ObservableSubscription} class */
251255
ObservableSubscription: ObservableSubscription,
252256

lib/message_info.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) 2026, The Robot Web Tools Contributors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
'use strict';
16+
17+
/**
18+
* @class MessageInfo
19+
*
20+
* Contains metadata about a received message, including timestamps,
21+
* sequence numbers, and the publisher's globally unique identifier (GID).
22+
*
23+
* This is the rclnodejs equivalent of rclpy's MessageInfo.
24+
* It is passed as the second argument to subscription callbacks when the
25+
* callback accepts two parameters.
26+
*
27+
* @example
28+
* node.createSubscription(
29+
* 'std_msgs/msg/String',
30+
* 'topic',
31+
* (msg, messageInfo) => {
32+
* console.log('Source timestamp:', messageInfo.sourceTimestamp);
33+
* console.log('Received at:', messageInfo.receivedTimestamp);
34+
* console.log('Publisher GID:', messageInfo.publisherGid);
35+
* }
36+
* );
37+
*/
38+
class MessageInfo {
39+
/**
40+
* Create a MessageInfo from a raw info object returned by the native layer.
41+
*
42+
* @param {object} rawInfo - Raw message info from rclTakeWithInfo
43+
* @hideconstructor
44+
*/
45+
constructor(rawInfo) {
46+
/**
47+
* The timestamp when the message was published (nanoseconds since epoch).
48+
* @type {bigint}
49+
*/
50+
this.sourceTimestamp = rawInfo.source_timestamp;
51+
52+
/**
53+
* The timestamp when the message was received by the subscription (nanoseconds since epoch).
54+
* @type {bigint}
55+
*/
56+
this.receivedTimestamp = rawInfo.received_timestamp;
57+
58+
/**
59+
* The publication sequence number assigned by the publisher.
60+
* @type {bigint}
61+
*/
62+
this.publicationSequenceNumber = rawInfo.publication_sequence_number;
63+
64+
/**
65+
* The reception sequence number assigned by the subscriber.
66+
* @type {bigint}
67+
*/
68+
this.receptionSequenceNumber = rawInfo.reception_sequence_number;
69+
70+
/**
71+
* The globally unique identifier (GID) of the publisher.
72+
* A Buffer containing the raw GID bytes.
73+
* @type {Buffer}
74+
*/
75+
this.publisherGid = rawInfo.publisher_gid;
76+
}
77+
78+
/**
79+
* Convert to a plain object representation.
80+
*
81+
* @returns {object} Plain object with all metadata fields
82+
*/
83+
toPlainObject() {
84+
return {
85+
sourceTimestamp: this.sourceTimestamp,
86+
receivedTimestamp: this.receivedTimestamp,
87+
publicationSequenceNumber: this.publicationSequenceNumber,
88+
receptionSequenceNumber: this.receptionSequenceNumber,
89+
publisherGid: this.publisherGid,
90+
};
91+
}
92+
}
93+
94+
module.exports = MessageInfo;

lib/node.js

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const Rates = require('./rate.js');
4747
const Service = require('./service.js');
4848
const Subscription = require('./subscription.js');
4949
const ObservableSubscription = require('./observable_subscription.js');
50+
const MessageInfo = require('./message_info.js');
5051
const TimeSource = require('./time_source.js');
5152
const Timer = require('./timer.js');
5253
const TypeDescriptionService = require('./type_description_service.js');
@@ -271,9 +272,22 @@ class Node extends rclnodejs.ShadowNode {
271272
this._runWithMessageType(
272273
subscription.typeClass,
273274
(message, deserialize) => {
274-
let success = rclnodejs.rclTake(subscription.handle, message);
275-
if (success) {
276-
subscription.processResponse(deserialize());
275+
if (subscription.wantsMessageInfo) {
276+
let rawInfo = rclnodejs.rclTakeWithInfo(
277+
subscription.handle,
278+
message
279+
);
280+
if (rawInfo) {
281+
subscription.processResponse(
282+
deserialize(),
283+
new MessageInfo(rawInfo)
284+
);
285+
}
286+
} else {
287+
let success = rclnodejs.rclTake(subscription.handle, message);
288+
if (success) {
289+
subscription.processResponse(deserialize());
290+
}
277291
}
278292
}
279293
);

lib/subscription.js

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,44 @@ class Subscription extends Entity {
4545
this._isRaw = options.isRaw || false;
4646
this._serializationMode = options.serializationMode || 'default';
4747
this._node = node;
48+
this._wantsMessageInfo = callback.length >= 2;
4849

4950
if (node && eventCallbacks) {
5051
this._events = eventCallbacks.createEventHandlers(this.handle);
5152
node._events.push(...this._events);
5253
}
5354
}
5455

55-
processResponse(msg) {
56+
/**
57+
* Whether this subscription's callback wants MessageInfo as a second argument.
58+
* Determined by callback.length >= 2.
59+
* @type {boolean}
60+
* @readonly
61+
*/
62+
get wantsMessageInfo() {
63+
return this._wantsMessageInfo;
64+
}
65+
66+
processResponse(msg, messageInfo) {
5667
debug(`Message of topic ${this._topic} received.`);
5768
if (this._isRaw) {
58-
this._callback(msg);
69+
if (this._wantsMessageInfo && messageInfo) {
70+
this._callback(msg, messageInfo);
71+
} else {
72+
this._callback(msg);
73+
}
5974
} else {
6075
let message = msg.toPlainObject(this.typedArrayEnabled);
6176

6277
if (this._serializationMode !== 'default') {
6378
message = applySerializationMode(message, this._serializationMode);
6479
}
6580

66-
this._callback(message);
81+
if (this._wantsMessageInfo && messageInfo) {
82+
this._callback(message, messageInfo);
83+
} else {
84+
this._callback(message);
85+
}
6786
}
6887
}
6988

src/rcl_subscription_bindings.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <rcl/error_handling.h>
1818
#include <rcl/rcl.h>
19+
#include <rmw/types.h>
1920

2021
#include <cstdio>
2122
#include <memory>
@@ -53,6 +54,53 @@ Napi::Value RclTake(const Napi::CallbackInfo& info) {
5354
return env.Undefined();
5455
}
5556

57+
Napi::Value RclTakeWithInfo(const Napi::CallbackInfo& info) {
58+
Napi::Env env = info.Env();
59+
60+
RclHandle* subscription_handle =
61+
RclHandle::Unwrap(info[0].As<Napi::Object>());
62+
rcl_subscription_t* subscription =
63+
reinterpret_cast<rcl_subscription_t*>(subscription_handle->ptr());
64+
void* msg_taken = info[1].As<Napi::Buffer<char>>().Data();
65+
66+
rmw_message_info_t message_info = rmw_get_zero_initialized_message_info();
67+
rcl_ret_t ret = rcl_take(subscription, msg_taken, &message_info, nullptr);
68+
69+
if (ret != RCL_RET_OK && ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
70+
std::string error_string = rcl_get_error_string().str;
71+
rcl_reset_error();
72+
Napi::Error::New(env, error_string).ThrowAsJavaScriptException();
73+
return env.Undefined();
74+
}
75+
76+
if (ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
77+
return env.Undefined();
78+
}
79+
80+
// Build JS object with message info fields
81+
Napi::Object js_info = Napi::Object::New(env);
82+
js_info.Set("source_timestamp",
83+
Napi::BigInt::New(env, message_info.source_timestamp));
84+
js_info.Set("received_timestamp",
85+
Napi::BigInt::New(env, message_info.received_timestamp));
86+
js_info.Set(
87+
"publication_sequence_number",
88+
Napi::BigInt::New(
89+
env, static_cast<int64_t>(message_info.publication_sequence_number)));
90+
js_info.Set(
91+
"reception_sequence_number",
92+
Napi::BigInt::New(
93+
env, static_cast<int64_t>(message_info.reception_sequence_number)));
94+
95+
// Publisher GID as Buffer
96+
auto gid_buf =
97+
Napi::Buffer<uint8_t>::Copy(env, message_info.publisher_gid.data,
98+
sizeof(message_info.publisher_gid.data));
99+
js_info.Set("publisher_gid", gid_buf);
100+
101+
return js_info;
102+
}
103+
56104
Napi::Value CreateSubscription(const Napi::CallbackInfo& info) {
57105
Napi::Env env = info.Env();
58106

@@ -422,6 +470,7 @@ Napi::Value GetPublisherCount(const Napi::CallbackInfo& info) {
422470

423471
Napi::Object InitSubscriptionBindings(Napi::Env env, Napi::Object exports) {
424472
exports.Set("rclTake", Napi::Function::New(env, RclTake));
473+
exports.Set("rclTakeWithInfo", Napi::Function::New(env, RclTakeWithInfo));
425474
exports.Set("createSubscription",
426475
Napi::Function::New(env, CreateSubscription));
427476
exports.Set("rclTakeRaw", Napi::Function::New(env, RclTakeRaw));

0 commit comments

Comments
 (0)