Skip to content

Commit 0aaf9f9

Browse files
authored
[ISSUE #660] Add namespace in java client (#661)
* Add namespace for java client * Add checkNotNull
1 parent 2d3cdf7 commit 0aaf9f9

15 files changed

Lines changed: 140 additions & 56 deletions

File tree

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,19 @@ public class ClientConfiguration {
2828
private final SessionCredentialsProvider sessionCredentialsProvider;
2929
private final Duration requestTimeout;
3030
private final boolean sslEnabled;
31+
private final String namespace;
3132

3233
/**
3334
* The caller is supposed to have validated the arguments and handled throwing exceptions or
3435
* logging warnings already, so we avoid repeating args check here.
3536
*/
3637
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
37-
Duration requestTimeout, boolean sslEnabled) {
38+
Duration requestTimeout, boolean sslEnabled, String namespace) {
3839
this.endpoints = endpoints;
3940
this.sessionCredentialsProvider = sessionCredentialsProvider;
4041
this.requestTimeout = requestTimeout;
4142
this.sslEnabled = sslEnabled;
43+
this.namespace = namespace;
4244
}
4345

4446
public static ClientConfigurationBuilder newBuilder() {
@@ -60,4 +62,8 @@ public Duration getRequestTimeout() {
6062
public boolean isSslEnabled() {
6163
return sslEnabled;
6264
}
65+
66+
public String getNamespace() {
67+
return namespace;
68+
}
6369
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class ClientConfigurationBuilder {
3131
private SessionCredentialsProvider sessionCredentialsProvider = null;
3232
private Duration requestTimeout = Duration.ofSeconds(3);
3333
private boolean sslEnabled = true;
34+
private String namespace = "";
3435

3536
/**
3637
* Configure the access point with which the SDK should communicate.
@@ -82,6 +83,16 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
8283
return this;
8384
}
8485

86+
/**
87+
* Configure namespace for client
88+
* @param namespace namespace
89+
* @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
90+
*/
91+
public ClientConfigurationBuilder setNamespace(String namespace) {
92+
this.namespace = checkNotNull(namespace, "namespace should not be null");
93+
return this;
94+
}
95+
8596
/**
8697
* Finalize the build of {@link ClientConfiguration}.
8798
*
@@ -90,6 +101,6 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
90101
public ClientConfiguration build() {
91102
checkNotNull(endpoints, "endpoints should not be null");
92103
checkNotNull(requestTimeout, "requestTimeout should not be null");
93-
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
104+
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
94105
}
95106
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,10 @@ public void onFailure(Throwable t) {
608608
}
609609

610610
protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
611-
Resource topicResource = Resource.newBuilder().setName(topic).build();
611+
Resource topicResource = Resource.newBuilder()
612+
.setResourceNamespace(clientConfiguration.getNamespace())
613+
.setName(topic)
614+
.build();
612615
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
613616
.setEndpoints(endpoints.toProtobuf()).build();
614617
final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =

java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,26 @@
2525
import org.apache.rocketmq.client.java.route.Endpoints;
2626

2727
public abstract class Settings {
28+
protected final String namespace;
2829
protected final ClientId clientId;
2930
protected final ClientType clientType;
3031
protected final Endpoints accessPoint;
3132
protected volatile RetryPolicy retryPolicy;
3233
protected final Duration requestTimeout;
3334

34-
public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
35-
Duration requestTimeout) {
35+
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
36+
RetryPolicy retryPolicy, Duration requestTimeout) {
37+
this.namespace = namespace;
3638
this.clientId = clientId;
3739
this.clientType = clientType;
3840
this.accessPoint = accessPoint;
3941
this.retryPolicy = retryPolicy;
4042
this.requestTimeout = requestTimeout;
4143
}
4244

43-
public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) {
44-
this(clientId, clientType, accessPoint, null, requestTimeout);
45+
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
46+
Duration requestTimeout) {
47+
this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
4548
}
4649

4750
public abstract apache.rocketmq.v2.Settings toProtobuf();

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRe
123123
}
124124

125125
private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
126-
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
126+
final Resource topicResource = Resource.newBuilder()
127+
.setResourceNamespace(clientConfiguration.getNamespace())
128+
.setName(messageView.getTopic())
129+
.build();
127130
final AckMessageEntry entry = AckMessageEntry.newBuilder()
128131
.setMessageId(messageView.getMessageId().toString())
129132
.setReceiptHandle(messageView.getReceiptHandle())
@@ -134,7 +137,9 @@ private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
134137

135138
private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
136139
Duration invisibleDuration) {
137-
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
140+
final Resource topicResource = Resource.newBuilder()
141+
.setResourceNamespace(clientConfiguration.getNamespace())
142+
.setName(messageView.getTopic()).build();
138143
return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
139144
.setReceiptHandle(messageView.getReceiptHandle())
140145
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
@@ -219,7 +224,10 @@ public void onFailure(Throwable t) {
219224
}
220225

221226
protected Resource getProtobufGroup() {
222-
return Resource.newBuilder().setName(consumerGroup).build();
227+
return Resource.newBuilder()
228+
.setResourceNamespace(clientConfiguration.getNamespace())
229+
.setName(consumerGroup)
230+
.build();
223231
}
224232

225233
@Override

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
127127
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
128128
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
129129
this.clientConfiguration = clientConfiguration;
130-
Resource groupResource = new Resource(consumerGroup);
131-
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource,
132-
clientConfiguration.getRequestTimeout(), subscriptionExpressions);
130+
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
131+
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
132+
endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions);
133133
this.consumerGroup = consumerGroup;
134134
this.subscriptionExpressions = subscriptionExpressions;
135135
this.cacheAssignments = new ConcurrentHashMap<>();
@@ -261,7 +261,10 @@ private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic
261261
}
262262

263263
private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
264-
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
264+
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
265+
.setResourceNamespace(clientConfiguration.getNamespace())
266+
.setName(topic)
267+
.build();
265268
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
266269
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
267270
}
@@ -500,7 +503,10 @@ public void onFailure(Throwable t) {
500503
private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
501504
MessageViewImpl messageView) {
502505
final apache.rocketmq.v2.Resource topicResource =
503-
apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
506+
apache.rocketmq.v2.Resource.newBuilder()
507+
.setResourceNamespace(clientConfiguration.getNamespace())
508+
.setName(messageView.getTopic())
509+
.build();
504510
return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
505511
.setReceiptHandle(messageView.getReceiptHandle())
506512
.setMessageId(messageView.getMessageId().toString())

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings {
5050
private volatile int receiveBatchSize = 32;
5151
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
5252

53-
public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
53+
public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
5454
Duration requestTimeout, Map<String, FilterExpression> subscriptionExpression) {
55-
super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
55+
super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
5656
this.group = group;
5757
this.subscriptionExpressions = subscriptionExpression;
5858
}
@@ -75,7 +75,10 @@ public apache.rocketmq.v2.Settings toProtobuf() {
7575
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
7676
final FilterExpression filterExpression = entry.getValue();
7777
apache.rocketmq.v2.Resource topic =
78-
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
78+
apache.rocketmq.v2.Resource.newBuilder()
79+
.setResourceNamespace(namespace)
80+
.setName(entry.getKey())
81+
.build();
7982
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
8083
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
8184
final FilterExpressionType type = filterExpression.getFilterExpressionType();

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
7474
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
7575
Map<String, FilterExpression> subscriptionExpressions) {
7676
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
77-
Resource groupResource = new Resource(consumerGroup);
78-
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints,
79-
groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
77+
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
78+
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
79+
endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
8080
this.consumerGroup = consumerGroup;
8181
this.awaitDuration = awaitDuration;
8282

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings {
4545
private final Duration longPollingTimeout;
4646
private final Map<String, FilterExpression> subscriptionExpressions;
4747

48-
public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
48+
public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
4949
Duration requestTimeout, Duration longPollingTimeout, Map<String, FilterExpression> subscriptionExpression) {
50-
super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
50+
super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
5151
this.group = group;
5252
this.subscriptionExpressions = subscriptionExpression;
5353
this.longPollingTimeout = longPollingTimeout;
@@ -59,7 +59,9 @@ public apache.rocketmq.v2.Settings toProtobuf() {
5959
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
6060
final FilterExpression filterExpression = entry.getValue();
6161
apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder()
62-
.setName(entry.getKey()).build();
62+
.setResourceNamespace(namespace)
63+
.setName(entry.getKey())
64+
.build();
6365
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
6466
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
6567
final FilterExpressionType type = filterExpression.getFilterExpressionType();

java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer {
101101
TransactionChecker checker) {
102102
super(clientConfiguration, topics);
103103
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
104-
this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy,
105-
clientConfiguration.getRequestTimeout(), topics);
104+
this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
105+
retryPolicy, clientConfiguration.getRequestTimeout(), topics);
106106
this.checker = checker;
107107
this.publishingRouteDataCache = new ConcurrentHashMap<>();
108108
}
@@ -259,7 +259,10 @@ public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, M
259259
String transactionId, final TransactionResolution resolution) throws ClientException {
260260
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
261261
.setMessageId(messageId.toString()).setTransactionId(transactionId)
262-
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
262+
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
263+
.setResourceNamespace(clientConfiguration.getNamespace())
264+
.setName(generalMessage.getTopic())
265+
.build());
263266
switch (resolution) {
264267
case COMMIT:
265268
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
@@ -415,7 +418,8 @@ private ListenableFuture<List<SendReceiptImpl>> send(List<Message> messages, boo
415418
*/
416419
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
417420
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
418-
.map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
421+
.map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
422+
.collect(Collectors.toList());
419423
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
420424
}
421425

0 commit comments

Comments
 (0)