[SessionPool] Fix graceful shutdown by notifying waiting threads in close()#17381
[SessionPool] Fix graceful shutdown by notifying waiting threads in close()#17381ShemShadrack wants to merge 2 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR improves SessionPool shutdown behavior by waking threads blocked in getSession() when the pool is closed, aiming to reduce shutdown latency in environments that require fast cleanup (e.g., streaming jobs).
Changes:
- Add
notifyAll()inSessionPool.close()to wake threads waiting ingetSession(). - Add brief close/unblocking documentation in
SessionPoolandTableSessionPool. - Add a regression test intended to verify that waiting threads fail fast with
IoTDBConnectionExceptionwhen the pool is closed.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java |
Wakes all waiting getSession() callers during close() via notifyAll(). |
iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java |
Documents that closing the wrapper pool unblocks waiting threads (via underlying pool close). |
iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java |
Adds a regression test for unblocking behavior on pool close. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| /** close all connections in the pool */ | ||
| /** close all connections in the pool and unblocks any waiting threads*/ |
There was a problem hiding this comment.
The new Javadoc line is not indented like the surrounding code and is missing standard Javadoc formatting (capitalization/period and space before closing */). This may violate style/checkstyle and reads unpolished. Please format it consistently (indent to match the method, use a proper sentence like “Closes all connections in the pool and unblocks any waiting threads.”).
| /** close all connections in the pool and unblocks any waiting threads*/ | |
| /** | |
| * Closes all connections in the pool and unblocks any waiting threads. | |
| */ |
| return sessionPool.getPooledTableSession(); | ||
| } | ||
|
|
||
| //Closes the underlying session pool and unblocks any waiting threads. |
There was a problem hiding this comment.
The added comment has no space after // and is a line comment on a public method. If the intent is to document behavior for API users, prefer a proper Javadoc comment (and keep formatting consistent) rather than //Closes ....
| //Closes the underlying session pool and unblocks any waiting threads. | |
| /** | |
| * Closes the underlying session pool and unblocks any waiting threads. | |
| */ |
| Thread waiterThread = | ||
| new Thread( | ||
| () -> { | ||
| try { | ||
| Whitebox.invokeMethod(pool, "getSession"); | ||
| } catch (Exception e) { | ||
| caughtException[0] = e; | ||
| } | ||
| }); | ||
| waiterThread.start(); | ||
|
|
||
| Thread.sleep(100); | ||
|
|
||
| long closeStartTime = System.currentTimeMillis(); | ||
| pool.close(); | ||
| long closeEndTime = System.currentTimeMillis(); | ||
|
|
||
| waiterThread.join(1000); | ||
|
|
There was a problem hiding this comment.
This regression test is likely flaky and may not reliably prove the notifyAll() behavior. getSession() uses wait(1000), and the test joins for 1000ms; without notifyAll(), the waiting thread can still wake on the 1s timeout and the test may pass. Consider coordinating with a CountDownLatch to ensure the waiter is actually in the wait loop before calling close(), and assert the waiter unblocks well under 1s (e.g., join for a few hundred ms and assert the thread is no longer alive).
| } finally { | ||
| try { | ||
| pool.close(); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
The empty catch block in the finally silently swallows failures and makes debugging harder if close() starts throwing unexpectedly. At minimum, add a short // ignore comment, or log/fail() when an exception is thrown (depending on the intended behavior).
| } catch (Exception e) { | |
| } catch (Exception e) { | |
| // ignore: best-effort cleanup in test |
| // Notify all waiting threads in getSession() so they wake up immediately | ||
| this.notifyAll(); |
There was a problem hiding this comment.
The PR description says waiting threads could remain blocked for “up to the full timeout period”, but getSession() currently uses wait(1000), so the pre-fix worst-case delay appears to be ~1s rather than waitToGetSessionTimeoutInMs. Consider updating the description (or the implementation, if full-timeout blocking is still expected) so the rationale matches the current code.
| .password("root") | ||
| .maxSize(1) | ||
| .waitToGetSessionTimeoutInMs(10000) | ||
| // notifyAll() |
There was a problem hiding this comment.
Minor: the inline // notifyAll() comment in the builder chain doesn’t add information (it doesn’t configure anything) and can be removed to keep the test clear and intention-revealing.
| // notifyAll() |
| long closeStartTime = System.currentTimeMillis(); | ||
| pool.close(); | ||
| long closeEndTime = System.currentTimeMillis(); | ||
|
|
||
| waiterThread.join(1000); | ||
|
|
||
| assertNotNull("Waiter thread should have caught an exception", caughtException[0]); | ||
| assertTrue( | ||
| "Exception should be IoTDBConnectionException", | ||
| caughtException[0] instanceof IoTDBConnectionException); | ||
| assertTrue( | ||
| "Exception message should indicate pool is closed", | ||
| caughtException[0].getMessage().contains("closed")); | ||
|
|
||
| long closeDuration = closeEndTime - closeStartTime; | ||
| assertTrue( | ||
| "close() should complete quickly, but took " + closeDuration + "ms", | ||
| closeDuration < 1000); |
There was a problem hiding this comment.
The closeStartTime/closeEndTime measurement here doesn’t assert the key behavior (that the waiting thread is unblocked quickly); close() may be fast even if the waiter is still blocked. Prefer measuring/limiting the waiter unblocking time (e.g., time from close() to waiter exit) or drop this duration assertion to avoid slow-CI flakiness.
…ession test reliability.
|
I updated the |
SessionPool.close()terminates all sessions and marks the pool as closed, but threads blocked ingetSession()would not wake up immediately. They remained blocked in synchronized wait loops for up to the full timeout period only later discovering the pool was closed. This is inefficient during graceful shutdown scenarios, especially in streaming environments like Flink where fast cleanup is critical.The fix adds
notifyAll()at the end of theclose()method to immediately wake all waiting threads, allowing them to check the closed flag and fail fast. Additionally, comprehensive JavaDoc was added to both SessionPool and TableSessionPool to document the unblocking behavior. A regression test verifies that waiting threads receive IoTDBConnectionException immediately upon pool closure.