Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 19f3130

Browse files
fix(spanner): Retry creation of multiplexed session
1 parent aec0515 commit 19f3130

3 files changed

Lines changed: 328 additions & 22 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
import java.time.Clock;
4040
import java.time.Duration;
4141
import java.time.Instant;
42+
import java.util.Arrays;
4243
import java.util.BitSet;
44+
import java.util.EnumSet;
4345
import java.util.HashMap;
46+
import java.util.List;
4447
import java.util.Map;
4548
import java.util.concurrent.ExecutionException;
4649
import java.util.concurrent.Executors;
@@ -262,6 +265,10 @@ public void close() {
262265
*/
263266
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
264267

268+
private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
269+
EnumSet.of(
270+
ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
271+
265272
private final BitSet channelUsage;
266273

267274
private final int numChannels;
@@ -358,11 +365,19 @@ public void close() {
358365
SettableApiFuture.create();
359366
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360367
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
368+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
369+
maybeWaitForSessionCreation(
370+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
371+
initialSessionReferenceFuture);
372+
}
373+
374+
private void asyncCreateMultiplexedSession(
375+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361376
this.sessionClient.asyncCreateMultiplexedSession(
362377
new SessionConsumer() {
363378
@Override
364379
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
380+
sessionReferenceFuture.set(session.getSessionReference());
366381
// only start the maintainer if we actually managed to create a session in the first
367382
// place.
368383
maintainer.start();
@@ -395,33 +410,59 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
395410
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
396411
// UNIMPLEMENTED is returned.
397412
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
413+
sessionReferenceFuture.setException(t);
399414
}
400415
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404416
}
405417

406418
void setPool(SessionPool pool) {
407419
this.pool = pool;
408420
}
409421

410-
private static void maybeWaitForSessionCreation(
411-
SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) {
422+
private void maybeWaitForSessionCreation(
423+
SessionPoolOptions sessionPoolOptions, SettableApiFuture<SessionReference> future) {
412424
Duration waitDuration = sessionPoolOptions.getWaitForMinSessions();
425+
SpannerException lastException = null;
426+
SettableApiFuture<SessionReference> sessionReferenceFuture = future;
427+
413428
if (waitDuration != null && !waitDuration.isZero()) {
414-
long timeoutMillis = waitDuration.toMillis();
415-
try {
416-
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
417-
} catch (ExecutionException executionException) {
418-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
419-
} catch (InterruptedException interruptedException) {
420-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
421-
} catch (TimeoutException timeoutException) {
422-
throw SpannerExceptionFactory.newSpannerException(
423-
ErrorCode.DEADLINE_EXCEEDED,
424-
"Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation");
429+
Instant endTime = Instant.now().plus(waitDuration);
430+
while (Instant.now().isBefore(endTime)) {
431+
// If any exception is thrown, then retry the multiplexed session creation
432+
if (sessionReferenceFuture == null) {
433+
sessionReferenceFuture = SettableApiFuture.create();
434+
asyncCreateMultiplexedSession(sessionReferenceFuture);
435+
this.multiplexedSessionReference.set(sessionReferenceFuture);
436+
}
437+
// Calculate the remaining time pending for the future to wait for multiplexed session
438+
Duration remainingTime = Duration.between(Instant.now(), endTime);
439+
try {
440+
sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
441+
lastException = null;
442+
break;
443+
} catch (ExecutionException executionException) {
444+
lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause());
445+
} catch (InterruptedException interruptedException) {
446+
lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException);
447+
} catch (TimeoutException timeoutException) {
448+
lastException =
449+
SpannerExceptionFactory.newSpannerException(
450+
ErrorCode.DEADLINE_EXCEEDED,
451+
"Timed out after waiting "
452+
+ waitDuration.toMillis()
453+
+ "ms for multiplexed session creation");
454+
}
455+
// if any exception is thrown, then set the session reference to null to retry the
456+
// multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or
457+
// RESOURCE_EXHAUSTED
458+
if (lastException != null && RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) {
459+
sessionReferenceFuture = null;
460+
}
461+
}
462+
// if the wait time elapsed and multiplexed session fetch failed then throw the last exception
463+
// that we have received
464+
if (lastException != null) {
465+
throw lastException;
425466
}
426467
}
427468
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -542,16 +542,16 @@ void simulateExecutionTime(
542542
boolean stickyGlobalExceptions,
543543
CountDownLatch freezeLock) {
544544
Uninterruptibles.awaitUninterruptibly(freezeLock);
545-
checkException(globalExceptions, stickyGlobalExceptions);
546-
if (streamIndices.isEmpty()) {
547-
checkException(this.exceptions, stickyException);
548-
}
549545
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
550546
Uninterruptibles.sleepUninterruptibly(
551547
(randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime))
552548
+ minimumExecutionTime,
553549
TimeUnit.MILLISECONDS);
554550
}
551+
checkException(globalExceptions, stickyGlobalExceptions);
552+
if (streamIndices.isEmpty()) {
553+
checkException(this.exceptions, stickyException);
554+
}
555555
}
556556

557557
private static void checkException(Queue<Exception> exceptions, boolean keepException) {

0 commit comments

Comments
 (0)