diff --git a/java-bigquery-jdbc/.gitignore b/java-bigquery-jdbc/.gitignore index 76cba4530b64..bcad69d78e7c 100644 --- a/java-bigquery-jdbc/.gitignore +++ b/java-bigquery-jdbc/.gitignore @@ -3,6 +3,7 @@ target-it/** **/*logs*/** **/ITBigQueryJDBCLocalTest.java **/BigQueryStatementE2EBenchmark.java +.agents/ tools/**/*.class tools/**/drivers/** diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 07c95236c020..586a5c329405 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -62,6 +62,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -209,6 +210,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Boolean reqGoogleDriveScope; private final Properties clientInfo = new Properties(); private boolean isReadOnlyTokenUsed = false; + private final ExecutorService metadataExecutor; + private final ExecutorService queryExecutor; BigQueryConnection(String url) throws IOException { this(url, DataSource.fromUrl(url)); @@ -344,6 +347,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); + this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount); + this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool(); } } @@ -937,23 +942,91 @@ public void close() throws SQLException { } private void closeImpl() throws SQLException { + SQLException exceptionToThrow = null; try { if (this.bigQueryReadClient != null) { this.bigQueryReadClient.shutdown(); - this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES); - this.bigQueryReadClient.close(); } - if (this.bigQueryWriteClient != null) { this.bigQueryWriteClient.shutdown(); - this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES); - this.bigQueryWriteClient.close(); + } + if (this.metadataExecutor != null) { + this.metadataExecutor.shutdown(); + } + if (this.queryExecutor != null) { + this.queryExecutor.shutdown(); } for (Statement statement : this.openStatements) { - statement.close(); + try { + statement.close(); + } catch (SQLException e) { + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } } this.openStatements.clear(); + + boolean interrupted = Thread.currentThread().isInterrupted(); + + try { + if (this.bigQueryReadClient != null) { + if (interrupted) { + this.bigQueryReadClient.shutdownNow(); + } else { + this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES); + } + } + if (this.bigQueryWriteClient != null) { + if (interrupted) { + this.bigQueryWriteClient.shutdownNow(); + } else { + this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES); + } + } + if (this.metadataExecutor != null) { + if (interrupted || !this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + this.metadataExecutor.shutdownNow(); + } + } + if (this.queryExecutor != null) { + if (interrupted || !this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + this.queryExecutor.shutdownNow(); + } + } + } catch (InterruptedException e) { + interrupted = true; + if (this.bigQueryReadClient != null) { + this.bigQueryReadClient.shutdownNow(); + } + if (this.bigQueryWriteClient != null) { + this.bigQueryWriteClient.shutdownNow(); + } + if (this.metadataExecutor != null) { + this.metadataExecutor.shutdownNow(); + } + if (this.queryExecutor != null) { + this.queryExecutor.shutdownNow(); + } + } finally { + try { + if (this.bigQueryReadClient != null) { + this.bigQueryReadClient.close(); + } + } finally { + if (this.bigQueryWriteClient != null) { + this.bigQueryWriteClient.close(); + } + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException("Interrupted awaiting executor termination"); + } } catch (ConcurrentModificationException ex) { throw new BigQueryJdbcException("Concurrent modification during close", ex); } catch (InterruptedException e) { @@ -962,9 +1035,20 @@ private void closeImpl() throws SQLException { BigQueryJdbcMdc.clear(); BigQueryJdbcRootLogger.closeConnectionHandler(this.connectionId); } + if (exceptionToThrow != null) { + throw exceptionToThrow; + } this.isClosed = true; } + ExecutorService getExecutorService() { + return this.queryExecutor; + } + + ExecutorService getMetadataExecutor() { + return this.metadataExecutor; + } + @Override public boolean isClosed() { return this.isClosed; diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java index 1e3287b8b2fc..b3e79973a4e7 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java @@ -26,9 +26,13 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */ class BigQueryJdbcMdc { + private static final BigQueryJdbcCustomLogger LOG = + new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName()); + private static final InheritableThreadLocal currentConnectionId = new InheritableThreadLocal<>(); @@ -73,6 +77,28 @@ static ExecutorService newFixedThreadPool(int nThreads) { return newFixedThreadPool(nThreads, Executors.defaultThreadFactory()); } + /** + * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection + * context from the submitting thread to the executing thread. + */ + static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new MdcThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new java.util.concurrent.SynchronousQueue<>(), + new MdcThreadFactory(threadFactory)); + } + + /** + * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection + * context from the submitting thread to the executing thread. + */ + static ExecutorService newCachedThreadPool() { + return newCachedThreadPool(Executors.defaultThreadFactory()); + } + private static class MdcThreadFactory implements ThreadFactory { private final ThreadFactory delegate; @@ -82,11 +108,16 @@ public MdcThreadFactory(ThreadFactory delegate) { @Override public Thread newThread(Runnable r) { - return delegate.newThread( - () -> { - clear(); - r.run(); - }); + Thread t = + delegate.newThread( + () -> { + clear(); + r.run(); + }); + if (t != null) { + t.setDaemon(true); + } + return t; } } @@ -102,11 +133,37 @@ public MdcThreadPoolExecutor( super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } + private final AtomicBoolean warningLogged = new AtomicBoolean(false); + + private void monitorQueueSaturation(int queueSize) { + int corePoolSize = getCorePoolSize(); + // Warn when queue size is >= corePoolSize * 5, with a minimum of 10 tasks to avoid false + // alerts for tiny pools + int warnThreshold = Math.max(10, corePoolSize * 5); + // Recovery reset threshold is corePoolSize * 2, with a minimum of 4 tasks + int recoveryThreshold = Math.max(4, corePoolSize * 2); + + if (queueSize >= warnThreshold) { + if (warningLogged.compareAndSet(false, true)) { + LOG.warning( + "Thread pool is saturating. Core pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.", + corePoolSize, getActiveCount(), queueSize); + } + } else if (queueSize <= recoveryThreshold) { + if (warningLogged.get()) { + warningLogged.set(false); + } + } + } + @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } + + monitorQueueSaturation(getQueue().size()); + if (command instanceof MdcFutureTask) { super.execute(command); } else { diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index c97da7bd9ee3..5c377775368f 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -1387,4 +1387,17 @@ private static void validateNonNegative(long val, String propertyName) { "Invalid value for %s. It must be greater than or equal to 0.", propertyName)); } } + + /** + * Validates that a property value is greater than or equal to a minimum threshold. For thread + * pools, a minimum of 2 is enforced to ensure there are enough threads to handle concurrent + * coordination and avoid deadlock or thread starvation. + */ + private static void validateMin(long val, long min, String propertyName) { + if (val < min) { + throw new BigQueryJdbcRuntimeException( + String.format( + "Invalid value for %s. It must be greater than or equal to %d.", propertyName, min)); + } + } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java index 56662c4efc3a..5acbe218e474 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java @@ -26,12 +26,13 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -public class BigQueryJdbcMdcTest { +public class BigQueryJdbcMdcTest extends BigQueryJdbcLoggingBaseTest { @AfterEach public void tearDown() { @@ -251,4 +252,164 @@ public void testPoolThreadInheritanceSevered() throws Exception { executor.shutdownNow(); } } + + @Test + public void testConnectionScopedExecutorLifecycle() throws Exception { + String url1 = + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" + + "OAuthType=2;ProjectId=Proj1;" + + "OAuthAccessToken=redacted;OAuthClientId=redacted;OAuthClientSecret=redacted;" + + "metadataFetchThreadCount=5;"; + String url2 = + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" + + "OAuthType=2;ProjectId=Proj2;" + + "OAuthAccessToken=redacted;OAuthClientId=redacted;OAuthClientSecret=redacted;" + + "metadataFetchThreadCount=10;"; + + BigQueryConnection conn1 = new BigQueryConnection(url1); + BigQueryConnection conn2 = new BigQueryConnection(url2); + + try { + ExecutorService exec1 = conn1.getExecutorService(); + ExecutorService exec2 = conn2.getExecutorService(); + ExecutorService metadataExec1 = conn1.getMetadataExecutor(); + ExecutorService metadataExec2 = conn2.getMetadataExecutor(); + + assertTrue(exec1 != exec2); + assertTrue(exec1 instanceof ThreadPoolExecutor); + assertTrue(exec2 instanceof ThreadPoolExecutor); + assertTrue(metadataExec1 instanceof ThreadPoolExecutor); + assertTrue(metadataExec2 instanceof ThreadPoolExecutor); + + assertEquals(0, ((ThreadPoolExecutor) exec1).getCorePoolSize()); + assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec1).getMaximumPoolSize()); + assertEquals(0, ((ThreadPoolExecutor) exec2).getCorePoolSize()); + assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec2).getMaximumPoolSize()); + assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getCorePoolSize()); + assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getMaximumPoolSize()); + assertEquals(10, ((ThreadPoolExecutor) metadataExec2).getCorePoolSize()); + assertEquals(10, ((ThreadPoolExecutor) metadataExec2).getMaximumPoolSize()); + + try (BigQueryJdbcMdc.MdcCloseable mdc = + BigQueryJdbcMdc.registerInstance(conn1.getConnectionId())) { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference workerMdc = new AtomicReference<>(); + exec1.execute( + () -> { + workerMdc.set(BigQueryJdbcMdc.getConnectionId()); + latch.countDown(); + }); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(conn1.getConnectionId(), workerMdc.get()); + } + } finally { + try { + conn1.close(); + } finally { + conn2.close(); + } + } + + assertTrue(conn1.getExecutorService().isShutdown()); + assertTrue(conn1.getMetadataExecutor().isShutdown()); + assertTrue(conn2.getExecutorService().isShutdown()); + assertTrue(conn2.getMetadataExecutor().isShutdown()); + } + + @Test + public void testThreadPoolSaturatingWarning() throws Exception { + ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(1); + try { + CountDownLatch blockLatch = new CountDownLatch(1); + + // 1. Submit a task to occupy the single thread + executor.execute( + () -> { + try { + blockLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // 2. Submit 11 tasks to fill the queue and trigger the warning threshold of 10 (Math.max(10, + // 1 * 5)) + for (int i = 0; i < 11; i++) { + executor.execute(() -> {}); + } + + blockLatch.countDown(); + + // Verify that warning was logged + assertTrue( + assertLogContains("Thread pool is saturating"), + "Warning message about thread pool saturation was not logged"); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testThreadPoolHysteresisWarning() throws Exception { + ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(1); + try { + CountDownLatch blockLatch1 = new CountDownLatch(1); + + // 1. Submit a task to occupy the single thread + executor.execute( + () -> { + try { + blockLatch1.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // 2. Submit 11 tasks (queue size becomes 10 upon 11th task submission) + for (int i = 0; i < 11; i++) { + executor.execute(() -> {}); + } + + // Verify that warning was logged + assertTrue( + assertLogContains("Thread pool is saturating"), + "Warning message about thread pool saturation was not logged"); + + // Clear the captured logs + capturedLogs.clear(); + + // Release the latch and wait for all tasks to complete, draining the queue to 0 (which is <= + // recovery threshold) + blockLatch1.countDown(); + + // To ensure all tasks have finished, we submit a final task and wait for its completion + CountDownLatch syncLatch = new CountDownLatch(1); + executor.execute(syncLatch::countDown); + assertTrue(syncLatch.await(5, TimeUnit.SECONDS)); + + // Now the queue is empty. Let's block the thread again and submit 11 tasks. + CountDownLatch blockLatch2 = new CountDownLatch(1); + executor.execute( + () -> { + try { + blockLatch2.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + for (int i = 0; i < 11; i++) { + executor.execute(() -> {}); + } + + // Verify warning was logged a second time because the flag was reset + assertTrue( + assertLogContains("Thread pool is saturating"), + "Warning message about thread pool saturation was not logged after recovery"); + + blockLatch2.countDown(); + } finally { + executor.shutdownNow(); + } + } }