Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit c59c2a5

Browse files
chore: merge main into generate-libraries-main
2 parents fb0072d + 9263972 commit c59c2a5

3 files changed

Lines changed: 366 additions & 5 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@
5353
* transactions.
5454
*/
5555
final class MultiplexedSessionDatabaseClient extends AbstractMultiplexedSessionDatabaseClient {
56+
/**
57+
* The maximum number of attempts that the client will try to execute CreateSession for the
58+
* initial multiplexed session. This value is only used for the very first multiplexed session
59+
* that is created, and it is only used if the application has not set a waitForMinSessions value.
60+
* If waitForMinSessions has been set, then the client will retry until the duration in
61+
* waitForMinSessions has been reached.
62+
*/
63+
private static final int MAX_INITIAL_CREATE_SESSION_ATTEMPTS = 10;
64+
5665
@VisibleForTesting
5766
static final Statement DETERMINE_DIALECT_STATEMENT =
5867
Statement.newBuilder(
@@ -226,14 +235,19 @@ public void close() {
226235
final SettableApiFuture<SessionReference> initialSessionReferenceFuture =
227236
SettableApiFuture.create();
228237
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
229-
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
238+
239+
Duration waitDuration =
240+
sessionClient.getSpanner().getOptions().getSessionPoolOptions().getWaitForMinSessions();
241+
int initialAttempts =
242+
waitDuration == null || waitDuration.isZero() ? MAX_INITIAL_CREATE_SESSION_ATTEMPTS : 1;
243+
asyncCreateMultiplexedSession(initialSessionReferenceFuture, initialAttempts);
230244
maybeWaitForSessionCreation(
231245
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
232246
initialSessionReferenceFuture);
233247
}
234248

235249
private void asyncCreateMultiplexedSession(
236-
SettableApiFuture<SessionReference> sessionReferenceFuture) {
250+
SettableApiFuture<SessionReference> sessionReferenceFuture, int remainingAttempts) {
237251
this.sessionClient.asyncCreateMultiplexedSession(
238252
new SessionConsumer() {
239253
@Override
@@ -263,7 +277,15 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
263277
MultiplexedSessionDatabaseClient.this.resourceNotFoundException.set(
264278
(ResourceNotFoundException) spannerException);
265279
}
280+
// Set the exception to trigger an error for all waiters.
281+
// Then retry the session creation if the error is (potentially) transient.
266282
sessionReferenceFuture.setException(t);
283+
if (remainingAttempts > 1
284+
&& RETRYABLE_ERROR_CODES.contains(spannerException.getErrorCode())) {
285+
final SettableApiFuture<SessionReference> future = SettableApiFuture.create();
286+
MultiplexedSessionDatabaseClient.this.multiplexedSessionReference.set(future);
287+
asyncCreateMultiplexedSession(future, remainingAttempts - 1);
288+
}
267289
}
268290
});
269291
}
@@ -283,7 +305,7 @@ private void maybeWaitForSessionCreation(
283305
// If any exception is thrown, then retry the multiplexed session creation
284306
if (sessionReferenceFuture == null) {
285307
sessionReferenceFuture = SettableApiFuture.create();
286-
asyncCreateMultiplexedSession(sessionReferenceFuture);
308+
asyncCreateMultiplexedSession(sessionReferenceFuture, 1);
287309
this.multiplexedSessionReference.set(sessionReferenceFuture);
288310
}
289311
try {
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.cloud.NoCredentials;
24+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
25+
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
26+
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
28+
import com.google.common.collect.ImmutableList;
29+
import com.google.spanner.v1.BeginTransactionRequest;
30+
import com.google.spanner.v1.CommitRequest;
31+
import com.google.spanner.v1.ReadRequest;
32+
import io.grpc.ManagedChannelBuilder;
33+
import io.grpc.Status;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.junit.runners.JUnit4;
39+
40+
@RunWith(JUnit4.class)
41+
public class ExcludeFromChangeStreamTest extends AbstractMockServerTest {
42+
43+
@BeforeClass
44+
public static void setupReadResult() {
45+
RandomResultSetGenerator generator = new RandomResultSetGenerator(10);
46+
mockSpanner.putStatementResult(
47+
StatementResult.query(
48+
Statement.of("SELECT my-column FROM my-table WHERE 1=1"), generator.generate()));
49+
}
50+
51+
private Spanner createSpanner() {
52+
return SpannerOptions.newBuilder()
53+
.setProjectId("fake-project")
54+
.setHost("http://localhost:" + getPort())
55+
.setCredentials(NoCredentials.getInstance())
56+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
57+
.build()
58+
.getService();
59+
}
60+
61+
@Test
62+
public void testStandardTransaction() {
63+
try (Spanner spanner = createSpanner()) {
64+
for (int i = 0; i < 10; i++) {
65+
DatabaseClient client =
66+
spanner.getDatabaseClient(
67+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
68+
client
69+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
70+
.run(
71+
transaction -> {
72+
try (ResultSet resultSet =
73+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
74+
while (resultSet.next()) {}
75+
}
76+
transaction.buffer(
77+
Mutation.newInsertOrUpdateBuilder("my-table")
78+
.set("my-column")
79+
.to(1L)
80+
.build());
81+
return null;
82+
});
83+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
84+
assertEquals(1, mockSpanner.countRequestsOfType(ReadRequest.class));
85+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
86+
87+
ReadRequest readRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
88+
assertTrue(readRequest.hasTransaction());
89+
assertTrue(readRequest.getTransaction().hasBegin());
90+
assertTrue(readRequest.getTransaction().getBegin().hasReadWrite());
91+
assertTrue(readRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
92+
93+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
94+
assertNotNull(commitRequest.getTransactionId());
95+
96+
mockSpanner.clearRequests();
97+
}
98+
}
99+
}
100+
101+
@Test
102+
public void testTransactionAbortedDuringRead() {
103+
try (Spanner spanner = createSpanner()) {
104+
for (int i = 0; i < 10; i++) {
105+
DatabaseClient client =
106+
spanner.getDatabaseClient(
107+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
108+
AtomicBoolean hasAborted = new AtomicBoolean(false);
109+
client
110+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
111+
.run(
112+
transaction -> {
113+
if (hasAborted.compareAndSet(false, true)) {
114+
mockSpanner.abortNextStatement();
115+
}
116+
try (ResultSet resultSet =
117+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
118+
while (resultSet.next()) {}
119+
}
120+
transaction.buffer(
121+
Mutation.newInsertOrUpdateBuilder("my-table")
122+
.set("my-column")
123+
.to(1L)
124+
.build());
125+
return null;
126+
});
127+
assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
128+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
129+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
130+
131+
BeginTransactionRequest beginRequest =
132+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
133+
assertTrue(beginRequest.getOptions().hasReadWrite());
134+
assertTrue(beginRequest.getOptions().getExcludeTxnFromChangeStreams());
135+
136+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
137+
assertTrue(firstReadRequest.hasTransaction());
138+
assertTrue(firstReadRequest.getTransaction().hasBegin());
139+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
140+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
141+
142+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
143+
assertTrue(secondReadRequest.hasTransaction());
144+
assertTrue(secondReadRequest.getTransaction().hasId());
145+
146+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
147+
assertNotNull(commitRequest.getTransactionId());
148+
149+
mockSpanner.clearRequests();
150+
}
151+
}
152+
}
153+
154+
@Test
155+
public void testTransactionAbortedDuringCommit() {
156+
try (Spanner spanner = createSpanner()) {
157+
for (int i = 0; i < 10; i++) {
158+
DatabaseClient client =
159+
spanner.getDatabaseClient(
160+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
161+
AtomicBoolean hasAborted = new AtomicBoolean(false);
162+
client
163+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
164+
.run(
165+
transaction -> {
166+
try (ResultSet resultSet =
167+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
168+
while (resultSet.next()) {}
169+
}
170+
if (hasAborted.compareAndSet(false, true)) {
171+
mockSpanner.abortNextStatement();
172+
}
173+
transaction.buffer(
174+
Mutation.newInsertOrUpdateBuilder("my-table")
175+
.set("my-column")
176+
.to(1L)
177+
.build());
178+
return null;
179+
});
180+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
181+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
182+
assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class));
183+
184+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
185+
assertTrue(firstReadRequest.hasTransaction());
186+
assertTrue(firstReadRequest.getTransaction().hasBegin());
187+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
188+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
189+
190+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
191+
assertTrue(secondReadRequest.hasTransaction());
192+
assertTrue(secondReadRequest.getTransaction().hasBegin());
193+
assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite());
194+
assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
195+
196+
for (CommitRequest commitRequest : mockSpanner.getRequestsOfType(CommitRequest.class)) {
197+
assertNotNull(commitRequest.getTransactionId());
198+
}
199+
mockSpanner.clearRequests();
200+
}
201+
}
202+
}
203+
204+
@Test
205+
public void testReadReturnsUnavailable() {
206+
207+
try (Spanner spanner = createSpanner()) {
208+
for (int i = 0; i < 10; i++) {
209+
mockSpanner.setStreamingReadExecutionTime(
210+
SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException()));
211+
DatabaseClient client =
212+
spanner.getDatabaseClient(
213+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
214+
client
215+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
216+
.run(
217+
transaction -> {
218+
try (ResultSet resultSet =
219+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
220+
while (resultSet.next()) {}
221+
}
222+
transaction.buffer(
223+
Mutation.newInsertOrUpdateBuilder("my-table")
224+
.set("my-column")
225+
.to(1L)
226+
.build());
227+
return null;
228+
});
229+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
230+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
231+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
232+
233+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
234+
assertTrue(firstReadRequest.hasTransaction());
235+
assertTrue(firstReadRequest.getTransaction().hasBegin());
236+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
237+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
238+
239+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
240+
assertTrue(secondReadRequest.hasTransaction());
241+
assertTrue(secondReadRequest.getTransaction().hasBegin());
242+
assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite());
243+
assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
244+
245+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
246+
assertNotNull(commitRequest.getTransactionId());
247+
248+
mockSpanner.clearRequests();
249+
}
250+
}
251+
}
252+
253+
@Test
254+
public void testReadReturnsUnavailableHalfway() {
255+
try (Spanner spanner = createSpanner()) {
256+
for (int i = 0; i < 10; i++) {
257+
mockSpanner.setStreamingReadExecutionTime(
258+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 2));
259+
260+
DatabaseClient client =
261+
spanner.getDatabaseClient(
262+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
263+
client
264+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
265+
.run(
266+
transaction -> {
267+
try (ResultSet resultSet =
268+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
269+
while (resultSet.next()) {}
270+
}
271+
transaction.buffer(
272+
Mutation.newInsertOrUpdateBuilder("my-table")
273+
.set("my-column")
274+
.to(1L)
275+
.build());
276+
return null;
277+
});
278+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
279+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
280+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
281+
282+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
283+
assertTrue(firstReadRequest.hasTransaction());
284+
assertTrue(firstReadRequest.getTransaction().hasBegin());
285+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
286+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
287+
288+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
289+
assertTrue(secondReadRequest.hasTransaction());
290+
assertTrue(secondReadRequest.getTransaction().hasId());
291+
292+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
293+
assertNotNull(commitRequest.getTransactionId());
294+
295+
mockSpanner.clearRequests();
296+
}
297+
}
298+
}
299+
}

0 commit comments

Comments
 (0)