Skip to content

Commit 8d51a29

Browse files
authored
test(usecases): various fixes for flaky tests (#1704)
* Enhance QueueZeroPrefetchLazyDispatchPriorityTest with improved message enqueuing checks and logging consistency * Refactor ConnectionFailureEvictsFromPoolTest and BrokerNetworkWithStuckMessagesTest for improved readability and consistency in message assertions * Update assertDeqInflight method to use >= for dequeues and inflight checks, enhancing accuracy in advisory message assertions * Add wait for broker to process connection drop in RedeliveryRestartWithExceptionTest * Fix handling of IllegalStateException in AMQ3166Test to prevent transaction hang during async sends * fix: avoid test hanging on Windows * test(usecases): fix flaky advisory propagation waits in TwoBrokerVirtualTopicSelectorAwareForwardingTest
1 parent fb8acf9 commit 8d51a29

8 files changed

Lines changed: 129 additions & 154 deletions

File tree

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) thro
100100
}
101101

102102
if (whackIndex) {
103-
File indexToDelete = new File(brokerDataDir, "db.data");
103+
final File indexToDelete = new File(brokerDataDir, "db.data");
104104
LOG.info("Whacking index: " + indexToDelete);
105-
indexToDelete.delete();
105+
IOHelper.deleteFileNonBlocking(indexToDelete);
106106
}
107107

