Skip to content

Commit 39a0314

Browse files
feat: expose cluster ID via getClusterId() and describeCluster()
librdkafka provides rd_kafka_clusterid() to retrieve the cluster ID from broker metadata, but the Node.js binding never exposed it. This change: - Adds NodeGetClusterId NAN_METHOD to Connection (src/connection.cc) which calls RdKafka::Handle::clusterid() - Registers getClusterId on Producer, KafkaConsumer, and AdminClient native prototypes - Adds Client.prototype.getClusterId() JS wrapper (lib/client.js) - Adds admin.describeCluster() to the KafkaJS-compatible admin API (lib/kafkajs/_admin.js), matching the KafkaJS interface - Adds test for describeCluster Closes #28 (partially - adds describeCluster admin operation) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4a58328 commit 39a0314

8 files changed

Lines changed: 142 additions & 0 deletions

File tree

lib/client.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,23 @@ Client.prototype.setSaslCredentials = function(username, password) {
580580
this._client.setSaslCredentials(username, password);
581581
};
582582

583+
/**
584+
* Get the cluster ID reported by the broker metadata.
585+
*
586+
* Returns null if the client is not connected or if the cluster ID
587+
* could not be retrieved within the given timeout.
588+
*
589+
* @param {number} timeout - Timeout in milliseconds (default: 200)
590+
* @returns {string|null} - The cluster ID, or null
591+
*/
592+
Client.prototype.getClusterId = function(timeout) {
593+
if (!this.isConnected()) {
594+
return null;
595+
}
596+
597+
return this._client.getClusterId(timeout) || null;
598+
};
599+
583600
/**
584601
* Wrap a potential RdKafka error.
585602
*

lib/kafkajs/_admin.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,54 @@ class Admin {
513513
});
514514
}
515515

516+
/**
517+
* Describe the Kafka cluster.
518+
*
519+
* Returns the cluster ID, the controller broker, and the list
520+
* of brokers in the cluster.
521+
*
522+
* @param {object?} options
523+
* @param {number?} options.timeout - Timeout in ms (default: 5000).
524+
* @returns {Promise<{clusterId: string, controller: number,
525+
* brokers: Array<{nodeId: number, host: string, port: number}>}>}
526+
*/
527+
async describeCluster(options = {}) {
528+
if (this.#state !== AdminState.CONNECTED) {
529+
throw new error.KafkaJSError(
530+
"Admin client is not connected.",
531+
{ code: error.ErrorCodes.ERR__STATE }
532+
);
533+
}
534+
535+
const timeout = options.timeout ?? 5000;
536+
537+
const clusterId =
538+
this.#internalClient.getClusterId(timeout) || null;
539+
540+
return new Promise((resolve, reject) => {
541+
this.#internalClient.getMetadata(
542+
{ allTopics: true, timeout },
543+
(err, metadata) => {
544+
if (err) {
545+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
546+
} else {
547+
resolve({
548+
clusterId,
549+
controller:
550+
metadata.orig_broker_id != null
551+
? metadata.orig_broker_id : null,
552+
brokers: (metadata.brokers || []).map(b => ({
553+
nodeId: b.id,
554+
host: b.host,
555+
port: b.port,
556+
})),
557+
});
558+
}
559+
}
560+
);
561+
});
562+
}
563+
516564
/**
517565
* Fetch the offsets for topic partition(s) for consumer group(s).
518566
*

src/admin.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
136136
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
137137
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
138138
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
139+
Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId);
139140
Nan::SetPrototypeMethod(tpl, "setOAuthBearerToken", NodeSetOAuthBearerToken);
140141
Nan::SetPrototypeMethod(tpl, "setOAuthBearerTokenFailure",
141142
NodeSetOAuthBearerTokenFailure);

src/connection.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,4 +700,30 @@ NAN_METHOD(Connection::NodeName) {
700700
info.GetReturnValue().Set(Nan::New(name).ToLocalChecked());
701701
}
702702

703+
NAN_METHOD(Connection::NodeGetClusterId) {
704+
Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());
705+
706+
int timeout_ms = 200;
707+
if (info[0]->IsNumber()) {
708+
Nan::Maybe<int64_t> maybeTimeout = Nan::To<int64_t>(info[0]);
709+
if (!maybeTimeout.IsNothing()) {
710+
timeout_ms = static_cast<int>(maybeTimeout.FromJust());
711+
}
712+
}
713+
714+
if (!obj->IsConnected()) {
715+
info.GetReturnValue().Set(Nan::Null());
716+
return;
717+
}
718+
719+
std::string cluster_id = obj->m_client->clusterid(timeout_ms);
720+
if (cluster_id.empty()) {
721+
info.GetReturnValue().Set(Nan::Null());
722+
return;
723+
}
724+
725+
info.GetReturnValue().Set(
726+
Nan::New<v8::String>(cluster_id).ToLocalChecked());
727+
}
728+
703729
} // namespace NodeKafka

src/connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ class Connection : public Nan::ObjectWrap {
110110
static NAN_METHOD(NodeSetOAuthBearerToken);
111111
static NAN_METHOD(NodeSetOAuthBearerTokenFailure);
112112
static NAN_METHOD(NodeName);
113+
static NAN_METHOD(NodeGetClusterId);
113114
};
114115

115116
} // namespace NodeKafka

src/kafka-consumer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
546546
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
547547
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
548548
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
549+
Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId);
549550
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
550551
Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes);
551552
Nan::SetPrototypeMethod(tpl, "getWatermarkOffsets", NodeGetWatermarkOffsets);

src/producer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ void Producer::Init(v8::Local<v8::Object> exports) {
7070
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
7171
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
7272
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
73+
Nan::SetPrototypeMethod(tpl, "getClusterId", NodeGetClusterId);
7374
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
7475
Nan::SetPrototypeMethod(tpl, "poll", NodePoll);
7576
Nan::SetPrototypeMethod(tpl, "setPollInBackground", NodeSetPollInBackground);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
jest.setTimeout(30000);
2+
3+
const {
4+
createAdmin,
5+
} = require('../testhelpers');
6+
7+
describe('Admin > describeCluster', () => {
8+
let admin;
9+
10+
beforeEach(async () => {
11+
admin = createAdmin({});
12+
});
13+
14+
afterEach(async () => {
15+
admin && (await admin.disconnect());
16+
});
17+
18+
it('should fail if not connected', async () => {
19+
await expect(admin.describeCluster()).rejects.toHaveProperty(
20+
'code',
21+
-172 // ERR__STATE
22+
);
23+
});
24+
25+
it('should describe the cluster', async () => {
26+
await admin.connect();
27+
28+
const result = await admin.describeCluster();
29+
30+
expect(result).toEqual(
31+
expect.objectContaining({
32+
clusterId: expect.any(String),
33+
controller: expect.any(Number),
34+
brokers: expect.arrayContaining([
35+
expect.objectContaining({
36+
nodeId: expect.any(Number),
37+
host: expect.any(String),
38+
port: expect.any(Number),
39+
}),
40+
]),
41+
})
42+
);
43+
44+
expect(result.clusterId.length).toBeGreaterThan(0);
45+
expect(result.brokers.length).toBeGreaterThan(0);
46+
});
47+
});

0 commit comments

Comments
 (0)