Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit e788f02

Browse files
fix: Retry multiplexed session failures
1 parent 3d585cf commit e788f02

4 files changed

Lines changed: 237 additions & 30 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.atomic.AtomicInteger;
5454
import java.util.concurrent.atomic.AtomicLong;
5555
import java.util.concurrent.atomic.AtomicReference;
56+
import java.util.concurrent.locks.ReentrantLock;
5657

5758
/**
5859
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
@@ -315,6 +316,18 @@ public void close() {
315316
*/
316317
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
317318

319+
/**
320+
* This flag is set to true if a create session RPC is in progress. This flag prevents the application
321+
* from firing two requests concurrently
322+
*/
323+
private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false);
324+
325+
/**
326+
* This lock is used to prevent two threads from retrying createSession RPC requests
327+
* concurrently.
328+
*/
329+
private final ReentrantLock sessionCreationLock = new ReentrantLock();
330+
318331
/**
319332
* This flag is set to true if the server returns UNIMPLEMENTED when a read-write transaction is
320333
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
@@ -358,11 +371,20 @@ public void close() {
358371
SettableApiFuture.create();
359372
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360373
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
374+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
375+
maybeWaitForSessionCreation(
376+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
377+
initialSessionReferenceFuture);
378+
}
379+
380+
private void asyncCreateMultiplexedSession(
381+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361382
this.sessionClient.asyncCreateMultiplexedSession(
362383
new SessionConsumer() {
363384
@Override
364385
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
386+
retryingSessionCreation.set(false);
387+
sessionReferenceFuture.set(session.getSessionReference());
366388
// only start the maintainer if we actually managed to create a session in the first
367389
// place.
368390
maintainer.start();
@@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) {
394416
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
395417
// Mark multiplexed sessions as unimplemented and fall back to regular sessions if
396418
// UNIMPLEMENTED is returned.
419+
retryingSessionCreation.set(false);
397420
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
421+
sessionReferenceFuture.setException(t);
399422
}
400423
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404424
}
405425

406426
void setPool(SessionPool pool) {
@@ -546,10 +566,36 @@ MultiplexedSessionMaintainer getMaintainer() {
546566
return this.maintainer;
547567
}
548568

569+
ApiFuture<SessionReference> getCurrentSessionReferenceFuture() {
570+
return ApiFutures.catchingAsync(
571+
this.multiplexedSessionReference.get(),
572+
Throwable.class,
573+
(throwable) -> {
574+
maybeRetrySessionCreation();
575+
return this.multiplexedSessionReference.get();
576+
},
577+
MoreExecutors.directExecutor());
578+
}
579+
580+
private void maybeRetrySessionCreation() {
581+
sessionCreationLock.lock();
582+
try {
583+
if (isValid()
584+
&& isMultiplexedSessionsSupported()
585+
&& retryingSessionCreation.compareAndSet(false, true)) {
586+
SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create();
587+
asyncCreateMultiplexedSession(settableApiFuture);
588+
multiplexedSessionReference.set(settableApiFuture);
589+
}
590+
} finally {
591+
sessionCreationLock.unlock();
592+
}
593+
}
594+
549595
@VisibleForTesting
550596
SessionReference getCurrentSessionReference() {
551597
try {
552-
return this.multiplexedSessionReference.get().get();
598+
return getCurrentSessionReferenceFuture().get();
553599
} catch (ExecutionException executionException) {
554600
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
555601
} catch (InterruptedException interruptedException) {
@@ -587,28 +633,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
587633

588634
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
589635
boolean singleUse) {
590-
try {
591-
return new MultiplexedSessionTransaction(
592-
this,
593-
tracer.getCurrentSpan(),
594-
// Getting the result of the SettableApiFuture that contains the multiplexed session will
595-
// also automatically propagate any error that happened during the creation of the
596-
// session, such as for example a DatabaseNotFound exception. We therefore do not need
597-
// any special handling of such errors.
598-
multiplexedSessionReference.get().get(),
599-
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
600-
singleUse,
601-
this.pool);
602-
} catch (ExecutionException executionException) {
603-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
604-
} catch (InterruptedException interruptedException) {
605-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
606-
}
636+
return new MultiplexedSessionTransaction(
637+
this,
638+
tracer.getCurrentSpan(),
639+
// Getting the result of the SettableApiFuture that contains the multiplexed session will
640+
// also automatically propagate any error that happened during the creation of the
641+
// session, such as for example a DatabaseNotFound exception. We therefore do not need
642+
// any special handling of such errors.
643+
getCurrentSessionReference(),
644+
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
645+
singleUse,
646+
this.pool);
607647
}
608648

609649
private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
610650
return new DelayedMultiplexedSessionTransaction(
611-
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
651+
this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool);
612652
}
613653

614654
private int getSingleUseChannelHint() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3202,7 +3202,8 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
32023202
mockSpanner.unfreeze();
32033203
assertThrows(ResourceNotFoundException.class, rs::next);
32043204
// The server should only receive one BatchCreateSessions request.
3205-
assertThat(mockSpanner.getRequests()).hasSize(1);
3205+
// If a multiplexed session is used, it will be retried once, so 2 CreateSession requests.
3206+
assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 2 : 1);
32063207
}
32073208
assertThrows(
32083209
ResourceNotFoundException.class,
@@ -3221,9 +3222,9 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
32213222
} else {
32223223
// Note that in case of the use of regular sessions, then we have 1 request:
32233224
// BatchCreateSessions for the session pool.
3224-
// Note that in case of the use of multiplexed sessions for read-write, then we have 1
3225-
// request: CreateSession for the multiplexed session.
3226-
assertThat(mockSpanner.getRequests()).hasSize(1);
3225+
// Note that in case of the use of multiplexed sessions for read-write, then we have 3
3226+
// requests: CreateSession for the multiplexed session.
3227+
assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 3 : 1);
32273228
}
32283229
}
32293230
}
@@ -3413,7 +3414,9 @@ public void testGetInvalidatedClientMultipleTimes() {
34133414
if (spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()) {
34143415
// We should only receive 1 CreateSession request. The query should never be executed,
34153416
// as the session creation fails before it gets to executing a query.
3416-
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
3417+
assertEquals(
3418+
2 + (2 * run) + useClient,
3419+
mockSpanner.countRequestsOfType(CreateSessionRequest.class));
34173420
assertEquals(0, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
34183421
} else {
34193422
// The server should only receive one BatchCreateSessions request for each run as we

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import java.time.Duration;
5555
import java.util.*;
5656
import java.util.concurrent.CountDownLatch;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
5961
import java.util.concurrent.atomic.AtomicReference;
@@ -245,6 +247,168 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() {
245247
assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
246248
}
247249

250+
@Test
251+
public void testDeadlineExceededErrorWithOneRetry() {
252+
// Setting up two exceptions
253+
mockSpanner.setCreateSessionExecutionTime(
254+
SimulatedExecutionTime.ofExceptions(
255+
Arrays.asList(
256+
Status.DEADLINE_EXCEEDED
257+
.withDescription(
258+
"CallOptions deadline exceeded after 22.986872393s. "
259+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
260+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
261+
.asRuntimeException(),
262+
Status.DEADLINE_EXCEEDED
263+
.withDescription(
264+
"CallOptions deadline exceeded after 22.986872393s. "
265+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
266+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
267+
.asRuntimeException())));
268+
DatabaseClientImpl client =
269+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
270+
assertNotNull(client.multiplexedSessionDatabaseClient);
271+
272+
// initial fetch call fails with exception
273+
// this call will try to fetch it again which again throws an exception
274+
assertThrows(
275+
SpannerException.class,
276+
() -> {
277+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
278+
//noinspection StatementWithEmptyBody
279+
while (resultSet.next()) {
280+
// ignore
281+
}
282+
}
283+
});
284+
285+
// When the third request comes, it should succeed
286+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
287+
//noinspection StatementWithEmptyBody
288+
while (resultSet.next()) {
289+
// ignore
290+
}
291+
}
292+
293+
// Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session.
294+
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
295+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
296+
297+
Session session = mockSpanner.getSession(requests.get(0).getSession());
298+
assertNotNull(session);
299+
assertTrue(session.getMultiplexed());
300+
301+
assertNotNull(client.multiplexedSessionDatabaseClient);
302+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
303+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
304+
}
305+
306+
@Test
307+
public void testDeadlineExceededErrorWithOneRetryWithParallelRequests()
308+
throws InterruptedException {
309+
mockSpanner.setCreateSessionExecutionTime(
310+
SimulatedExecutionTime.ofExceptions(
311+
Arrays.asList(
312+
Status.DEADLINE_EXCEEDED
313+
.withDescription(
314+
"CallOptions deadline exceeded after 22.986872393s. "
315+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
316+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
317+
.asRuntimeException(),
318+
Status.DEADLINE_EXCEEDED
319+
.withDescription(
320+
"CallOptions deadline exceeded after 22.986872393s. "
321+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
322+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
323+
.asRuntimeException())));
324+
325+
SpannerOptions spannerOptions =
326+
SpannerOptions.newBuilder()
327+
.setProjectId("test-project")
328+
.setCredentials(NoCredentials.getInstance())
329+
.setChannelProvider(channelProvider)
330+
.setSessionPoolOption(
331+
SessionPoolOptions.newBuilder()
332+
.setMultiplexedSessionMaintenanceDuration(Duration.ofMinutes(10))
333+
.build())
334+
.build();
335+
336+
Spanner testSpanner = spannerOptions.getService();
337+
DatabaseClientImpl client =
338+
(DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
339+
assertNotNull(client.multiplexedSessionDatabaseClient);
340+
341+
ExecutorService executor = Executors.newCachedThreadPool();
342+
343+
// First set of request should fail with an error
344+
CountDownLatch failureCountDownLatch = new CountDownLatch(3);
345+
for (int i = 0; i < 3; i++) {
346+
if (i == 0) {
347+
mockSpanner.freeze();
348+
}
349+
executor.submit(
350+
() -> {
351+
try {
352+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
353+
//noinspection StatementWithEmptyBody
354+
while (resultSet.next()) {
355+
// ignore
356+
}
357+
}
358+
} catch (SpannerException e) {
359+
failureCountDownLatch.countDown();
360+
}
361+
});
362+
if (i == 2) {
363+
mockSpanner.unfreeze();
364+
}
365+
}
366+
367+
assertTrue(failureCountDownLatch.await(2, TimeUnit.SECONDS));
368+
assertEquals(0, failureCountDownLatch.getCount());
369+
370+
// Second set of requests should pass
371+
CountDownLatch countDownLatch = new CountDownLatch(3);
372+
for (int i = 0; i < 3; i++) {
373+
if (i == 0) {
374+
mockSpanner.freeze();
375+
}
376+
executor.submit(
377+
() -> {
378+
try {
379+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
380+
//noinspection StatementWithEmptyBody
381+
while (resultSet.next()) {
382+
// ignore
383+
}
384+
}
385+
} catch (SpannerException e) {
386+
countDownLatch.countDown();
387+
}
388+
});
389+
if (i == 2) {
390+
mockSpanner.unfreeze();
391+
}
392+
}
393+
394+
assertFalse(countDownLatch.await(2, TimeUnit.SECONDS));
395+
assertEquals(3, countDownLatch.getCount());
396+
397+
// Verify that we received 3 ExecuteSqlRequests, and that a multiplexed session was used.
398+
assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
399+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
400+
401+
Session session = mockSpanner.getSession(requests.get(0).getSession());
402+
assertNotNull(session);
403+
assertTrue(session.getMultiplexed());
404+
405+
assertNotNull(client.multiplexedSessionDatabaseClient);
406+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
407+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
408+
409+
testSpanner.close();
410+
}
411+
248412
@Test
249413
public void
250414
testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public void testNoNetworkConnection() {
339339

340340
// Attempt count should have a failed metric point for CreateSession.
341341
assertEquals(
342-
1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
342+
2, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
343343
assertTrue(
344344
checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME));
345345
assertTrue(

0 commit comments

Comments
 (0)