Skip to content

Commit 025d58d

Browse files
zhangjidi2016zhangjidi2016
andauthored
[ISSUE #377] Add the replyTimeout configuration parameter (#384)
* [ISSUE #377]Add the replyTimeout configuration parameter * add unit test Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
1 parent b35e3c5 commit 025d58d

4 files changed

Lines changed: 63 additions & 4 deletions

File tree

rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
* The consumer that replying String
2626
*/
2727
@Service
28-
@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
28+
@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
29+
selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
2930
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
3031

3132
@Override

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@
8888
*/
8989
long consumeTimeout() default 15L;
9090

91+
/**
92+
* Timeout for sending reply messages.
93+
*/
94+
int replyTimeout() default 3000;
95+
9196
/**
9297
* The property of "access-key".
9398
*/

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
3535
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
3636
import org.apache.rocketmq.client.exception.MQClientException;
37+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
3738
import org.apache.rocketmq.client.producer.SendCallback;
3839
import org.apache.rocketmq.client.producer.SendResult;
3940
import org.apache.rocketmq.client.producer.SendStatus;
@@ -121,6 +122,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
121122
private MessageModel messageModel;
122123
private long consumeTimeout;
123124
private int maxReconsumeTimes;
125+
private int replyTimeout;
124126

125127
public long getSuspendCurrentQueueTimeMillis() {
126128
return suspendCurrentQueueTimeMillis;
@@ -221,6 +223,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) {
221223
this.selectorExpression = anno.selectorExpression();
222224
this.consumeTimeout = anno.consumeTimeout();
223225
this.maxReconsumeTimes = anno.maxReconsumeTimes();
226+
this.replyTimeout = anno.replyTimeout();
224227
}
225228

226229
public ConsumeMode getConsumeMode() {
@@ -399,7 +402,9 @@ private void handleMessage(
399402
Message<?> message = MessageBuilder.withPayload(replyContent).build();
400403

401404
org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
402-
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
405+
DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
406+
producer.setSendMsgTimeout(replyTimeout);
407+
producer.send(replyMessage, new SendCallback() {
403408
@Override public void onSuccess(SendResult sendResult) {
404409
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
405410
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());

rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@
2020
import java.lang.reflect.Method;
2121
import java.lang.reflect.ParameterizedType;
2222

23+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
24+
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
25+
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
26+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
27+
import org.apache.rocketmq.client.producer.SendCallback;
2328
import org.apache.rocketmq.common.message.Message;
29+
import org.apache.rocketmq.common.message.MessageAccessor;
30+
import org.apache.rocketmq.common.message.MessageConst;
2431
import org.apache.rocketmq.common.message.MessageExt;
25-
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
2632
import org.apache.rocketmq.spring.core.RocketMQListener;
2733
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
2834
import org.junit.Test;
@@ -34,9 +40,12 @@
3440
import java.util.ArrayList;
3541
import java.util.Arrays;
3642
import java.util.Date;
37-
import org.springframework.messaging.support.MessageBuilder;
3843

3944
import static org.assertj.core.api.Assertions.assertThat;
45+
import static org.mockito.ArgumentMatchers.any;
46+
import static org.mockito.Mockito.doNothing;
47+
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.when;
4049

4150
public class DefaultRocketMQListenerContainerTest {
4251
@Test
@@ -184,6 +193,45 @@ public String onMessage(ArrayList<Date> message) {
184193
assertThat(methodParameter.getParameterType() == ArrayList.class);
185194
}
186195

196+
@Test
197+
public void testHandleMessage() throws Exception {
198+
DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
199+
Method handleMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage", MessageExt.class);
200+
handleMessage.setAccessible(true);
201+
listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
202+
@Override
203+
public void onMessage(String message) {
204+
}
205+
});
206+
Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
207+
messageType.setAccessible(true);
208+
messageType.set(listenerContainer, String.class);
209+
MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
210+
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "defaultCluster");
211+
messageExt.setBody("hello".getBytes());
212+
handleMessage.invoke(listenerContainer, messageExt);
213+
214+
// reply message
215+
listenerContainer.setRocketMQListener(null);
216+
DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
217+
DefaultMQPushConsumerImpl pushConsumer = mock(DefaultMQPushConsumerImpl.class);
218+
MQClientInstance mqClientInstance = mock(MQClientInstance.class);
219+
DefaultMQProducer producer = mock(DefaultMQProducer.class);
220+
when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
221+
when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
222+
when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
223+
listenerContainer.setConsumer(consumer);
224+
listenerContainer.setMessageConverter(new CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new MappingJackson2MessageConverter())));
225+
doNothing().when(producer).send(any(MessageExt.class), any(SendCallback.class));
226+
listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
227+
@Override
228+
public String onMessage(String message) {
229+
return "test";
230+
}
231+
});
232+
handleMessage.invoke(listenerContainer, messageExt);
233+
}
234+
187235
class User {
188236
private String userName;
189237
private int userAge;

0 commit comments

Comments
 (0)