Skip to content

Commit 5b7cc78

Browse files
authored
Support feedback subscription content filter for action client (#1457)
Add enableFeedbackMsgOptimization option to ActionClient that uses the DDS content filter to avoid receiving irrelevant feedback messages when multiple action clients share the same action server. This mirrors the rclpy implementation (ros2/rclpy#1633). When enabled, goal IDs are added to the feedback subscription's content filter on goal acceptance and removed on terminal status, result receipt, or cancel. The optimization supports up to 6 concurrent goals (limited by the DDS 100-parameter maximum) and auto-disables gracefully when the limit is exceeded or the RMW does not support content filtering. Changes: - lib/action/client.js: Add content filter add/remove calls matching rclpy's pattern — direct native API calls with try/catch and warning on error, no JS-side tracking structures. - src/rcl_action_client_bindings.cpp: N-API bindings for rcl_action_client_configure_feedback_subscription_filter_add/remove_goal_id, guarded by ROS_VERSION >= 5000 (Rolling). - src/executor.cpp, src/executor.h: Add condition variable synchronization so the background thread waits after uv_async_send until the main thread finishes ExecuteReadyHandles. This prevents a data race where the background thread re-enters rcl_wait (holding subscription references) while the main thread modifies the content filter. Use RAII guard to ensure notification on exception. - types/action_client.d.ts: Add enableFeedbackMsgOptimization option. - test/test-action-client.js: 7 new tests covering default/enable flag, normal feedback, multiple goals with optimization, cancel+new goal, concurrent goals, and >6 goals overflow with auto-disable. Fix: #1456
1 parent fee38f3 commit 5b7cc78

7 files changed

Lines changed: 378 additions & 5 deletions

File tree

lib/action/client.js

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ class ActionClient extends Entity {
5151
* @param {QoS} options.qos.feedbackSubQosProfile - Quality of service option for the feedback subscription,
5252
* default: new QoS(QoS.HistoryPolicy.RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT, 10).
5353
* @param {QoS} options.qos.statusSubQosProfile - Quality of service option for the status subscription, default: QoS.profileActionStatusDefault.
54+
* @param {boolean} options.enableFeedbackMsgOptimization - Enable feedback subscription content filter to
55+
* optimize the handling of feedback messages. When enabled, the content filter is used to configure
56+
* the goal ID for the subscription, which helps avoid the reception of irrelevant feedback messages.
57+
* An action client can handle up to 6 goals simultaneously with this optimization. If the number
58+
* of goals exceeds the limit or the RMW doesn't support content filter, optimization is automatically
59+
* disabled. Default: false.
5460
*/
5561
constructor(node, typeClass, actionName, options) {
5662
super(null, null, options);
@@ -87,6 +93,15 @@ class ActionClient extends Entity {
8793
checkTypes: true,
8894
};
8995

96+
// Enable feedback subscription content filter optimization.
97+
// Only supported on ROS2 Rolling and only effective when the native
98+
// binding provides the required functions AND the RMW implementation
99+
// actually supports content filtering on the feedback subscription.
100+
this._enableFeedbackMsgOptimization =
101+
this._options.enableFeedbackMsgOptimization === true &&
102+
DistroUtils.getDistroId() >= DistroUtils.DistroId.ROLLING &&
103+
typeof rclnodejs.actionConfigureFeedbackSubFilterAddGoalId === 'function';
104+
90105
let type = this.typeClass.type();
91106

92107
this._handle = rclnodejs.actionCreateClient(
@@ -126,6 +141,7 @@ class ActionClient extends Entity {
126141
}
127142

128143
this._goalHandles.set(uuid, goalHandle);
144+
this._feedbackSubFilterAddGoalId(goalHandle.goalId);
129145
} else {
130146
// Clean up feedback callback for rejected goals
131147
let uuid = ActionUuid.fromMessage(
@@ -205,6 +221,9 @@ class ActionClient extends Entity {
205221
status === ActionInterfaces.GoalStatus.STATUS_ABORTED
206222
) {
207223
this._goalHandles.delete(uuid);
224+
this._feedbackSubFilterRemoveGoalId(
225+
statusMessage.goal_info.goal_id
226+
);
208227
}
209228
}
210229
} else {
@@ -393,6 +412,8 @@ class ActionClient extends Entity {
393412
this._removePendingCancelRequest(sequenceNumber)
394413
);
395414

415+
this._feedbackSubFilterRemoveGoalId(goalHandle.goalId);
416+
396417
return deferred.promise;
397418
}
398419

@@ -442,9 +463,10 @@ class ActionClient extends Entity {
442463
goalHandle.status = result.status;
443464
return result.result;
444465
});
445-
deferred.setDoneCallback(() =>
446-
this._removePendingResultRequest(sequenceNumber)
447-
);
466+
deferred.setDoneCallback(() => {
467+
this._removePendingResultRequest(sequenceNumber);
468+
this._feedbackSubFilterRemoveGoalId(goalHandle.goalId);
469+
});
448470

449471
this._pendingResultRequests.set(sequenceNumber, deferred);
450472

@@ -464,6 +486,42 @@ class ActionClient extends Entity {
464486
this._pendingCancelRequests.delete(sequenceNumber);
465487
}
466488

489+
/**
490+
* Add a goal ID to the feedback subscription content filter.
491+
* @ignore
492+
* @param {object} goalId - The goal UUID message.
493+
*/
494+
_feedbackSubFilterAddGoalId(goalId) {
495+
if (!this._enableFeedbackMsgOptimization) return;
496+
try {
497+
rclnodejs.actionConfigureFeedbackSubFilterAddGoalId(
498+
this.handle,
499+
Buffer.from(goalId.uuid)
500+
);
501+
} catch (e) {
502+
this._enableFeedbackMsgOptimization = false;
503+
this._node.getLogger().warn(`${e.message}`);
504+
}
505+
}
506+
507+
/**
508+
* Remove a goal ID from the feedback subscription content filter.
509+
* @ignore
510+
* @param {object} goalId - The goal UUID message.
511+
*/
512+
_feedbackSubFilterRemoveGoalId(goalId) {
513+
if (!this._enableFeedbackMsgOptimization) return;
514+
try {
515+
rclnodejs.actionConfigureFeedbackSubFilterRemoveGoalId(
516+
this.handle,
517+
Buffer.from(goalId.uuid)
518+
);
519+
} catch (e) {
520+
this._enableFeedbackMsgOptimization = false;
521+
this._node.getLogger().warn(`${e.message}`);
522+
}
523+
}
524+
467525
/**
468526
* Destroy the underlying action client handle.
469527
* @return {undefined}

src/executor.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ Executor::Executor(Napi::Env env, HandleManager* handle_manager,
4848
handle_manager_(handle_manager),
4949
delegate_(delegate),
5050
context_(nullptr),
51-
env_(env) {
51+
env_(env),
52+
work_pending_(false) {
5253
running_.store(false);
5354
}
5455

@@ -105,6 +106,8 @@ void Executor::Stop() {
105106
// Stop thread first, and then uv_close
106107
// Make sure async_ is not used anymore
107108
running_.store(false);
109+
// Wake the background thread in case it is waiting on the condvar.
110+
work_done_cv_.notify_all();
108111
handle_manager_->StopWaitingHandles();
109112
uv_thread_join(&background_thread_);
110113

@@ -133,6 +136,21 @@ bool Executor::IsMainThread() {
133136

134137
void Executor::DoWork(uv_async_t* handle) {
135138
Executor* executor = reinterpret_cast<Executor*>(handle->data);
139+
140+
// RAII guard: always clear work_pending_ and notify the background thread,
141+
// even if ExecuteReadyHandles() throws (e.g. from N-API callbacks).
142+
// Without this, the background thread would block forever on work_done_cv_.
143+
struct WorkDoneGuard {
144+
Executor* exec;
145+
~WorkDoneGuard() {
146+
{
147+
std::lock_guard<std::mutex> lock(exec->work_done_mutex_);
148+
exec->work_pending_ = false;
149+
}
150+
exec->work_done_cv_.notify_one();
151+
}
152+
} guard{executor};
153+
136154
executor->ExecuteReadyHandles();
137155
}
138156

@@ -159,7 +177,23 @@ void Executor::Run(void* arg) {
159177

160178
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(executor->async_)) &&
161179
handle_manager->ready_handles_count() > 0) {
180+
// Tell the main thread there is work to do, then wait for it to
181+
// finish before re-entering rcl_wait. This prevents a data race
182+
// where the background thread holds subscriptions in the wait set
183+
// while the main thread modifies their state (e.g. content filter).
184+
{
185+
std::lock_guard<std::mutex> lock(executor->work_done_mutex_);
186+
executor->work_pending_ = true;
187+
}
162188
uv_async_send(executor->async_);
189+
190+
// Wait until DoWork() signals completion.
191+
{
192+
std::unique_lock<std::mutex> lock(executor->work_done_mutex_);
193+
executor->work_done_cv_.wait(lock, [executor] {
194+
return !executor->work_pending_ || !executor->running_.load();
195+
});
196+
}
163197
}
164198
}
165199

src/executor.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <uv.h>
2121

2222
#include <atomic>
23+
#include <condition_variable>
2324
#include <exception>
25+
#include <mutex>
2426
#include <vector>
2527

2628
#include "rcl_handle.h"
@@ -72,6 +74,15 @@ class Executor {
7274
Napi::Env env_;
7375

7476
std::atomic_bool running_;
77+
78+
// Synchronization: the background thread waits after uv_async_send until
79+
// the main thread finishes ExecuteReadyHandles. This prevents the
80+
// background thread from re-entering rcl_wait (which holds a reference to
81+
// subscriptions) while the main thread modifies subscription state (e.g.
82+
// content filter changes).
83+
std::mutex work_done_mutex_;
84+
std::condition_variable work_done_cv_;
85+
bool work_pending_; // true while the main thread is processing handles
7586
};
7687

7788
} // namespace rclnodejs

src/rcl_action_client_bindings.cpp

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,67 @@ Napi::Value ActionSendCancelRequest(const Napi::CallbackInfo& info) {
250250
return Napi::Number::New(env, static_cast<double>(sequence_number));
251251
}
252252

253+
#if ROS_VERSION >= 5000 // ROS2 Rolling
254+
Napi::Value ActionConfigureFeedbackSubFilterAddGoalId(
255+
const Napi::CallbackInfo& info) {
256+
Napi::Env env = info.Env();
257+
258+
RclHandle* action_client_handle =
259+
RclHandle::Unwrap(info[0].As<Napi::Object>());
260+
rcl_action_client_t* action_client =
261+
reinterpret_cast<rcl_action_client_t*>(action_client_handle->ptr());
262+
263+
auto goal_id_buffer = info[1].As<Napi::Buffer<uint8_t>>();
264+
const uint8_t* goal_id_array = goal_id_buffer.Data();
265+
size_t goal_id_size = goal_id_buffer.Length();
266+
267+
rcl_ret_t ret =
268+
rcl_action_client_configure_feedback_subscription_filter_add_goal_id(
269+
action_client, goal_id_array, goal_id_size);
270+
271+
if (RCL_RET_OK != ret) {
272+
std::string error_text{
273+
"Failed to add goal id to feedback subscription content filter: "};
274+
error_text += rcl_get_error_string().str;
275+
rcl_reset_error();
276+
Napi::Error::New(env, error_text).ThrowAsJavaScriptException();
277+
return Napi::Boolean::New(env, false);
278+
}
279+
280+
return Napi::Boolean::New(env, true);
281+
}
282+
283+
Napi::Value ActionConfigureFeedbackSubFilterRemoveGoalId(
284+
const Napi::CallbackInfo& info) {
285+
Napi::Env env = info.Env();
286+
287+
RclHandle* action_client_handle =
288+
RclHandle::Unwrap(info[0].As<Napi::Object>());
289+
rcl_action_client_t* action_client =
290+
reinterpret_cast<rcl_action_client_t*>(action_client_handle->ptr());
291+
292+
auto goal_id_buffer = info[1].As<Napi::Buffer<uint8_t>>();
293+
const uint8_t* goal_id_array = goal_id_buffer.Data();
294+
size_t goal_id_size = goal_id_buffer.Length();
295+
296+
rcl_ret_t ret =
297+
rcl_action_client_configure_feedback_subscription_filter_remove_goal_id(
298+
action_client, goal_id_array, goal_id_size);
299+
300+
if (RCL_RET_OK != ret) {
301+
std::string error_text{
302+
"Failed to remove goal id from feedback subscription content "
303+
"filter: "};
304+
error_text += rcl_get_error_string().str;
305+
rcl_reset_error();
306+
Napi::Error::New(env, error_text).ThrowAsJavaScriptException();
307+
return Napi::Boolean::New(env, false);
308+
}
309+
310+
return Napi::Boolean::New(env, true);
311+
}
312+
#endif // ROS_VERSION >= 5000
313+
253314
#if ROS_VERSION >= 2505 // ROS2 >= Kilted
254315
Napi::Value ConfigureActionClientIntrospection(const Napi::CallbackInfo& info) {
255316
Napi::Env env = info.Env();
@@ -307,7 +368,15 @@ Napi::Object InitActionClientBindings(Napi::Env env, Napi::Object exports) {
307368
#if ROS_VERSION >= 2505 // ROS2 >= Kilted
308369
exports.Set("configureActionClientIntrospection",
309370
Napi::Function::New(env, ConfigureActionClientIntrospection));
310-
#endif // ROS_VERSION >= 2505
371+
#endif // ROS_VERSION >= 2505
372+
#if ROS_VERSION >= 5000 // ROS2 Rolling
373+
exports.Set(
374+
"actionConfigureFeedbackSubFilterAddGoalId",
375+
Napi::Function::New(env, ActionConfigureFeedbackSubFilterAddGoalId));
376+
exports.Set(
377+
"actionConfigureFeedbackSubFilterRemoveGoalId",
378+
Napi::Function::New(env, ActionConfigureFeedbackSubFilterRemoveGoalId));
379+
#endif // ROS_VERSION >= 5000
311380
return exports;
312381
}
313382

0 commit comments

Comments
 (0)