|
6 | 6 | import java.io.PrintWriter; |
7 | 7 | import java.sql.*; |
8 | 8 | import java.util.*; |
| 9 | +import java.util.concurrent.ExecutorService; |
| 10 | +import java.util.concurrent.RejectedExecutionException; |
| 11 | +import java.util.concurrent.TimeUnit; |
9 | 12 | import java.util.concurrent.atomic.AtomicBoolean; |
10 | 13 | import java.util.concurrent.atomic.AtomicInteger; |
11 | 14 | import java.util.concurrent.atomic.LongAdder; |
@@ -81,6 +84,8 @@ final class ConnectionPool implements DataSourcePool { |
81 | 84 | private final PooledConnectionQueue queue; |
82 | 85 | private Timer heartBeatTimer; |
83 | 86 | private int heartbeatPoolExhaustedCount; |
| 87 | + private final ExecutorService executor; |
| 88 | + |
84 | 89 | /** |
85 | 90 | * Used to find and close() leaked connections. Leaked connections are |
86 | 91 | * thought to be busy but have not been used for some time. Each time a |
@@ -134,6 +139,7 @@ final class ConnectionPool implements DataSourcePool { |
134 | 139 | init(); |
135 | 140 | } |
136 | 141 | this.nextTrimTime = System.currentTimeMillis() + trimPoolFreqMillis; |
| 142 | + this.executor = ExecutorFactory.newExecutor(); |
137 | 143 | } |
138 | 144 |
|
139 | 145 | private void init() { |
@@ -649,15 +655,23 @@ public void offline() { |
649 | 655 | shutdownPool(false, false); |
650 | 656 | } |
651 | 657 |
|
652 | | - private void shutdownPool(boolean closeBusyConnections, boolean fromHook) { |
653 | | - stopHeartBeatIfRunning(); |
654 | | - PoolStatus status = queue.shutdown(closeBusyConnections); |
655 | | - dataSourceUp.set(false); |
656 | | - if (fromHook) { |
657 | | - Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); |
658 | | - } else { |
659 | | - Log.info("DataSource [{0}] shutdown {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); |
660 | | - removeShutdownHook(); |
| 658 | + private void shutdownPool(boolean fullShutdown, boolean fromHook) { |
| 659 | + heartbeatLock.lock(); |
| 660 | + try { |
| 661 | + stopHeartBeatIfRunning(); |
| 662 | + PoolStatus status = queue.shutdown(fullShutdown); |
| 663 | + dataSourceUp.set(false); |
| 664 | + if (fullShutdown) { |
| 665 | + shutdownExecutor(); |
| 666 | + } |
| 667 | + if (fromHook) { |
| 668 | + Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); |
| 669 | + } else { |
| 670 | + Log.info("DataSource [{0}] shutdown {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); |
| 671 | + removeShutdownHook(); |
| 672 | + } |
| 673 | + } finally { |
| 674 | + heartbeatLock.unlock(); |
661 | 675 | } |
662 | 676 | } |
663 | 677 |
|
@@ -718,6 +732,61 @@ private void stopHeartBeatIfRunning() { |
718 | 732 | } |
719 | 733 | } |
720 | 734 |
|
| 735 | + private static final class AsyncCloser implements Runnable { |
| 736 | + final PooledConnection pc; |
| 737 | + final boolean logErrors; |
| 738 | + |
| 739 | + private AsyncCloser(PooledConnection pc, boolean logErrors) { |
| 740 | + this.pc = pc; |
| 741 | + this.logErrors = logErrors; |
| 742 | + } |
| 743 | + |
| 744 | + @Override |
| 745 | + public void run() { |
| 746 | + pc.doCloseConnection(logErrors); |
| 747 | + } |
| 748 | + |
| 749 | + @Override |
| 750 | + public String toString() { |
| 751 | + return pc.toString(); |
| 752 | + } |
| 753 | + } |
| 754 | + |
| 755 | + /** |
| 756 | + * Closes the connection in the background as it may be slow or block. |
| 757 | + */ |
| 758 | + void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) { |
| 759 | + if (!executor.isShutdown()) { |
| 760 | + try { |
| 761 | + executor.submit(new AsyncCloser(pc, logErrors)); |
| 762 | + return; |
| 763 | + } catch (RejectedExecutionException e) { |
| 764 | + Log.trace("DataSource [{0}] closing connection synchronously", name); |
| 765 | + } |
| 766 | + } |
| 767 | + // it is possible that we receive runnables after shutdown. |
| 768 | + // in this case, we will execute them immediately (outside lock) |
| 769 | + pc.doCloseConnection(logErrors); |
| 770 | + } |
| 771 | + |
| 772 | + private void shutdownExecutor() { |
| 773 | + executor.shutdown(); |
| 774 | + try { |
| 775 | + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { |
| 776 | + Log.warn("DataSource [{0}] on shutdown, timeout waiting for connections to close", name); |
| 777 | + } |
| 778 | + } catch (InterruptedException ie) { |
| 779 | + Log.warn("DataSource [{0}] on shutdown, interrupted closing connections", name, ie); |
| 780 | + } |
| 781 | + final var pendingTasks = executor.shutdownNow(); |
| 782 | + if (!pendingTasks.isEmpty()) { |
| 783 | + Log.warn("DataSource [{0}] on shutdown, {1} pending connections were not closed", name, pendingTasks.size()); |
| 784 | + } |
| 785 | + } |
| 786 | + |
| 787 | + /** |
| 788 | + * Return the default autoCommit setting for the pool. |
| 789 | + */ |
721 | 790 | @Override |
722 | 791 | public boolean isAutoCommit() { |
723 | 792 | return autoCommit; |
|
0 commit comments