108108
doStartBroker(false, forceRecoverIndex);
@@ -219,14 +219,15 @@ public void testRecoveryAfterCorruptionMetadataLocation() throws Exception {
219219
broker.getPersistenceAdapter().checkpoint(true);
220220
Location location = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation;
221221

222-
DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
223-
RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
222+
final DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
223+
final RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
224224
randomAccessFile.seek(location.getOffset());
225225
// Use an invalid size well past the end of the data file to trigger corruption handling without large allocation.
226-
int bogusSize = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal()
226+
final int bogusSize = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal()
227227
.getFileMap().get(location.getDataFileId()).getLength() * 10;
228228
randomAccessFile.writeInt(bogusSize);
229229
randomAccessFile.getChannel().force(true);
230+
dataFile.closeRandomAccessFile(randomAccessFile);
230231

231232
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().close();
232233
try {
@@ -437,6 +438,7 @@ private void corruptBatchEndEof(int id) throws Exception{
437438
randomAccessFile.writeInt(4 * 1024 * 1024);
438439
randomAccessFile.writeLong(0l);
439440
randomAccessFile.getChannel().force(true);
441+
dataFile.closeRandomAccessFile(randomAccessFile);
440442
}
441443

442444
private void corruptOrderIndex(final int num, final int size) throws Exception {

activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,12 @@ public void transportResumed() {
116116
TestCase.fail("Expected Error");
117117
} catch (JMSException e) {
118118
}
119+
// Wait for async exception event BEFORE the try-with-resources closes the connection.
120+
// ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(),
121+
// so the callback runs in a separate thread. If we wait after connection.close(), the
122+
// async executor may already be shut down and the callback never fires.
123+
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
119124
}
120-
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
121125
// If we get another connection now it should be a new connection that
122126
// works.
123127
LOG.info("expect new connection after failure");

activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
4848
import org.apache.activemq.transport.tcp.TcpTransport;
4949
import org.apache.activemq.usage.SystemUsage;
50+
import org.apache.activemq.util.Wait;
5051
import org.junit.After;
5152
import org.junit.Before;
5253
import org.slf4j.Logger;
@@ -241,6 +242,16 @@ public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnec
241242

242243
connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new IOException("Die"));
243244

245+
// Wait for the broker to fully process the connection drop and return
246+
// unacked messages to the queue. The onException triggers async cleanup
247+
// via executeAsync(), so without this wait the new consumer may receive
248+
// fresh messages (6-10) instead of the redelivered ones (1-5).
249+
final ActiveMQQueue dest = new ActiveMQQueue(queueName);
250+
assertTrue("unacked messages returned to queue", Wait.waitFor(() -> {
251+
final org.apache.activemq.broker.region.Destination d = broker.getDestination(dest);
252+
return d != null && d.getDestinationStatistics().getInflight().getCount() == 0;
253+
}, 10000, 100));
254+
244255
connection = (ActiveMQConnection) connectionFactory.createConnection();
245256
connection.start();
246257

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -191,18 +191,25 @@ public void testRollbackOnAsyncErrorAmqApi() throws Exception {
191191
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(session.createQueue("QAT"));
192192

193193
for (int i=0; i<batchSize; i++) {
194-
producer.send(session.createTextMessage("Hello A"), new AsyncCallback() {
195-
@Override
196-
public void onSuccess() {
197-
batchSent.countDown();
198-
}
194+
try {
195+
producer.send(session.createTextMessage("Hello A"), new AsyncCallback() {
196+
@Override
197+
public void onSuccess() {
198+
batchSent.countDown();
199+
}
199200

200-
@Override
201-
public void onException(JMSException e) {
202-
session.getTransactionContext().setRollbackOnly(true);
203-
batchSent.countDown();
204-
}
205-
});
201+
@Override
202+
public void onException(JMSException e) {
203+
session.getTransactionContext().setRollbackOnly(true);
204+
batchSent.countDown();
205+
}
206+
});
207+
} catch (jakarta.jms.IllegalStateException alreadyRolledBack) {
208+
// Async error from an earlier send may have already marked the transaction
209+
// rollback-only. Count down the latch so beforeEnd doesn't hang.
210+
batchSent.countDown();
211+
continue;
212+
}
206213

207214
if (i==0) {
208215
// transaction context begun on first send
@@ -211,9 +218,9 @@ public void onException(JMSException e) {
211218
public void beforeEnd() throws Exception {
212219
// await response to all sends in the batch
213220
if (!batchSent.await(10, TimeUnit.SECONDS)) {
214-
LOG.error("TimedOut waiting for aync send requests!");
221+
LOG.error("TimedOut waiting for async send requests!");
215222
session.getTransactionContext().setRollbackOnly(true);
216-
};
223+
}
217224
super.beforeEnd();
218225
}
219226
});

activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java

Lines changed: 17 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -235,15 +235,8 @@ public void testBrokerNetworkWithStuckMessages() throws Exception {
235235
// Ensure that there are zero messages on the local broker. This tells
236236
// us that those messages have been prefetched to the remote broker
237237
// where the demand exists.
238-
Wait.waitFor(new Wait.Condition() {
239-
@Override
240-
public boolean isSatisified() throws Exception {
241-
Object[] result = browseQueueWithJmx(localBroker);
242-
return 0 == result.length;
243-
}
244-
});
245-
messages = browseQueueWithJmx(localBroker);
246-
assertEquals(0, messages.length);
238+
assertTrue("local broker drained", Wait.waitFor(() ->
239+
browseQueueWithJmx(localBroker).length == 0));
247240

248241
// try and pull the messages from remote, should be denied b/c on networkTtl
249242
LOG.info("creating demand on second remote...");
@@ -270,15 +263,9 @@ public boolean isSatisified() throws Exception {
270263
connection2.send(connectionInfo2.createRemoveCommand());
271264

272265
// There should now be 5 messages stuck on the remote broker
273-
assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() {
274-
@Override
275-
public boolean isSatisified() throws Exception {
276-
Object[] result = browseQueueWithJmx(remoteBroker);
277-
return 5 == result.length;
278-
}
279-
}));
266+
assertTrue("correct stuck message count", Wait.waitFor(() ->
267+
browseQueueWithJmx(remoteBroker).length == 5));
280268
messages = browseQueueWithJmx(remoteBroker);
281-
assertEquals(5, messages.length);
282269

283270
assertTrue("can see broker path property",
284271
((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString()));
@@ -295,15 +282,15 @@ public boolean isSatisified() throws Exception {
295282
connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
296283
LOG.info("acked one message on origin, waiting for all messages to percolate back");
297284

298-
Wait.waitFor(new Wait.Condition() {
299-
@Override
300-
public boolean isSatisified() throws Exception {
301-
Object[] result = browseQueueWithJmx(localBroker);
302-
return 4 == result.length;
303-
}
304-
});
305-
messages = browseQueueWithJmx(localBroker);
306-
assertEquals(4, messages.length);
285+
// Wait for ALL stuck messages to replay from remote back to local.
286+
// Must check both brokers: local == 4 (5 replayed - 1 acked) AND remote == 0
287+
// (replay complete). Without the remote check, the Wait can return when only
288+
// 4 of 5 messages have arrived on local (transient match), then the 5th arrives.
289+
assertTrue("messages percolated back", Wait.waitFor(() -> {
290+
final Object[] localResult = browseQueueWithJmx(localBroker);
291+
final Object[] remoteResult = browseQueueWithJmx(remoteBroker);
292+
return localResult.length == 4 && remoteResult.length == 0;
293+
}, TimeUnit.SECONDS.toMillis(30), 100));
307294

308295
LOG.info("checking for messages on remote again");
309296
// messages won't migrate back again till consumer closes
@@ -329,25 +316,10 @@ public boolean isSatisified() throws Exception {
329316
assertEquals(receiveNumMessages, counter);
330317

331318
// verify all messages consumed
332-
Wait.waitFor(new Wait.Condition() {
333-
@Override
334-
public boolean isSatisified() throws Exception {
335-
Object[] result = browseQueueWithJmx(remoteBroker);
336-
return 0 == result.length;
337-
}
338-
});
339-
messages = browseQueueWithJmx(remoteBroker);
340-
assertEquals(0, messages.length);
341-
342-
Wait.waitFor(new Wait.Condition() {
343-
@Override
344-
public boolean isSatisified() throws Exception {
345-
Object[] result = browseQueueWithJmx(localBroker);
346-
return 0 == result.length;
347-
}
348-
});
349-
messages = browseQueueWithJmx(localBroker);
350-
assertEquals(0, messages.length);
319+
assertTrue("remote drained", Wait.waitFor(() ->
320+
browseQueueWithJmx(remoteBroker).length == 0));
321+
assertTrue("local drained", Wait.waitFor(() ->
322+
browseQueueWithJmx(localBroker).length == 0));
351323

352324
// Close the consumer on the remote broker
353325
connection2.send(consumerInfo3.createRemoveCommand());

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -277,23 +277,23 @@ public void testPrefetchSizePercent() throws Exception {
277277

278278
private void assertDeqInflight(final int dequeue, final int inflight,
279279
final ActiveMQTopic... topics) throws Exception {
280-
assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
281-
@Override
282-
public boolean isSatisified() throws Exception {
283-
long actualDeq = 0;
284-
long actualInflight = 0;
285-
for (ActiveMQTopic topic : topics) {
286-
ActiveMQTopic advisory = AdvisorySupport.getConsumerAdvisoryTopic(topic);
287-
Destination destination = brokers.get("A").broker.getDestination(advisory);
288-
if (destination != null) {
289-
actualDeq += destination.getDestinationStatistics().getDequeues().getCount();
290-
actualInflight += destination.getDestinationStatistics().getInflight().getCount();
291-
}
280+
// Use >= instead of == because duplex bridges with statically included destinations
281+
// may generate additional advisory messages from the bridge's own subscriptions,
282+
// depending on subscription registration ordering.
283+
assertTrue("deq and inflight as expected", Wait.waitFor(() -> {
284+
long actualDeq = 0;
285+
long actualInflight = 0;
286+
for (final ActiveMQTopic topic : topics) {
287+
final ActiveMQTopic advisory = AdvisorySupport.getConsumerAdvisoryTopic(topic);
288+
final Destination destination = brokers.get("A").broker.getDestination(advisory);
289+
if (destination != null) {
290+
actualDeq += destination.getDestinationStatistics().getDequeues().getCount();
291+
actualInflight += destination.getDestinationStatistics().getInflight().getCount();
292292
}
293-
LOG.info("A Deq:" + actualDeq);
294-
LOG.info("A Inflight:" + actualInflight);
295-
return actualDeq == dequeue && actualInflight == inflight;
296293
}
294+
LOG.info("A Deq:{} (expected >={}), Inflight:{} (expected >={})",
295+
actualDeq, dequeue, actualInflight, inflight);
296+
return actualDeq >= dequeue && actualInflight >= inflight;
297297
}));
298298
}
299299

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public void tearDown() throws Exception {
7474

7575
@Test(timeout=120000)
7676
public void testPriorityMessages() throws Exception {
77+
final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
7778

7879
for (int i = 0; i < ITERATIONS; i++) {
7980

@@ -85,11 +86,15 @@ public void testPriorityMessages() throws Exception {
8586

8687
LOG.info("On iteration {}", i);
8788

88-
Thread.sleep(1000);
89+
// Wait for all messages to be enqueued before consuming
90+
assertTrue("Messages enqueued", Wait.waitFor(() -> {
91+
final Queue queue = (Queue) broker.getDestination(destination);
92+
return queue != null && queue.getDestinationStatistics().getMessages().getCount() == 5;
93+
}, 5000, 100));
8994

9095
// consume messages
91-
ArrayList<Message> consumeList = consumeMessages("TestQ", 5, TimeUnit.SECONDS.toMillis(30));
92-
LOG.info("Consumed list " + consumeList.size());
96+
final ArrayList<Message> consumeList = consumeMessages("TestQ", 5, TimeUnit.SECONDS.toMillis(30));
97+
LOG.info("Consumed list {}", consumeList.size());
9398

9499
// compare lists
95100
assertEquals("message 1 should be priority high", 5, consumeList.get(0).getJMSPriority());
@@ -170,27 +175,32 @@ public void testLongLivedPriorityConsumer() throws Exception {
170175
public void testPriorityMessagesWithJmsBrowser() throws Exception {
171176

172177
final int numToSend = 5;
178+
final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
173179

174180
for (int i = 0; i < ITERATIONS; i++) {
175181
produceMessages(numToSend - 1, 4, "TestQ");
176182

177-
ArrayList<Message> browsed = browseMessages("TestQ");
183+
final ArrayList<Message> browsed = browseMessages("TestQ");
178184

179185
LOG.info("Browsed: {}", browsed.size());
180186

181187
// send 1 message priority HIGH
182188
produceMessages(1, 5, "TestQ");
183189

184-
Thread.sleep(1000);
190+
// Wait for all messages to be enqueued
191+
assertTrue("Messages enqueued", Wait.waitFor(() -> {
192+
final Queue queue = (Queue) broker.getDestination(destination);
193+
return queue != null && queue.getDestinationStatistics().getMessages().getCount() == numToSend;
194+
}, 5000, 100));
185195

186196
LOG.info("On iteration {}", i);
187197

188-
Message message = consumeOneMessage("TestQ");
198+
final Message message = consumeOneMessage("TestQ");
189199
assertNotNull(message);
190200
assertEquals(5, message.getJMSPriority());
191201

192202
// consume messages
193-
ArrayList<Message> consumeList = consumeMessages("TestQ");
203+
final ArrayList<Message> consumeList = consumeMessages("TestQ");
194204
LOG.info("Consumed list {}", consumeList.size());
195205

196206
// compare lists
@@ -217,30 +227,37 @@ public void testJmsBrowserGetsPagedIn() throws Exception {
217227
return queue != null && queue.getDestinationStatistics().getMessages().getCount() == numToSend;
218228
}, 5000, 100));
219229

220-
ArrayList<Message> browsed = browseMessages("TestQ");
230+
final ArrayList<Message> browsed = browseMessages("TestQ");
221231

222232
LOG.info("Browsed: {}", browsed.size());
223233

224234
assertEquals(0, browsed.size());
225235

226-
Message message = consumeOneMessage("TestQ", Session.CLIENT_ACKNOWLEDGE);
236+
final Message message = consumeOneMessage("TestQ", Session.CLIENT_ACKNOWLEDGE);
227237
assertNotNull(message);
228238

229-
browsed = browseMessages("TestQ");
239+
final ArrayList<Message> browsedAfterPull = browseMessages("TestQ");
230240

231-
LOG.info("Browsed: {}", browsed.size());
241+
LOG.info("Browsed: {}", browsedAfterPull.size());
232242

233-
assertEquals("see only the paged in for pull", 1, browsed.size());
243+
assertEquals("see only the paged in for pull", 1, browsedAfterPull.size());
234244

235-
// Wait for all messages to be available (including redelivery of unacked message)
245+
// Wait for the unacked message to be fully redelivered after connection close.
246+
// messages.getCount() includes in-flight messages so it's already == numToSend;
247+
// we must also check inflight == 0 to ensure the redelivery processing is complete
248+
// and all messages are truly available for dispatch to a new consumer.
236249
assertTrue("All messages available for consumption", Wait.waitFor(() -> {
237250
final Queue queue = (Queue) broker.getDestination(destination);
238-
return queue != null && queue.getDestinationStatistics().getMessages().getCount() == numToSend;
251+
return queue != null
252+
&& queue.getDestinationStatistics().getMessages().getCount() == numToSend
253+
&& queue.getDestinationStatistics().getInflight().getCount() == 0;
239254
}, 5000, 100));
240255

241-
// consume messages
242-
ArrayList<Message> consumeList = consumeMessages("TestQ");
243-
LOG.info("Consumed list " + consumeList.size());
256+
// Use the retry-based consume to handle zero-prefetch dispatch timing:
257+
// with prioritized messages + lazy dispatch + redelivered messages in the
258+
// dispatch pending list, the pull mechanism may need multiple attempts on slow CI.
259+
final ArrayList<Message> consumeList = consumeMessages("TestQ", numToSend, TimeUnit.SECONDS.toMillis(30));
260+
LOG.info("Consumed list {}", consumeList.size());
244261
assertEquals(numToSend, consumeList.size());
245262
}
246263
}

0 commit comments

Comments
 (0)