Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 0b32ed4

Browse files
chore: merge main into generate-libraries-main
2 parents 37826c3 + 866a8c2 commit 0b32ed4

2 files changed

Lines changed: 166 additions & 0 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,25 @@ ApiFuture<Empty> rollbackAsync() {
614614
getTransactionChannelHint());
615615
session.markUsed(clock.instant());
616616
return apiFuture;
617+
} else if (transactionIdFuture != null) {
618+
ApiFuture<ByteString> transactionIdOrEmptyFuture =
619+
ApiFutures.catching(
620+
transactionIdFuture,
621+
Throwable.class,
622+
input -> ByteString.empty(),
623+
MoreExecutors.directExecutor());
624+
return ApiFutures.transformAsync(
625+
transactionIdOrEmptyFuture,
626+
transactionId ->
627+
transactionId.isEmpty()
628+
? ApiFutures.immediateFuture(Empty.getDefaultInstance())
629+
: rpc.rollbackAsync(
630+
RollbackRequest.newBuilder()
631+
.setSession(session.getName())
632+
.setTransactionId(transactionId)
633+
.build(),
634+
getTransactionChannelHint()),
635+
MoreExecutors.directExecutor());
617636
} else {
618637
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
619638
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static org.junit.Assert.assertNull;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.cloud.NoCredentials;
23+
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
24+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
25+
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
26+
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
28+
import com.google.common.base.Function;
29+
import com.google.spanner.v1.ExecuteSqlRequest;
30+
import com.google.spanner.v1.RollbackRequest;
31+
import io.grpc.ManagedChannelBuilder;
32+
import io.grpc.Status;
33+
import java.util.Objects;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import org.junit.BeforeClass;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
import org.junit.runners.JUnit4;
40+
import org.threeten.bp.Duration;
41+
42+
@RunWith(JUnit4.class)
43+
public class OrphanedTransactionTest extends AbstractMockServerTest {
44+
private static final Statement STATEMENT = Statement.of("SELECT * FROM random");
45+
46+
@BeforeClass
47+
public static void setupReadResult() {
48+
com.google.cloud.spanner.connection.RandomResultSetGenerator generator =
49+
new RandomResultSetGenerator(10);
50+
mockSpanner.putStatementResult(StatementResult.query(STATEMENT, generator.generate()));
51+
}
52+
53+
private Spanner createSpanner() {
54+
return SpannerOptions.newBuilder()
55+
.setProjectId("fake-project")
56+
.setHost("http://localhost:" + getPort())
57+
.setCredentials(NoCredentials.getInstance())
58+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
59+
.setSessionPoolOption(
60+
SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build())
61+
.build()
62+
.getService();
63+
}
64+
65+
@Test
66+
public void testOrphanedTransaction() throws Exception {
67+
ExecutorService executor = Executors.newCachedThreadPool();
68+
try (Spanner spanner = createSpanner()) {
69+
DatabaseClient client =
70+
spanner.getDatabaseClient(
71+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
72+
// Freeze the mock server to ensure that the request lands on the mock server before we
73+
// proceed.
74+
mockSpanner.freeze();
75+
AsyncTransactionManager manager = client.transactionManagerAsync();
76+
TransactionContextFuture context = manager.beginAsync();
77+
context.then(
78+
(txn, input) -> {
79+
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
80+
resultSet.toListAsync(
81+
(Function<StructReader, Object>)
82+
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
83+
executor);
84+
}
85+
return null;
86+
},
87+
executor);
88+
// Wait for the ExecuteSqlRequest to land on the mock server.
89+
mockSpanner.waitForRequestsToContain(
90+
input ->
91+
input instanceof ExecuteSqlRequest
92+
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
93+
5000L);
94+
// Now close the transaction. This should (eventually) trigger a rollback, even though the
95+
// client has not yet received a transaction ID.
96+
manager.closeAsync();
97+
// Unfreeze the mock server and wait for the Rollback request to be received.
98+
mockSpanner.unfreeze();
99+
mockSpanner.waitForLastRequestToBe(RollbackRequest.class, 5000L);
100+
} finally {
101+
executor.shutdown();
102+
}
103+
}
104+
105+
@Test
106+
public void testOrphanedTransactionWithFailedFirstQuery() throws Exception {
107+
ExecutorService executor = Executors.newCachedThreadPool();
108+
mockSpanner.setExecuteStreamingSqlExecutionTime(
109+
SimulatedExecutionTime.ofException(
110+
Status.INVALID_ARGUMENT.withDescription("table not found").asRuntimeException()));
111+
try (Spanner spanner = createSpanner()) {
112+
DatabaseClient client =
113+
spanner.getDatabaseClient(
114+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
115+
// Freeze the mock server to ensure that the request lands on the mock server before we
116+
// proceed.
117+
mockSpanner.freeze();
118+
AsyncTransactionManager manager = client.transactionManagerAsync();
119+
TransactionContextFuture context = manager.beginAsync();
120+
context.then(
121+
(txn, input) -> {
122+
try (AsyncResultSet resultSet = txn.executeQueryAsync(STATEMENT)) {
123+
resultSet.toListAsync(
124+
(Function<StructReader, Object>)
125+
row -> Objects.requireNonNull(row).getValue(0).getAsString(),
126+
executor);
127+
}
128+
return null;
129+
},
130+
executor);
131+
// Wait for the ExecuteSqlRequest to land on the mock server.
132+
mockSpanner.waitForRequestsToContain(
133+
input ->
134+
input instanceof ExecuteSqlRequest
135+
&& ((ExecuteSqlRequest) input).getSql().equals(STATEMENT.getSql()),
136+
5000L);
137+
// Now close the transaction. This will not trigger a Rollback, as the statement failed.
138+
// The closeResult will be done when the error for the failed statement is returned to the
139+
// client.
140+
ApiFuture<Void> closeResult = manager.closeAsync();
141+
mockSpanner.unfreeze();
142+
assertNull(closeResult.get());
143+
} finally {
144+
executor.shutdown();
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)