Skip to content

Commit f437f52

Browse files
authored
[ISSUE #654] Support namespace for rocketmq-v5-client-spring-boot and rocketmq-spring-boot (#655)
* Add namespace * Add namespace in toString
1 parent 6f2cef6 commit f437f52

19 files changed

Lines changed: 131 additions & 7 deletions

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@
116116
*/
117117
String namespace() default "";
118118

119+
/**
120+
* The namespace v2 version of consumer, it can not be used in combination with namespace.
121+
*/
122+
String namespaceV2() default "";
123+
119124
/**
120125
* The property of "instanceName".
121126
*/

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,17 @@
8989
* The property of "tlsEnable" default false.
9090
*/
9191
String tlsEnable() default "false";
92+
9293
/**
9394
* The namespace of producer.
9495
*/
9596
String namespace() default "";
9697

98+
/**
99+
* The namespace v2 version of producer, it can not be used in combination with namespace.
100+
*/
101+
String namespaceV2() default "";
102+
97103
/**
98104
* The property of "instanceName".
99105
*/

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@
142142
*/
143143
String namespace() default "";
144144

145+
/**
146+
* The namespace V2 version of listener, it can not be used in combination with namespace.
147+
*/
148+
String namespaceV2() default "";
149+
145150
/**
146151
* Message consume retry strategy in concurrently mode.
147152
*

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration
132132
litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
133133
String namespace = environment.resolvePlaceholders(annotation.namespace());
134134
litePullConsumer.setNamespace(RocketMQUtil.getNamespace(namespace, consumerConfig.getNamespace()));
135+
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
136+
litePullConsumer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, consumerConfig.getNamespaceV2()));
135137
litePullConsumer.setInstanceName(annotation.instanceName());
136138
return litePullConsumer;
137139
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
129129
producer.setUseTLS(useTLS);
130130
String namespace = environment.resolvePlaceholders(annotation.namespace());
131131
producer.setNamespace(RocketMQUtil.getNamespace(namespace, producerConfig.getNamespace()));
132+
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
133+
producer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, producerConfig.getNamespaceV2()));
132134
producer.setInstanceName(annotation.instanceName());
133135
return producer;
134136
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties
118118
if (StringUtils.hasText(producerConfig.getNamespace())) {
119119
producer.setNamespace(producerConfig.getNamespace());
120120
}
121+
if (StringUtils.hasText(producerConfig.getNamespaceV2())) {
122+
producer.setNamespaceV2(producerConfig.getNamespaceV2());
123+
}
121124
producer.setInstanceName(producerConfig.getInstanceName());
122125
log.info("a producer ({}) init on namesrv {}", groupName, nameServer);
123126
return producer;
@@ -152,6 +155,9 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket
152155
if (StringUtils.hasText(consumerConfig.getNamespace())) {
153156
litePullConsumer.setNamespace(consumerConfig.getNamespace());
154157
}
158+
if (StringUtils.hasText(consumerConfig.getNamespaceV2())) {
159+
litePullConsumer.setNamespaceV2(consumerConfig.getNamespaceV2());
160+
}
155161
litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
156162
log.info("a pull consumer({} sub {}) init on namesrv {}", groupName, topicName, nameServer);
157163
return litePullConsumer;

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public static class Producer {
108108
*/
109109
private String namespace;
110110

111+
/**
112+
* The namespace v2 version of producer, it can not be used in combination with namespace.
113+
*/
114+
private String namespaceV2;
115+
111116
/**
112117
* Millis of send message timeout.
113118
*/
@@ -274,6 +279,14 @@ public void setNamespace(String namespace) {
274279
this.namespace = namespace;
275280
}
276281

282+
public String getNamespaceV2() {
283+
return namespaceV2;
284+
}
285+
286+
public void setNamespaceV2(String namespaceV2) {
287+
this.namespaceV2 = namespaceV2;
288+
}
289+
277290
public String getInstanceName() {
278291
return instanceName;
279292
}
@@ -294,6 +307,11 @@ public static class PullConsumer {
294307
*/
295308
private String namespace;
296309

310+
/**
311+
* The namespace v2 version of consumer, it can not be used in combination with namespace.
312+
*/
313+
private String namespaceV2;
314+
297315
/**
298316
* Topic name of consumer.
299317
*/
@@ -445,6 +463,14 @@ public void setNamespace(String namespace) {
445463
this.namespace = namespace;
446464
}
447465

466+
public String getNamespaceV2() {
467+
return namespaceV2;
468+
}
469+
470+
public void setNamespaceV2(String namespaceV2) {
471+
this.namespaceV2 = namespaceV2;
472+
}
473+
448474
public String getInstanceName() {
449475
return instanceName;
450476
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
135135
private int replyTimeout;
136136
private String tlsEnable;
137137
private String namespace;
138+
private String namespaceV2;
138139
private long awaitTerminationMillisWhenShutdown;
139140

140141
private String instanceName;
@@ -246,6 +247,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) {
246247
this.replyTimeout = anno.replyTimeout();
247248
this.tlsEnable = anno.tlsEnable();
248249
this.namespace = anno.namespace();
250+
this.namespaceV2 = anno.namespaceV2();
249251
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
250252
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
251253
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
@@ -288,6 +290,14 @@ public void setNamespace(String namespace) {
288290
this.namespace = namespace;
289291
}
290292

293+
public String getNamespaceV2() {
294+
return namespaceV2;
295+
}
296+
297+
public void setNamespaceV2(String namespaceV2) {
298+
this.namespaceV2 = namespaceV2;
299+
}
300+
291301
public DefaultMQPushConsumer getConsumer() {
292302
return consumer;
293303
}
@@ -394,6 +404,7 @@ public String toString() {
394404
return "DefaultRocketMQListenerContainer{" +
395405
"consumerGroup='" + consumerGroup + '\'' +
396406
", namespace='" + namespace + '\'' +
407+
", namespaceV2='" + namespaceV2 + '\'' +
397408
", nameServer='" + nameServer + '\'' +
398409
", topic='" + topic + '\'' +
399410
", consumeMode=" + consumeMode +
@@ -631,6 +642,7 @@ private void initRocketMQPushConsumer() throws MQClientException {
631642
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
632643
}
633644
consumer.setNamespace(namespace);
645+
consumer.setNamespaceV2(namespaceV2);
634646

635647
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
636648
if (customizedNameServer != null) {

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
151151
String namespace = environment.resolvePlaceholders(annotation.namespace());
152152
container.setNamespace(RocketMQUtil.getNamespace(namespace,
153153
rocketMQProperties.getConsumer().getNamespace()));
154+
155+
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
156+
container.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
157+
rocketMQProperties.getConsumer().getNamespaceV2()));
154158
return container;
155159
}
156160

rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public void testSetRocketMQMessageListener() {
251251
assertEquals(anno.selectorExpression(), container.getSelectorExpression());
252252
assertEquals(anno.tlsEnable(), container.getTlsEnable());
253253
assertEquals(anno.namespace(), container.getNamespace());
254+
assertEquals(anno.namespaceV2(), container.getNamespaceV2());
254255
assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
255256
assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
256257
assertEquals(anno.instanceName(), container.getInstanceName());
@@ -264,6 +265,7 @@ public void testSetRocketMQMessageListener() {
264265
selectorExpression = "selectorExpression",
265266
tlsEnable = "tlsEnable",
266267
namespace = "namespace",
268+
namespaceV2 = "namespaceV2",
267269
delayLevelWhenNextConsume = 1234,
268270
suspendCurrentQueueTimeMillis = 2345,
269271
instanceName = "instanceName"

0 commit comments

Comments
 (0)