Skip to content

Commit b0a41a8

Browse files
committed
Move single-thread Scheduler to ConnectionContext
1 parent a3886d7 commit b0a41a8

3 files changed

Lines changed: 17 additions & 13 deletions

File tree

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.time.Duration;
2121
import java.util.Optional;
2222
import reactor.core.publisher.Mono;
23+
import reactor.core.scheduler.Scheduler;
2324
import reactor.netty.http.client.HttpClient;
2425

2526
/**
@@ -52,6 +53,11 @@ public interface ConnectionContext {
5253
*/
5354
RootProvider getRootProvider();
5455

56+
/**
57+
* The {@link Scheduler} to use for token operations
58+
*/
59+
Scheduler getTokenScheduler();
60+
5561
/**
5662
* Attempt to explicitly trust the TLS certificate of an endpoint. Implementations can choose whether any actual trusting will happen.
5763
*

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434
import reactor.core.publisher.Mono;
35+
import reactor.core.scheduler.Scheduler;
36+
import reactor.core.scheduler.Schedulers;
3537
import reactor.netty.http.client.HttpClient;
3638
import reactor.netty.resources.ConnectionProvider;
3739
import reactor.netty.resources.LoopResources;
@@ -74,6 +76,7 @@ abstract class _DefaultConnectionContext implements ConnectionContext {
7476
@PreDestroy
7577
public final void dispose() {
7678
getConnectionProvider().ifPresent(ConnectionProvider::dispose);
79+
getTokenScheduler().dispose();
7780
getThreadPool().dispose();
7881

7982
try {
@@ -154,6 +157,12 @@ public Mono<Void> trust(String host, int port) {
154157
.orElse(Mono.empty());
155158
}
156159

160+
@Override
161+
@Value.Derived
162+
public Scheduler getTokenScheduler() {
163+
return Schedulers.newSingle(String.format("token-provider-%s/%d", getApiHost(), getPort().orElse(DEFAULT_PORT)));
164+
}
165+
157166
/**
158167
* Additional configuration for the underlying HttpClient
159168
*/

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import java.util.function.BiConsumer;
3838
import java.util.function.Consumer;
3939
import java.util.function.Function;
40-
import reactor.core.scheduler.Scheduler;
41-
import reactor.core.scheduler.Schedulers;
4240
import org.cloudfoundry.Nullable;
4341
import org.cloudfoundry.reactor.ConnectionContext;
4442
import org.cloudfoundry.reactor.TokenProvider;
@@ -85,9 +83,6 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider {
8583
private final ConcurrentMap<ConnectionContext, Mono<String>> refreshTokens =
8684
new ConcurrentHashMap<>(1);
8785

88-
private final ConcurrentMap<ConnectionContext, Scheduler> tokenSchedulers =
89-
new ConcurrentHashMap<>(1);
90-
9186
private final ConcurrentMap<ConnectionContext, Mono<String>> activeTokenRequests =
9287
new ConcurrentHashMap<>(1);
9388

@@ -323,16 +318,10 @@ private Mono<String> token(final ConnectionContext connectionContext) {
323318
* simply is to be considered waste.
324319
*
325320
* The coding below fixes both issues: It ensures that the execution of the Mono
326-
* is synchronized and it ensures that two threads arriving to fetch the JWT in a
321+
* is synchronized and it ensures that two threads arriving to fetch the JWT in a
327322
* non-caching situation does not trigger "wasteful" requests to the UAA server.
328323
*/
329324

330-
// Get or create a single-threaded scheduler for this connection context
331-
final Scheduler tokenScheduler = this.tokenSchedulers.computeIfAbsent(
332-
connectionContext,
333-
ctx -> Schedulers.newSingle("token-" + ctx.hashCode())
334-
);
335-
336325
/*
337326
* We use Mono.defer to ensure that the execution of the locking not happens
338327
* during creation of the Mono (where it is of little relevance), but
@@ -348,7 +337,7 @@ private Mono<String> token(final ConnectionContext connectionContext) {
348337

349338
// Create new token request with proper synchronization
350339
final Mono<String> baseTokenRequest = createTokenRequest(connectionContext)
351-
.publishOn(tokenScheduler) // Ensure execution on single thread
340+
.publishOn(connectionContext.getTokenScheduler()) // Ensure execution on single thread
352341
.doOnSubscribe(s -> LOGGER.debug("Starting new token request"))
353342
.doOnSuccess(token -> LOGGER.debug("Token request completed successfully"))
354343
.doOnError(error -> LOGGER.debug("Token request failed", error))

0 commit comments

Comments
 (0)