From 5c244d7d554fd3173cca74faed50d26a8418115d Mon Sep 17 00:00:00 2001 From: Shem Shadrack Date: Sat, 28 Mar 2026 01:46:00 +0300 Subject: [PATCH 1/2] shutdown correctness --- .../iotdb/session/pool/SessionPool.java | 4 +- .../iotdb/session/pool/TableSessionPool.java | 1 + .../iotdb/session/pool/SessionPoolTest.java | 68 +++++++++++++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 949fba1251c50..34776d8147208 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -788,7 +788,7 @@ private void occupy(ISession session) { occupied.put(session, session); } - /** close all connections in the pool */ +/** close all connections in the pool and unblocks any waiting threads*/ @Override public synchronized void close() { for (ISession session : queue) { @@ -819,6 +819,8 @@ public synchronized void close() { this.closed = true; queue.clear(); occupied.clear(); + // Notify all waiting threads in getSession() so they wake up immediately + this.notifyAll(); } @Override diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java index 4e08f202a14ad..7d589d82cef8e 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java @@ -36,6 +36,7 @@ public ITableSession getSession() throws IoTDBConnectionException { return sessionPool.getPooledTableSession(); } + //Closes the underlying session pool and unblocks any waiting threads. @Override public void close() { this.sessionPool.close(); diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java index ce48172af2b87..4d2869755b3be 100644 --- a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java +++ b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -76,6 +76,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -1623,4 +1624,71 @@ private List FakedFirstFetchTsBlockResult() { return Collections.singletonList(tsBlock); } + + // Regression test for graceful shutdown + @Test(timeout = 5000) + public void testCloseNotifiesWaitingThreads() throws Exception { + SessionPool pool = + new SessionPool.Builder() + .host("localhost") + .port(6667) + .user("root") + .password("root") + .maxSize(1) + .waitToGetSessionTimeoutInMs(10000) + // notifyAll() + .build(); + + try { + Session mockSession = Mockito.mock(Session.class); + ConcurrentLinkedDeque queue = + (ConcurrentLinkedDeque) Whitebox.getInternalState(pool, "queue"); + queue.push(mockSession); + Whitebox.setInternalState(pool, "size", 1); + + ISession occupiedSession = (ISession) Whitebox.invokeMethod(pool, "getSession"); + assertEquals(mockSession, occupiedSession); + assertEquals(0, queue.size()); + + final Exception[] caughtException = {null}; + + Thread waiterThread = + new Thread( + () -> { + try { + Whitebox.invokeMethod(pool, "getSession"); + } catch (Exception e) { + caughtException[0] = e; + } + }); + waiterThread.start(); + + Thread.sleep(100); + + long closeStartTime = System.currentTimeMillis(); + pool.close(); + long closeEndTime = System.currentTimeMillis(); + + waiterThread.join(1000); + + assertNotNull("Waiter thread should have caught an exception", caughtException[0]); + assertTrue( + "Exception should be IoTDBConnectionException", + caughtException[0] instanceof IoTDBConnectionException); + assertTrue( + "Exception message should indicate pool is closed", + caughtException[0].getMessage().contains("closed")); + + long closeDuration = closeEndTime - closeStartTime; + assertTrue( + "close() should complete quickly, but took " + closeDuration + "ms", + closeDuration < 1000); + + } finally { + try { + pool.close(); + } catch (Exception e) { + } + } + } } From c8a49bc2447a9440ca6925d3c320899085b45f18 Mon Sep 17 00:00:00 2001 From: Shem Shadrack Date: Sat, 28 Mar 2026 20:53:24 +0300 Subject: [PATCH 2/2] Refine SessionPool shutdown documentation and improve unblocking regression test reliability. --- .../iotdb/session/pool/SessionPool.java | 2 +- .../iotdb/session/pool/TableSessionPool.java | 2 +- .../iotdb/session/pool/SessionPoolTest.java | 20 +++++++++---------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 34776d8147208..b902a4ccd284c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -788,7 +788,7 @@ private void occupy(ISession session) { occupied.put(session, session); } -/** close all connections in the pool and unblocks any waiting threads*/ + /** Closes all connections in the pool and unblocks any waiting threads. */ @Override public synchronized void close() { for (ISession session : queue) { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java index 7d589d82cef8e..718f5c34f19cf 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java @@ -36,7 +36,7 @@ public ITableSession getSession() throws IoTDBConnectionException { return sessionPool.getPooledTableSession(); } - //Closes the underlying session pool and unblocks any waiting threads. + /** Closes the underlying session pool and unblocks any waiting threads. */ @Override public void close() { this.sessionPool.close(); diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java index 4d2869755b3be..a12ebbf70c7b8 100644 --- a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java +++ b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -74,6 +74,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -1636,7 +1638,6 @@ public void testCloseNotifiesWaitingThreads() throws Exception { .password("root") .maxSize(1) .waitToGetSessionTimeoutInMs(10000) - // notifyAll() .build(); try { @@ -1651,11 +1652,13 @@ public void testCloseNotifiesWaitingThreads() throws Exception { assertEquals(0, queue.size()); final Exception[] caughtException = {null}; + CountDownLatch latch = new CountDownLatch(1); Thread waiterThread = new Thread( () -> { try { + latch.countDown(); Whitebox.invokeMethod(pool, "getSession"); } catch (Exception e) { caughtException[0] = e; @@ -1663,13 +1666,14 @@ public void testCloseNotifiesWaitingThreads() throws Exception { }); waiterThread.start(); - Thread.sleep(100); + assertTrue("Waiter thread should have started", latch.await(10, TimeUnit.SECONDS)); + // Give it a moment to enter the wait(1000) block in getSession() + Thread.sleep(200); - long closeStartTime = System.currentTimeMillis(); pool.close(); - long closeEndTime = System.currentTimeMillis(); - waiterThread.join(1000); + waiterThread.join(500); + assertTrue("Waiter thread should be unblocked quickly", !waiterThread.isAlive()); assertNotNull("Waiter thread should have caught an exception", caughtException[0]); assertTrue( @@ -1679,15 +1683,11 @@ public void testCloseNotifiesWaitingThreads() throws Exception { "Exception message should indicate pool is closed", caughtException[0].getMessage().contains("closed")); - long closeDuration = closeEndTime - closeStartTime; - assertTrue( - "close() should complete quickly, but took " + closeDuration + "ms", - closeDuration < 1000); - } finally { try { pool.close(); } catch (Exception e) { + // ignore } } }