Skip to content

Commit 4362c5a

Browse files
authored
Fix/recurring flaky tests (#1791)
* fix(test): flaky tests * fix(test): Add logs to understand why the tests continue to fail randomly * fix(test): enhance DurableFiveBrokerNetworkBridgeTest with additional assertions for durable subscriptions * fix(test): use reflection to synchronize. Unfortunately, I can't see other means to do that * fix(test): remove additional logging to debug tests
1 parent 0c1eabd commit 4362c5a

4 files changed

Lines changed: 124 additions & 14 deletions

File tree

activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848

4949
import org.apache.activemq.ActiveMQConnection;
5050
import org.apache.activemq.ActiveMQConnectionFactory;
51+
import org.apache.activemq.broker.region.DurableTopicSubscription;
52+
import org.apache.activemq.broker.region.RegionBroker;
53+
import org.apache.activemq.broker.region.TopicRegion;
5154
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
5255
import org.apache.activemq.broker.region.policy.PolicyEntry;
5356
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -1736,15 +1739,37 @@ private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exce
17361739

17371740
private boolean isSubscriptionActive(Topic topic, String clientId) throws Exception {
17381741
if (isVirtualTopicSubscriptionStrategy()) {
1739-
String queueName = buildVirtualTopicQueueName(topic, clientId);
1742+
final String queueName = buildVirtualTopicQueueName(topic, clientId);
17401743
try {
17411744
return getProxyToQueue(queueName).getConsumerCount() > 0;
17421745
} catch (Exception ignore) {
17431746
return false;
17441747
}
17451748
} else {
1746-
return brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
1747-
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
1749+
final int activeSubs = brokerService.getAdminView().getDurableTopicSubscribers().length;
1750+
final int inactiveSubs = brokerService.getAdminView().getInactiveDurableTopicSubscribers().length;
1751+
final boolean jmxActive = activeSubs >= 1 && inactiveSubs == 0;
1752+
1753+
// Diagnostic: also check the actual broker-level subscription state
1754+
// to determine if the flakiness is a JMX registration issue or a real broker bug
1755+
boolean brokerLevelActive = false;
1756+
try {
1757+
final RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
1758+
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
1759+
final String subName = QoS.values()[topic.qos().ordinal()] + ":" + topic.name().toString();
1760+
final DurableTopicSubscription sub = topicRegion.lookupSubscription(subName, clientId);
1761+
brokerLevelActive = sub != null && sub.isActive();
1762+
} catch (Exception e) {
1763+
LOG.debug("Could not check broker-level subscription state", e);
1764+
}
1765+
1766+
if (jmxActive != brokerLevelActive) {
1767+
LOG.warn("MQTT subscription state MISMATCH: JMX says active={} (active={}, inactive={}), " +
1768+
"broker-level says active={} for clientId={}, topic={}",
1769+
jmxActive, activeSubs, inactiveSubs, brokerLevelActive, clientId, topic.name());
1770+
}
1771+
1772+
return jmxActive;
17481773
}
17491774
}
17501775

activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
package org.apache.activemq.network;
1818

1919
import java.io.File;
20+
import java.lang.reflect.Field;
2021
import java.net.URI;
2122
import java.util.ArrayList;
2223
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.RejectedExecutionException;
2327
import java.util.concurrent.TimeUnit;
2428

2529
import jakarta.jms.Connection;
@@ -30,6 +34,8 @@
3034
import junit.framework.Test;
3135
import org.apache.activemq.JmsMultipleBrokersTestSupport;
3236
import org.apache.activemq.broker.BrokerService;
37+
import org.apache.activemq.broker.TransportConnection;
38+
import org.apache.activemq.broker.TransportConnector;
3339
import org.apache.activemq.broker.region.Destination;
3440
import org.apache.activemq.broker.region.DestinationFilter;
3541
import org.apache.activemq.broker.region.DurableTopicSubscription;
@@ -134,6 +140,23 @@ public void testDurablePropagationBrokerRestart() throws Exception {
134140
startAllBrokers();
135141
waitForBridgeFormation();
136142

143+
// Wait for the async durable sync (syncDurableSubs=true) to complete across all bridges.
144+
// After restart with persistent data, NC durable subs must re-establish to match Phase 1
145+
// counts before we proceed with unsubscribes, otherwise the sync can re-create NC durable
146+
// subs AFTER the unsubscribe advisory has already propagated.
147+
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
148+
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
149+
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
150+
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
151+
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
152+
153+
// Wait for all bridge sync executors to finish populating durableRemoteSubs.
154+
// setupStaticDestinations() creates DemandSubscriptions with empty durableRemoteSubs,
155+
// and the syncExecutor populates them asynchronously. Without this wait, unsubscribe
156+
// advisories may race with the sync executor: the advisory finds nothing to remove
157+
// (durableRemoteSubs is empty), then the sync populates it, leaving a stale NC durable sub.
158+
waitForAllBridgeSyncCompletion();
159+
137160
conn = brokers.get("Broker_A_A").factory.createConnection();
138161
conn.setClientID("clientId1");
139162
conn.start();
@@ -854,6 +877,59 @@ protected void configureBroker(BrokerService broker) {
854877
broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
855878
}
856879

880+
/**
881+
* Wait for all bridge sync executors (both initiator and duplex bridges) to complete
882+
* processing their BrokerSubscriptionInfo tasks on all brokers.
883+
* Uses reflection to access the private syncExecutor, following the same pattern
884+
* as {@link DynamicNetworkTestSupport#findDuplexBridge}.
885+
*/
886+
private void waitForAllBridgeSyncCompletion() throws Exception {
887+
final Field syncExecutorField = DemandForwardingBridgeSupport.class.getDeclaredField("syncExecutor");
888+
syncExecutorField.setAccessible(true);
889+
final Field duplexBridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
890+
duplexBridgeField.setAccessible(true);
891+
892+
for (final BrokerItem item : brokers.values()) {
893+
final BrokerService broker = item.broker;
894+
// Initiator bridges (accessible via network connectors)
895+
for (final NetworkConnector nc : broker.getNetworkConnectors()) {
896+
for (final NetworkBridge bridge : nc.activeBridges()) {
897+
if (bridge instanceof DemandForwardingBridgeSupport) {
898+
flushSyncExecutor(syncExecutorField, (DemandForwardingBridgeSupport) bridge);
899+
}
900+
}
901+
}
902+
// Duplex bridges (accessible via transport connections)
903+
for (final TransportConnector tc : broker.getTransportConnectors()) {
904+
for (final TransportConnection conn : tc.getConnections()) {
905+
if (conn.getConnectionId() != null && conn.getConnectionId().startsWith("networkConnector_")) {
906+
final DemandForwardingBridgeSupport duplexBridge =
907+
(DemandForwardingBridgeSupport) duplexBridgeField.get(conn);
908+
if (duplexBridge != null) {
909+
flushSyncExecutor(syncExecutorField, duplexBridge);
910+
}
911+
}
912+
}
913+
}
914+
}
915+
}
916+
917+
private void flushSyncExecutor(final Field syncExecutorField,
918+
final DemandForwardingBridgeSupport bridge) throws Exception {
919+
final ExecutorService syncExecutor = (ExecutorService) syncExecutorField.get(bridge);
920+
if (syncExecutor.isShutdown()) {
921+
return;
922+
}
923+
final CountDownLatch latch = new CountDownLatch(1);
924+
try {
925+
syncExecutor.execute(latch::countDown);
926+
} catch (final RejectedExecutionException e) {
927+
return;
928+
}
929+
assertTrue("Sync executor should complete on " + bridge,
930+
latch.await(30, TimeUnit.SECONDS));
931+
}
932+
857933
protected void startNetworkConnectors(NetworkConnector... connectors) throws Exception {
858934
for (final NetworkConnector connector : connectors) {
859935
connector.start();

activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,12 @@ public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exce
226226
//Test that on successful reconnection of the bridge that
227227
//the NC sub will be removed
228228
restartBroker(broker2, true);
229-
assertNCDurableSubsCount(broker2, topic, 1);
229+
// In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker)
230+
// is already running, so the sync may have already cleaned up the NC durable sub.
231+
// This "before sync" assertion is only valid in FORWARD flow.
232+
if (flow == FLOW.FORWARD) {
233+
assertNCDurableSubsCount(broker2, topic, 1);
234+
}
230235
restartBroker(broker1, true);
231236
assertBridgeStarted();
232237
assertNCDurableSubsCount(broker2, topic, 0);
@@ -253,7 +258,9 @@ public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
253258
//the NC sub will be removed because even though the local subscription exists,
254259
//it no longer matches the included filter
255260
restartBroker(broker2, true);
256-
assertNCDurableSubsCount(broker2, topic, 1);
261+
if (flow == FLOW.FORWARD) {
262+
assertNCDurableSubsCount(broker2, topic, 1);
263+
}
257264
restartBroker(broker1, true);
258265
assertBridgeStarted();
259266
assertNCDurableSubsCount(broker2, topic, 0);
@@ -291,7 +298,9 @@ public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
291298
//the NC sub will be removed because even though the local subscription exists,
292299
//it no longer matches the included static filter
293300
restartBroker(broker2, true);
294-
assertNCDurableSubsCount(broker2, topic, 1);
301+
if (flow == FLOW.FORWARD) {
302+
assertNCDurableSubsCount(broker2, topic, 1);
303+
}
295304
restartBroker(broker1, true);
296305
assertBridgeStarted();
297306
assertNCDurableSubsCount(broker2, topic, 0);
@@ -320,10 +329,13 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex
320329
//Test that on successful reconnection of the bridge that
321330
//the NC sub will be removed for topic1 but will stay for topic2
322331

323-
//before sync, the old NC should exist
332+
//before sync, the old NC should exist (only verifiable in FORWARD flow;
333+
//in REVERSE, broker2=localBroker has the bridge and sync may already have run)
324334
restartBroker(broker2, true);
325-
assertNCDurableSubsCount(broker2, topic, 1);
326-
assertNCDurableSubsCount(broker2, topic2, 0);
335+
if (flow == FLOW.FORWARD) {
336+
assertNCDurableSubsCount(broker2, topic, 1);
337+
assertNCDurableSubsCount(broker2, topic2, 0);
338+
}
327339

328340
//After sync, remove old NC and create one for topic 2
329341
restartBroker(broker1, true);

activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,11 +1357,8 @@ public void testToTopicWithDurable() throws Exception {
13571357
includedProducer.send(test);
13581358
assertNotNull(bridgeConsumer.receive(5000));
13591359

1360-
assertTrue("dequeues not updated",
1361-
Wait.waitFor(() -> 1 == destinationStatistics.getDequeues().getCount()));
1362-
1363-
assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
1364-
assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
1360+
waitForDispatchFromLocalBroker(destinationStatistics, 1);
1361+
assertLocalBrokerStatistics(destinationStatistics, 1);
13651362

13661363
assertRemoteAdvisoryCount(advisoryConsumer, 1);
13671364
assertAdvisoryBrokerCounts(1,1,0);

0 commit comments

Comments
 (0)