Skip to content

Commit 1149a9f

Browse files
authored
AMQ-9829 Track prefetched messages for duplicate suppression during failover (#1616)
During failover in transacted sessions with async dispatch (MessageListener), prefetched messages sitting in the unconsumedMessages buffer were not being tracked in previouslyDeliveredMessages. This caused them to be incorrectly identified as duplicates on redelivery and poison-acked to the DLQ.
1 parent e1500b9 commit 1149a9f

3 files changed

Lines changed: 409 additions & 2 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public PreviouslyDeliveredMap(TransactionId transactionId) {
113113
class PreviouslyDelivered {
114114
org.apache.activemq.command.Message message;
115115
boolean redelivered;
116+
boolean prefetchedOnly; // true if message was only prefetched, not delivered to application
116117

117118
PreviouslyDelivered(MessageDispatch messageDispatch) {
118119
message = messageDispatch.getMessage();
@@ -122,6 +123,12 @@ class PreviouslyDelivered {
122123
message = messageDispatch.getMessage();
123124
this.redelivered = redelivered;
124125
}
126+
127+
PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered, boolean prefetchedOnly) {
128+
message = messageDispatch.getMessage();
129+
this.redelivered = redelivered;
130+
this.prefetchedOnly = prefetchedOnly;
131+
}
125132
}
126133

127134
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
@@ -770,8 +777,12 @@ void clearMessagesInProgress() {
770777
LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
771778
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
772779
List<MessageDispatch> list = unconsumedMessages.removeAll();
780+
final boolean isTransacted = session.isTransacted();
773781
if (!this.info.isBrowser()) {
774782
for (MessageDispatch old : list) {
783+
if (isTransacted) {
784+
capturePrefetchedMessagesForDuplicateSuppression(old);
785+
}
775786
session.connection.rollbackDuplicate(this, old.getMessage());
776787
}
777788
}
@@ -933,6 +944,16 @@ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
933944
if (!isAutoAcknowledgeBatch()) {
934945
synchronized(deliveredMessages) {
935946
deliveredMessages.addFirst(md);
947+
if (session.isTransacted()) {
948+
PreviouslyDelivered entry = null;
949+
if (previouslyDeliveredMessages != null) {
950+
entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
951+
}
952+
if (entry != null && entry.prefetchedOnly) {
953+
entry.prefetchedOnly = false;
954+
entry.redelivered = true;
955+
}
956+
}
936957
}
937958
if (session.getTransacted()) {
938959
if (transactedIndividualAck) {
@@ -1382,6 +1403,7 @@ private void rollbackPreviouslyDeliveredAndNotRedelivered() {
13821403
removeFromDeliveredMessages(entry.message.getMessageId());
13831404
}
13841405
}
1406+
// Clear everything on rollback - prefetched messages will be redelivered by broker
13851407
clearPreviouslyDelivered();
13861408
}
13871409
}
@@ -1420,7 +1442,8 @@ public void dispatch(MessageDispatch md) {
14201442
synchronized (unconsumedMessages.getMutex()) {
14211443
if (!unconsumedMessages.isClosed()) {
14221444
// deliverySequenceId non zero means previously queued dispatch
1423-
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
1445+
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || isPrefetchedRedelivery(md)
1446+
|| !session.connection.isDuplicate(this, md.getMessage())) {
14241447
if (listener != null && unconsumedMessages.isRunning()) {
14251448
if (redeliveryExceeded(md)) {
14261449
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
@@ -1570,6 +1593,33 @@ private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedeliver
15701593
LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
15711594
}
15721595

1596+
private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
1597+
if (pending.getMessage() == null) {
1598+
return; // nothing to track
1599+
}
1600+
if (previouslyDeliveredMessages == null) {
1601+
previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
1602+
}
1603+
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
1604+
LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
1605+
}
1606+
1607+
private boolean isPrefetchedRedelivery(final MessageDispatch md) {
1608+
if (!session.isTransacted()) {
1609+
return false;
1610+
}
1611+
if (md.getMessage() == null) {
1612+
return false;
1613+
}
1614+
synchronized (deliveredMessages) {
1615+
if (previouslyDeliveredMessages != null) {
1616+
PreviouslyDelivered entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
1617+
return entry != null && entry.prefetchedOnly;
1618+
}
1619+
}
1620+
return false;
1621+
}
1622+
15731623
public int getMessageSize() {
15741624
return unconsumedMessages.size();
15751625
}
@@ -1689,4 +1739,11 @@ public boolean isConsumerExpiryCheckEnabled() {
16891739
public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
16901740
this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
16911741
}
1742+
1743+
// Protected method for testing
1744+
protected int getPreviouslyDeliveredMessagesSize() {
1745+
synchronized(deliveredMessages) {
1746+
return previouslyDeliveredMessages != null ? previouslyDeliveredMessages.size() : 0;
1747+
}
1748+
}
16921749
}

0 commit comments

Comments
 (0)