Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 388db84

Browse files
committed
refactor: retry connection for arrow and json
1 parent 0786b1f commit 388db84

1 file changed

Lines changed: 99 additions & 65 deletions

File tree

google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 99 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public class BigQueryStatement extends BigQueryNoOpsStatement {
9494
private static final String DEFAULT_DATASET_NAME = "_google_jdbc";
9595
private static final String DEFAULT_TABLE_NAME = "temp_table_";
9696
private static final String JDBC_JOB_PREFIX = "google-jdbc-";
97+
private static final int MAX_RETRY_COUNT = 5;
98+
private static final long RETRY_DELAY_MS = 2000L;
9799
protected ResultSet currentResultSet;
98100
protected long currentUpdateCount = -1;
99101
protected List<JobId> jobIds = new ArrayList<>();
@@ -810,8 +812,6 @@ Thread populateArrowBufferedQueue(
810812
() -> {
811813
long rowsRead = 0;
812814
int retryCount = 0;
813-
final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds();
814-
long startTime = System.nanoTime();
815815
try {
816816
// Use the first stream to perform reading.
817817
if (readSession.getStreamsCount() == 0) {
@@ -838,7 +838,6 @@ Thread populateArrowBufferedQueue(
838838
ArrowRecordBatch currentBatch = response.getArrowRecordBatch();
839839
arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch));
840840
rowsRead += response.getRowCount();
841-
startTime = System.nanoTime();
842841
}
843842
break;
844843
} catch (com.google.api.gax.rpc.ApiException e) {
@@ -847,21 +846,26 @@ Thread populateArrowBufferedQueue(
847846
LOG.warning("Read session expired or not found: %s", e.getMessage());
848847
break;
849848
}
850-
long elapsedSecs = (System.nanoTime() - startTime) / 1_000_000_000L;
851-
if (elapsedSecs >= retryTimeoutInSecs) {
849+
if (retryCount >= MAX_RETRY_COUNT) {
852850
LOG.log(
853851
Level.WARNING,
854852
"\n"
855853
+ Thread.currentThread().getName()
856-
+ " Interrupted @ arrowStreamProcessor, timeout exceeded",
854+
+ " Interrupted @ arrowStreamProcessor, max retries exceeded",
857855
e);
856+
try {
857+
arrowBatchWrapperBlockingQueue.put(
858+
BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e)));
859+
} catch (InterruptedException ie) {
860+
Thread.currentThread().interrupt();
861+
}
858862
break;
859863
}
860864
retryCount++;
861865
LOG.info(
862866
"Connection interrupted during arrow stream read, retrying. attempt: %d",
863867
retryCount);
864-
Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000);
868+
Thread.sleep(RETRY_DELAY_MS);
865869
}
866870
}
867871

