|
53 | 53 | import java.util.concurrent.atomic.AtomicInteger; |
54 | 54 | import java.util.concurrent.atomic.AtomicLong; |
55 | 55 | import java.util.concurrent.atomic.AtomicReference; |
| 56 | +import java.util.concurrent.locks.ReentrantLock; |
56 | 57 |
|
57 | 58 | /** |
58 | 59 | * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message |
@@ -315,6 +316,18 @@ public void close() { |
315 | 316 | */ |
316 | 317 | private final AtomicBoolean unimplemented = new AtomicBoolean(false); |
317 | 318 |
|
| 319 | + /** |
| 320 | + * This flag is set to true if create session RPC is in progress. This flag prevents application |
| 321 | + * from firing two requests concurrently |
| 322 | + */ |
| 323 | + private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false); |
| 324 | + |
| 325 | + /** |
| 326 | + * This lock is used to prevent two threads from retrying createSession RPC requests in |
| 327 | + * concurrently. |
| 328 | + */ |
| 329 | + private final ReentrantLock sessionCreationLock = new ReentrantLock(); |
| 330 | + |
318 | 331 | /** |
319 | 332 | * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is |
320 | 333 | * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available. |
@@ -358,11 +371,20 @@ public void close() { |
358 | 371 | SettableApiFuture.create(); |
359 | 372 | this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create(); |
360 | 373 | this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); |
| 374 | + asyncCreateMultiplexedSession(initialSessionReferenceFuture); |
| 375 | + maybeWaitForSessionCreation( |
| 376 | + sessionClient.getSpanner().getOptions().getSessionPoolOptions(), |
| 377 | + initialSessionReferenceFuture); |
| 378 | + } |
| 379 | + |
| 380 | + private void asyncCreateMultiplexedSession( |
| 381 | + SettableApiFuture<SessionReference> sessionReferenceFuture) { |
361 | 382 | this.sessionClient.asyncCreateMultiplexedSession( |
362 | 383 | new SessionConsumer() { |
363 | 384 | @Override |
364 | 385 | public void onSessionReady(SessionImpl session) { |
365 | | - initialSessionReferenceFuture.set(session.getSessionReference()); |
| 386 | + retryingSessionCreation.set(false); |
| 387 | + sessionReferenceFuture.set(session.getSessionReference()); |
366 | 388 | // only start the maintainer if we actually managed to create a session in the first |
367 | 389 | // place. |
368 | 390 | maintainer.start(); |
@@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) { |
394 | 416 | public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) { |
395 | 417 | // Mark multiplexes sessions as unimplemented and fall back to regular sessions if |
396 | 418 | // UNIMPLEMENTED is returned. |
| 419 | + retryingSessionCreation.set(false); |
397 | 420 | maybeMarkUnimplemented(t); |
398 | | - initialSessionReferenceFuture.setException(t); |
| 421 | + sessionReferenceFuture.setException(t); |
399 | 422 | } |
400 | 423 | }); |
401 | | - maybeWaitForSessionCreation( |
402 | | - sessionClient.getSpanner().getOptions().getSessionPoolOptions(), |
403 | | - initialSessionReferenceFuture); |
404 | 424 | } |
405 | 425 |
|
406 | 426 | void setPool(SessionPool pool) { |
@@ -546,10 +566,35 @@ MultiplexedSessionMaintainer getMaintainer() { |
546 | 566 | return this.maintainer; |
547 | 567 | } |
548 | 568 |
|
| 569 | + ApiFuture<SessionReference> getCurrentSessionReferenceFuture() { |
| 570 | + return ApiFutures.catchingAsync( |
| 571 | + this.multiplexedSessionReference.get(), |
| 572 | + Throwable.class, |
| 573 | + (throwable) -> { |
| 574 | + maybeRetrySessionCreation(); |
| 575 | + return this.multiplexedSessionReference.get(); |
| 576 | + }, |
| 577 | + MoreExecutors.directExecutor()); |
| 578 | + } |
| 579 | + |
| 580 | + private void maybeRetrySessionCreation() { |
| 581 | + sessionCreationLock.lock(); |
| 582 | + try { |
| 583 | + if (isMultiplexedSessionsSupported() |
| 584 | + && retryingSessionCreation.compareAndSet(false, true)) { |
| 585 | + SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create(); |
| 586 | + asyncCreateMultiplexedSession(settableApiFuture); |
| 587 | + multiplexedSessionReference.set(settableApiFuture); |
| 588 | + } |
| 589 | + } finally { |
| 590 | + sessionCreationLock.unlock(); |
| 591 | + } |
| 592 | + } |
| 593 | + |
549 | 594 | @VisibleForTesting |
550 | 595 | SessionReference getCurrentSessionReference() { |
551 | 596 | try { |
552 | | - return this.multiplexedSessionReference.get().get(); |
| 597 | + return getCurrentSessionReferenceFuture().get(); |
553 | 598 | } catch (ExecutionException executionException) { |
554 | 599 | throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); |
555 | 600 | } catch (InterruptedException interruptedException) { |
@@ -587,28 +632,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) { |
587 | 632 |
|
588 | 633 | private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( |
589 | 634 | boolean singleUse) { |
590 | | - try { |
591 | | - return new MultiplexedSessionTransaction( |
592 | | - this, |
593 | | - tracer.getCurrentSpan(), |
594 | | - // Getting the result of the SettableApiFuture that contains the multiplexed session will |
595 | | - // also automatically propagate any error that happened during the creation of the |
596 | | - // session, such as for example a DatabaseNotFound exception. We therefore do not need |
597 | | - // any special handling of such errors. |
598 | | - multiplexedSessionReference.get().get(), |
599 | | - singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT, |
600 | | - singleUse, |
601 | | - this.pool); |
602 | | - } catch (ExecutionException executionException) { |
603 | | - throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); |
604 | | - } catch (InterruptedException interruptedException) { |
605 | | - throw SpannerExceptionFactory.propagateInterrupt(interruptedException); |
606 | | - } |
| 635 | + return new MultiplexedSessionTransaction( |
| 636 | + this, |
| 637 | + tracer.getCurrentSpan(), |
| 638 | + // Getting the result of the SettableApiFuture that contains the multiplexed session will |
| 639 | + // also automatically propagate any error that happened during the creation of the |
| 640 | + // session, such as for example a DatabaseNotFound exception. We therefore do not need |
| 641 | + // any special handling of such errors. |
| 642 | + getCurrentSessionReference(), |
| 643 | + singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT, |
| 644 | + singleUse, |
| 645 | + this.pool); |
607 | 646 | } |
608 | 647 |
|
609 | 648 | private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() { |
610 | 649 | return new DelayedMultiplexedSessionTransaction( |
611 | | - this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool); |
| 650 | + this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool); |
612 | 651 | } |
613 | 652 |
|
614 | 653 | private int getSingleUseChannelHint() { |
|
0 commit comments