From 957e94f2cbc29b70db2fa02ad22557b53693fd54 Mon Sep 17 00:00:00 2001 From: Mudit Saxena Date: Mon, 13 Oct 2025 14:28:43 +0530 Subject: [PATCH 1/2] Added test for specific race condition scenario: * 1. A background thread continuously adds packets to the learner's queue * 2. Shutdown is initiated, which clears the queue and adds proposalOfDeath * 3. Shutdown then blocks on socket.close() (mocked to take 5 seconds) * 4. During the socket close wait, the background thread continues adding packets * 5. The queue grows even though shutdown has been initiated --- .../server/quorum/LearnerHandlerTest.java | 180 +++++++++++++++++- 1 file changed, 174 insertions(+), 6 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java index 43202716d2b..9d7954c1a0e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java @@ -21,11 +21,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; + import java.io.BufferedInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.net.Socket; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -41,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -49,24 +52,28 @@ public class LearnerHandlerTest extends ZKTestCase { protected static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerTest.class); + + // Test constants for shutdown and packet queuing behavior + private static final int CONCURRENT_PACKET_COUNT = 2000; + private static final int PACKET_SEND_DELAY_MS = 10; + private static final int INITIAL_PACKET_ACCUMULATION_TIME_MS = 1000; class MockLearnerHandler extends LearnerHandler { - boolean threadStarted = false; + boolean sendingThreadStarted = true; MockLearnerHandler(Socket sock, Leader leader) throws IOException { super(sock, new BufferedInputStream(sock.getInputStream()), leader); } protected void startSendingPackets() { - threadStarted = true; + sendingThreadStarted = true; } @Override protected boolean shouldSendMarkerPacketForLogging() { return false; } - } class MockZKDatabase extends ZKDatabase { @@ -136,6 +143,14 @@ public void setUp() throws Exception { db = new MockZKDatabase(null); sock = mock(Socket.class); + doAnswer(invocation -> { + System.out.println("Mock socket close called, simulating delay..."); + Thread.sleep(5000); + System.out.println("Socket close timed out, closing socket..."); + return null; + }).when(sock).close(); + + // Intercept when startForwarding is called leader = mock(Leader.class); when(leader.startForwarding(ArgumentMatchers.any(LearnerHandler.class), ArgumentMatchers.anyLong())).thenAnswer(new Answer() { @@ -147,6 +162,49 @@ public Long answer(InvocationOnMock invocation) { when(leader.getZKDatabase()).thenReturn(db); learnerHandler = new MockLearnerHandler(sock, leader); + + // Use reflection to access and mock the private SyncLimitCheck class + Class syncLimitCheckClass = null; + for (Class innerClass : LearnerHandler.class.getDeclaredClasses()) { + if (innerClass.getSimpleName().equals("SyncLimitCheck")) { + syncLimitCheckClass = innerClass; + break; + } + } + + // Create a mock of the private inner class using Mockito with Answer + Object mockSyncLimitCheck = Mockito.mock(syncLimitCheckClass, invocation -> { + // Stub the check method to return false + if ("check".equals(invocation.getMethod().getName())) { + return false; + } + // For other methods, use default behavior + return Mockito.RETURNS_DEFAULTS.answer(invocation); + }); + + // Inject the mock into LearnerHandler instance + Field field = LearnerHandler.class.getDeclaredField("syncLimitCheck"); + field.setAccessible(true); + field.set(learnerHandler, mockSyncLimitCheck); + List learnerHandlers = new ArrayList<>(); + learnerHandlers.add(learnerHandler); + when(leader.getLearners()).thenReturn(learnerHandlers); + // I want to test removeLearnerHandler function in Leader + doAnswer(invocation -> { + LearnerHandler lh = invocation.getArgument(0); + if (lh == learnerHandler) { + LOG.info("Mock removeLearnerHandler called"); + } + if (!learnerHandlers.isEmpty()) { + LOG.info("Removing learner handler: {}", lh); + learnerHandlers.remove(lh); + } + return null; + }).when(leader).removeLearnerHandler(ArgumentMatchers.any(LearnerHandler.class)); + + Field fieldSendingThreadStarted = LearnerHandler.class.getDeclaredField("sendingThreadStarted"); + fieldSendingThreadStarted.setAccessible(true); + fieldSendingThreadStarted.set(learnerHandler, learnerHandler.sendingThreadStarted); } Proposal createProposal(long zxid) { @@ -174,7 +232,7 @@ public void queuedPacketMatches(long[] zxids) { void reset() { learnerHandler.getQueuedPackets().clear(); - learnerHandler.threadStarted = false; + learnerHandler.sendingThreadStarted = false; learnerHandler.setFirstPacket(true); } @@ -556,4 +614,114 @@ public void testTxnLogGap() throws Exception { reset(); } + /** + * Tests that the packet queue can grow during a slow/blocked shutdown process. + * + * This test verifies a specific race condition scenario: + * 1. A background thread continuously adds packets to the learner's queue + * 2. Shutdown is initiated, which clears the queue and adds proposalOfDeath + * 3. Shutdown then blocks on socket.close() (mocked to take 5 seconds) + * 4. During the socket close wait, the background thread continues adding packets + * 5. The queue grows even though shutdown has been initiated + * + * This simulates real-world scenarios where: + * - Network I/O is slow during shutdown + * - The leader continues sending packets while learner is shutting down + * - The queue can unexpectedly grow during the shutdown process + * + * The test ensures this behavior is understood and can be monitored. + * + * @throws InterruptedException if thread operations are interrupted + */ + @Test + public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws InterruptedException { + // Setup a thread that will keep adding packets to the queue even after shutdown starts + // This simulates a leader adding packets concurrently with shutdown + Thread packetSender = new Thread(() -> { + // Add packets to queue + for (int i = 0; i < CONCURRENT_PACKET_COUNT; i++) { + if (leader.getLearners().isEmpty()) { + LOG.debug("No learners available, stopping packet sender."); + break; + } + leader.getLearners().get(0).queuePacket(new QuorumPacket()); + try { + Thread.sleep(PACKET_SEND_DELAY_MS); // Space out the additions + } catch (InterruptedException e) { + LOG.warn("Packet sender thread interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + }); + packetSender.start(); + LOG.debug("Packet sender started, adding packets to queue..."); + Thread.sleep(INITIAL_PACKET_ACCUMULATION_TIME_MS); // Allow some packets to be added + LOG.debug("Current queue size: {}", learnerHandler.getQueuedPackets().size()); + learnerHandler.startSendingPackets(); + + // Verify that packets are being added + int initialQueueSize = learnerHandler.getQueuedPackets().size(); + assertTrue(initialQueueSize > 0, + "Queue should have some packets before shutdown (actual: " + initialQueueSize + ")"); + LOG.debug("Initial queue size before shutdown: {}", initialQueueSize); + + // Start shutdown in a separate thread + // Shutdown will: clear queue, add proposalOfDeath, then block on socket.close() for 5 seconds + Thread shutdownThread = new Thread(() -> { + try { + learnerHandler.shutdown(); + } catch (Exception e) { + LOG.warn("Exception during learner handler shutdown", e); + } + }); + shutdownThread.start(); + + // Give shutdown time to clear the queue and start blocking on socket close + Thread.sleep(500); + + // At this point: + // - Shutdown has cleared the queue and added proposalOfDeath (queue size = 1) + // - Shutdown is now blocked in socket.close() for 5 seconds + // - Packet sender is still running and adding packets + int queueSizeAfterShutdownStart = learnerHandler.getQueuedPackets().size(); + LOG.debug("Queue size after shutdown started (should be ~1): {}", queueSizeAfterShutdownStart); + + // Wait for packets to accumulate during the socket close wait + Thread.sleep(2000); + + // Check that queue has grown during the shutdown process + int queueSizeDuringShutdown = learnerHandler.getQueuedPackets().size(); + LOG.debug("Queue size during shutdown (socket close wait): {}", queueSizeDuringShutdown); + + // The queue should have grown because: + // - Shutdown cleared it to 1 (proposalOfDeath) + // - But packet sender kept adding while shutdown waits on socket.close() + assertTrue(queueSizeDuringShutdown > queueSizeAfterShutdownStart, + "Queue should grow during shutdown when socket close is delayed (after shutdown start: " + + queueSizeAfterShutdownStart + ", during shutdown: " + queueSizeDuringShutdown + ")"); + + // Wait for shutdown to complete + shutdownThread.join(10000); + if (shutdownThread.isAlive()) { + LOG.warn("Shutdown thread did not complete, interrupting"); + shutdownThread.interrupt(); + shutdownThread.join(2000); + } + + // Wait for packet sender to complete + packetSender.join(5000); + if (packetSender.isAlive()) { + LOG.warn("Packet sender thread did not complete, interrupting"); + packetSender.interrupt(); + packetSender.join(1000); + } + + int finalQueueSize = learnerHandler.getQueuedPackets().size(); + LOG.debug("Final queue size after shutdown complete: {}", finalQueueSize); + + // Verify learner was removed from leader + assertEquals(0, leader.getLearners().size(), + "Leader should have no learners after shutdown completes"); + } } From 0d9ade35795cb376f0367bf58dc1544c754f7acd Mon Sep 17 00:00:00 2001 From: Mudit Saxena Date: Mon, 13 Oct 2025 14:29:42 +0530 Subject: [PATCH 2/2] [MINOR] Fixed checkstyle violations --- .../server/quorum/LearnerHandlerTest.java | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java index 9d7954c1a0e..c70b56f470e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java @@ -21,8 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.BufferedInputStream; import java.io.IOException; import java.lang.reflect.Field; @@ -50,9 +51,8 @@ import org.slf4j.LoggerFactory; public class LearnerHandlerTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerTest.class); - + // Test constants for shutdown and packet queuing behavior private static final int CONCURRENT_PACKET_COUNT = 2000; private static final int PACKET_SEND_DELAY_MS = 10; @@ -160,9 +160,8 @@ public Long answer(InvocationOnMock invocation) { } }); when(leader.getZKDatabase()).thenReturn(db); - learnerHandler = new MockLearnerHandler(sock, leader); - + // Use reflection to access and mock the private SyncLimitCheck class Class syncLimitCheckClass = null; for (Class innerClass : LearnerHandler.class.getDeclaredClasses()) { @@ -171,9 +170,9 @@ public Long answer(InvocationOnMock invocation) { break; } } - + // Create a mock of the private inner class using Mockito with Answer - Object mockSyncLimitCheck = Mockito.mock(syncLimitCheckClass, invocation -> { + Object mockSyncLimitCheck = mock(syncLimitCheckClass, invocation -> { // Stub the check method to return false if ("check".equals(invocation.getMethod().getName())) { return false; @@ -616,21 +615,19 @@ public void testTxnLogGap() throws Exception { /** * Tests that the packet queue can grow during a slow/blocked shutdown process. - * * This test verifies a specific race condition scenario: * 1. A background thread continuously adds packets to the learner's queue * 2. Shutdown is initiated, which clears the queue and adds proposalOfDeath * 3. Shutdown then blocks on socket.close() (mocked to take 5 seconds) * 4. During the socket close wait, the background thread continues adding packets * 5. The queue grows even though shutdown has been initiated - * + *

* This simulates real-world scenarios where: * - Network I/O is slow during shutdown * - The leader continues sending packets while learner is shutting down * - The queue can unexpectedly grow during the shutdown process - * - * The test ensures this behavior is understood and can be monitored. - * + *

+ * * @throws InterruptedException if thread operations are interrupted */ @Test @@ -659,13 +656,13 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter Thread.sleep(INITIAL_PACKET_ACCUMULATION_TIME_MS); // Allow some packets to be added LOG.debug("Current queue size: {}", learnerHandler.getQueuedPackets().size()); learnerHandler.startSendingPackets(); - + // Verify that packets are being added int initialQueueSize = learnerHandler.getQueuedPackets().size(); - assertTrue(initialQueueSize > 0, + assertTrue(initialQueueSize > 0, "Queue should have some packets before shutdown (actual: " + initialQueueSize + ")"); LOG.debug("Initial queue size before shutdown: {}", initialQueueSize); - + // Start shutdown in a separate thread // Shutdown will: clear queue, add proposalOfDeath, then block on socket.close() for 5 seconds Thread shutdownThread = new Thread(() -> { @@ -676,31 +673,31 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter } }); shutdownThread.start(); - // Give shutdown time to clear the queue and start blocking on socket close Thread.sleep(500); - + // At this point: // - Shutdown has cleared the queue and added proposalOfDeath (queue size = 1) // - Shutdown is now blocked in socket.close() for 5 seconds // - Packet sender is still running and adding packets int queueSizeAfterShutdownStart = learnerHandler.getQueuedPackets().size(); LOG.debug("Queue size after shutdown started (should be ~1): {}", queueSizeAfterShutdownStart); - + // Wait for packets to accumulate during the socket close wait Thread.sleep(2000); - + // Check that queue has grown during the shutdown process int queueSizeDuringShutdown = learnerHandler.getQueuedPackets().size(); LOG.debug("Queue size during shutdown (socket close wait): {}", queueSizeDuringShutdown); - + // The queue should have grown because: // - Shutdown cleared it to 1 (proposalOfDeath) // - But packet sender kept adding while shutdown waits on socket.close() assertTrue(queueSizeDuringShutdown > queueSizeAfterShutdownStart, - "Queue should grow during shutdown when socket close is delayed (after shutdown start: " + - queueSizeAfterShutdownStart + ", during shutdown: " + queueSizeDuringShutdown + ")"); - + "Queue should grow during shutdown when socket close is delayed (after shutdown start: " + + queueSizeAfterShutdownStart + ", during shutdown: " + + queueSizeDuringShutdown + ")"); + // Wait for shutdown to complete shutdownThread.join(10000); if (shutdownThread.isAlive()) { @@ -708,7 +705,7 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter shutdownThread.interrupt(); shutdownThread.join(2000); } - + // Wait for packet sender to complete packetSender.join(5000); if (packetSender.isAlive()) { @@ -716,10 +713,10 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter packetSender.interrupt(); packetSender.join(1000); } - + int finalQueueSize = learnerHandler.getQueuedPackets().size(); LOG.debug("Final queue size after shutdown complete: {}", finalQueueSize); - + // Verify learner was removed from leader assertEquals(0, leader.getLearners().size(), "Leader should have no learners after shutdown completes");