Skip to content

Commit f803dd9

Browse files
authored
Add retries for insertAll (#2518)
1 parent 398a6d9 commit f803dd9

2 files changed

Lines changed: 34 additions & 5 deletions

File tree

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
2525
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
2626
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
27+
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
2728
import com.google.api.services.bigquery.model.TableRow;
2829
import com.google.api.services.bigquery.model.TableSchema;
2930
import com.google.cloud.BaseService;
3031
import com.google.cloud.PageImpl;
3132
import com.google.cloud.PageImpl.NextPageFetcher;
3233
import com.google.cloud.RetryHelper;
34+
import com.google.cloud.RetryHelper.RetryHelperException;
3335
import com.google.cloud.RetryOption;
3436
import com.google.cloud.Tuple;
3537
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
@@ -430,16 +432,39 @@ public InsertAllResponse insertAll(InsertAllRequest request) {
430432
requestPb.setIgnoreUnknownValues(request.ignoreUnknownValues());
431433
requestPb.setSkipInvalidRows(request.skipInvalidRows());
432434
requestPb.setTemplateSuffix(request.getTemplateSuffix());
435+
// Using an array of size 1 here to have a mutable boolean variable, which can be modified in
436+
// an anonymous inner class.
437+
final boolean[] allInsertIdsSet = {true};
433438
List<Rows> rowsPb = Lists.transform(request.getRows(), new Function<RowToInsert, Rows>() {
434439
@Override
435440
public Rows apply(RowToInsert rowToInsert) {
441+
allInsertIdsSet[0] &= rowToInsert.getId() != null;
436442
return new Rows().setInsertId(rowToInsert.getId()).setJson(rowToInsert.getContent());
437443
}
438444
});
439445
requestPb.setRows(rowsPb);
440-
return InsertAllResponse.fromPb(
441-
bigQueryRpc.insertAll(tableId.getProject(), tableId.getDataset(), tableId.getTable(),
442-
requestPb));
446+
447+
TableDataInsertAllResponse responsePb;
448+
if (allInsertIdsSet[0]) {
449+
// allowing retries only if all row insertIds are set (used for deduplication)
450+
try {
451+
responsePb = runWithRetries(
452+
new Callable<TableDataInsertAllResponse>() {
453+
@Override
454+
public TableDataInsertAllResponse call() throws Exception {
455+
return bigQueryRpc.insertAll(tableId.getProject(), tableId.getDataset(),
456+
tableId.getTable(), requestPb);
457+
}
458+
}, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock());
459+
} catch (RetryHelperException e) {
460+
throw BigQueryException.translateAndThrow(e);
461+
}
462+
} else {
463+
responsePb = bigQueryRpc.insertAll(tableId.getProject(), tableId.getDataset(),
464+
tableId.getTable(), requestPb);
465+
}
466+
467+
return InsertAllResponse.fromPb(responsePb);
443468
}
444469

445470
@Override

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import com.google.common.collect.ImmutableMap;
8181
import com.google.common.collect.ImmutableSet;
8282
import com.google.common.collect.Iterables;
83+
import com.google.common.collect.Sets;
8384
import com.google.common.io.BaseEncoding;
8485
import java.io.IOException;
8586
import java.nio.ByteBuffer;
@@ -1093,8 +1094,11 @@ public void testExtractJob() throws InterruptedException, TimeoutException {
10931094
Job remoteExtractJob = bigquery.create(JobInfo.of(extractConfiguration));
10941095
remoteExtractJob = remoteExtractJob.waitFor();
10951096
assertNull(remoteExtractJob.getStatus().getError());
1096-
assertEquals(CSV_CONTENT,
1097-
new String(storage.readAllBytes(BUCKET, EXTRACT_FILE), StandardCharsets.UTF_8));
1097+
1098+
String extractedCsv =
1099+
new String(storage.readAllBytes(BUCKET, EXTRACT_FILE), StandardCharsets.UTF_8);
1100+
assertEquals(
1101+
Sets.newHashSet(CSV_CONTENT.split("\n")), Sets.newHashSet(extractedCsv.split("\n")));
10981102
assertTrue(bigquery.delete(DATASET, tableName));
10991103
}
11001104

0 commit comments

Comments
 (0)