Skip to content

Commit 243820c

Browse files
authored
[ISSUE #506] support send message with arbitrarily delay time (#515)
* support sync send message with arbitrarily delay time. * support sync send message with arbitrarily delay time. * fix check style error.
1 parent b008ae2 commit 243820c

3 files changed

Lines changed: 102 additions & 0 deletions

File tree

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.rocketmq.common.message.MessageBatch;
3232
import org.apache.rocketmq.common.message.MessageClientIDSetter;
3333
import org.apache.rocketmq.common.message.MessageExt;
34+
import org.apache.rocketmq.spring.support.DelayMode;
3435
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
3536
import org.apache.rocketmq.spring.support.RocketMQUtil;
3637
import org.slf4j.Logger;
@@ -535,6 +536,77 @@ public <T extends Message> SendResult syncSend(String destination, Collection<T>
535536
}
536537
}
537538

539+
/**
540+
* Same to {@link #syncSend(String, Message)} with send delay time specified in addition.
541+
*
542+
* @param destination formats: `topicName:tags`
543+
* @param message {@link org.springframework.messaging.Message}
544+
* @param delayTime delay time in seconds for message
545+
* @return {@link SendResult}
546+
*/
547+
public SendResult syncSendDelayTimeSeconds(String destination, Message<?> message, long delayTime) {
548+
return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
549+
}
550+
551+
/**
552+
* Same to {@link #syncSend(String, Object)} with send delayTime specified in addition.
553+
*
554+
* @param destination formats: `topicName:tags`
555+
* @param payload the Object to use as payload
556+
* @param delayTime delay time in seconds for message
557+
* @return {@link SendResult}
558+
*/
559+
public SendResult syncSendDelayTimeSeconds(String destination, Object payload, long delayTime) {
560+
Message<?> message = MessageBuilder.withPayload(payload).build();
561+
return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
562+
}
563+
564+
/**
565+
* Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition.
566+
*
567+
* @param destination formats: `topicName:tags`
568+
* @param message {@link org.springframework.messaging.Message}
569+
* @param timeout send timeout with millis
570+
* @param delayTime delay time for message
571+
* @return {@link SendResult}
572+
*/
573+
public SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
574+
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
575+
log.error("syncSend failed. destination:{}, message is null ", destination);
576+
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
577+
}
578+
try {
579+
long now = System.currentTimeMillis();
580+
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
581+
if (delayTime > 0 && Objects.nonNull(mode)) {
582+
switch (mode) {
583+
case DELAY_SECONDS:
584+
rocketMsg.setDelayTimeSec(delayTime);
585+
break;
586+
case DELAY_MILLISECONDS:
587+
rocketMsg.setDelayTimeMs(delayTime);
588+
break;
589+
case DELIVER_TIME_MILLISECONDS:
590+
rocketMsg.setDeliverTimeMs(delayTime);
591+
break;
592+
default:
593+
log.warn("delay mode: {} not support", mode);
594+
}
595+
}
596+
SendResult sendResult = producer.send(rocketMsg, timeout);
597+
long costTime = System.currentTimeMillis() - now;
598+
if (log.isDebugEnabled()) {
599+
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
600+
}
601+
return sendResult;
602+
} catch (Exception e) {
603+
log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
604+
throw new MessagingException(e.getMessage(), e);
605+
}
606+
}
607+
608+
609+
538610
/**
539611
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
540612
*
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.spring.support;
19+
20+
public enum DelayMode {
21+
DELAY_SECONDS,
22+
DELAY_MILLISECONDS,
23+
DELIVER_TIME_MILLISECONDS,
24+
}

rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ public void testSendMessage() {
9494
} catch (MessagingException e) {
9595
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
9696
}
97+
98+
try {
99+
rocketMQTemplate.syncSendDelayTimeSeconds(topic, "payload", 10L);
100+
} catch (MessagingException e) {
101+
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
102+
}
97103
}
98104
@Test
99105
public void testAsyncBatchSendMessage() {

0 commit comments

Comments
 (0)