@@ -870,6 +874,12 @@ Thread populateArrowBufferedQueue(
870874
Level.WARNING,
871875
"\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
872876
e);
877+
try {
878+
arrowBatchWrapperBlockingQueue.put(
879+
BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e)));
880+
} catch (InterruptedException ie) {
881+
Thread.currentThread().interrupt();
882+
}
873883
Thread.currentThread().interrupt();
874884
throw new BigQueryJdbcRuntimeException(e);
875885
} finally { // logic needed for graceful shutdown
@@ -956,7 +966,12 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) {
956966
if (jobId != null) {
957967
// Thread to make rpc calls to fetch data from the server
958968
Thread nextPageWorker =
959-
runNextPageTaskAsync(results, results.getNextPageToken(), jobId, rpcResponseQueue);
969+
runNextPageTaskAsync(
970+
results,
971+
results.getNextPageToken(),
972+
jobId,
973+
rpcResponseQueue,
974+
this.bigQueryFieldValueListWrapperBlockingQueue);
960975
threadList.add(nextPageWorker);
961976
} else {
962977
try {
@@ -1021,7 +1036,8 @@ Thread runNextPageTaskAsync(
10211036
TableResult result,
10221037
String firstPageToken,
10231038
JobId jobId,
1024-
BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue) {
1039+
BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue,
1040+
BlockingQueue<BigQueryFieldValueListWrapper> bigQueryFieldValueListWrapperBlockingQueue) {
10251041
LOG.finest("++enter++");
10261042
// parse and put the first page in the pageCache before the other pages are parsed from the RPC
10271043
// calls
@@ -1030,8 +1046,6 @@ Thread runNextPageTaskAsync(
10301046
// This thread makes the RPC calls and paginates
10311047
Runnable nextPageTask =
10321048
() -> {
1033-
long startTimeLoop = System.nanoTime();
1034-
final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds();
10351049
int retryCount = 0;
10361050
String currentPageToken = firstPageToken;
10371051
TableResult currentResults = result;
@@ -1061,7 +1075,6 @@ Thread runNextPageTaskAsync(
10611075
// this will be parsed asynchronously without blocking the current
10621076
// thread
10631077
rpcResponseQueue.put(Tuple.of(currentResults, true));
1064-
startTimeLoop = System.nanoTime();
10651078
LOG.fine(
10661079
"Fetched %d results from the server in %d ms.",
10671080
querySettings.getMaxResultPerPage(),
@@ -1070,21 +1083,28 @@ Thread runNextPageTaskAsync(
10701083
if (ex.getCode() == 404) {
10711084
throw ex;
10721085
}
1073-
long elapsedSecs = (System.nanoTime() - startTimeLoop) / 1_000_000_000L;
1074-
if (elapsedSecs >= retryTimeoutInSecs) {
1075-
throw ex;
1086+
if (retryCount >= MAX_RETRY_COUNT) {
1087+
throw new BigQueryJdbcRuntimeException(ex); // Re-throw max retries exceeded
10761088
}
10771089
retryCount++;
10781090
LOG.info(
10791091
"Connection interrupted during json stream read, retrying. attempt: %d",
10801092
retryCount);
1081-
Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000);
1082-
} catch (InterruptedException ie) {
1093+
Thread.sleep(RETRY_DELAY_MS);
1094+
} catch (InterruptedException ex) {
10831095
Thread.currentThread().interrupt();
1084-
throw new BigQueryJdbcRuntimeException(ie);
1096+
throw new BigQueryJdbcRuntimeException(ex);
1097+
} catch (Exception ex) {
1098+
throw new BigQueryJdbcRuntimeException(ex);
10851099
}
10861100
}
10871101
} catch (Exception ex) {
1102+
try {
1103+
bigQueryFieldValueListWrapperBlockingQueue.put(
1104+
BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
1105+
} catch (InterruptedException ie) {
1106+
Thread.currentThread().interrupt();
1107+
}
10881108
throw new BigQueryJdbcRuntimeException(ex);
10891109
} finally {
10901110
try {
@@ -1120,61 +1140,75 @@ Thread parseAndPopulateRpcDataAsync(
11201140

11211141
Runnable populateBufferRunnable =
11221142
() -> { // producer thread populating the buffer
1123-
Iterable<FieldValueList> fieldValueLists;
1124-
// as we have to process the first page
1125-
boolean hasRows = true;
1126-
while (hasRows) {
1127-
try {
1128-
Tuple<TableResult, Boolean> nextPageTuple = rpcResponseQueue.take();
1129-
if (nextPageTuple.x() != null) {
1130-
fieldValueLists = nextPageTuple.x().getValues();
1131-
} else {
1132-
fieldValueLists = null;
1133-
}
1134-
hasRows = nextPageTuple.y();
1143+
try {
1144+
Iterable<FieldValueList> fieldValueLists;
1145+
// as we have to process the first page
1146+
boolean hasRows = true;
1147+
while (hasRows) {
1148+
try {
1149+
Tuple<TableResult, Boolean> nextPageTuple = rpcResponseQueue.take();
1150+
if (nextPageTuple.x() != null) {
1151+
fieldValueLists = nextPageTuple.x().getValues();
1152+
} else {
1153+
fieldValueLists = null;
1154+
}
1155+
hasRows = nextPageTuple.y();
11351156

1136-
} catch (InterruptedException e) {
1137-
LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
1138-
// Thread might get interrupted while calling the Cancel method, which is
1139-
// expected, so logging this instead of throwing the exception back
1140-
break;
1141-
}
1157+
} catch (InterruptedException e) {
1158+
LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
1159+
// Thread might get interrupted while calling the Cancel method, which is
1160+
// expected, so logging this instead of throwing the exception back
1161+
break;
1162+
}
11421163

1143-
if (Thread.currentThread().isInterrupted()
1144-
|| queryTaskExecutor.isShutdown()
1145-
|| fieldValueLists == null) {
1146-
// do not process further pages and shutdown (outerloop)
1147-
break;
1148-
}
1164+
if (Thread.currentThread().isInterrupted()
1165+
|| queryTaskExecutor.isShutdown()
1166+
|| fieldValueLists == null) {
1167+
// do not process further pages and shutdown (outerloop)
1168+
break;
1169+
}
11491170

1150-
long startTime = System.nanoTime();
1151-
long results = 0;
1152-
for (FieldValueList fieldValueList : fieldValueLists) {
1153-
try {
1154-
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
1155-
// do not process further pages and shutdown (inner loop)
1156-
break;
1171+
long startTime = System.nanoTime();
1172+
long results = 0;
1173+
for (FieldValueList fieldValueList : fieldValueLists) {
1174+
try {
1175+
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
1176+
// do not process further pages and shutdown (inner loop)
1177+
break;
1178+
}
1179+
bigQueryFieldValueListWrapperBlockingQueue.put(
1180+
BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
1181+
results += 1;
1182+
} catch (InterruptedException ex) {
1183+
throw new BigQueryJdbcRuntimeException(ex);
11571184
}
1158-
bigQueryFieldValueListWrapperBlockingQueue.put(
1159-
BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
1160-
results += 1;
1161-
} catch (InterruptedException ex) {
1162-
throw new BigQueryJdbcRuntimeException(ex);
11631185
}
1186+
LOG.fine(
1187+
"Processed %d results in %d ms.",
1188+
results, (int) ((System.nanoTime() - startTime) / 1000000));
11641189
}
1165-
LOG.fine(
1166-
"Processed %d results in %d ms.",
1167-
results, (int) ((System.nanoTime() - startTime) / 1000000));
1168-
}
1169-
try {
1170-
// All the pages has been processed, put this marker
1171-
bigQueryFieldValueListWrapperBlockingQueue.put(
1172-
BigQueryFieldValueListWrapper.of(null, null, true));
1173-
} catch (InterruptedException e) {
1190+
} catch (Exception ex) {
11741191
LOG.log(
11751192
Level.WARNING,
1176-
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
1177-
e);
1193+
"\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
1194+
ex);
1195+
try {
1196+
bigQueryFieldValueListWrapperBlockingQueue.put(
1197+
BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
1198+
} catch (InterruptedException ie) {
1199+
Thread.currentThread().interrupt();
1200+
}
1201+
} finally {
1202+
try {
1203+
// All the pages has been processed, put this marker
1204+
bigQueryFieldValueListWrapperBlockingQueue.put(
1205+
BigQueryFieldValueListWrapper.of(null, null, true));
1206+
} catch (InterruptedException e) {
1207+
LOG.log(
1208+
Level.WARNING,
1209+
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
1210+
e);
1211+
}
11781212
}
11791213
};
11801214

0 commit comments

Comments
 (0)