Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit ef80e7b

Browse files
committed
chore: integrate location aware routing with RPCs
1 parent 9263839 commit ef80e7b

8 files changed

Lines changed: 11211 additions & 61 deletions

File tree

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.spanner.v1.CacheUpdate;
21+
import com.google.spanner.v1.DirectedReadOptions;
22+
import com.google.spanner.v1.ExecuteSqlRequest;
23+
import com.google.spanner.v1.ReadRequest;
24+
import com.google.spanner.v1.RoutingHint;
25+
import com.google.spanner.v1.TransactionSelector;
26+
import java.util.Objects;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
29+
/**
30+
* Finds a server for a request using location-aware routing metadata.
31+
*
32+
* <p>This component is per-database and maintains both recipe and range caches.
33+
*/
34+
@InternalApi
35+
public final class ChannelFinder {
36+
private final Object updateLock = new Object();
37+
private final AtomicLong databaseId = new AtomicLong();
38+
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
39+
private final KeyRangeCache rangeCache;
40+
private final String databaseUri;
41+
42+
public ChannelFinder(ChannelEndpointCache endpointCache, String databaseUri) {
43+
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache));
44+
this.databaseUri = Objects.requireNonNull(databaseUri);
45+
}
46+
47+
void useDeterministicRandom() {
48+
rangeCache.useDeterministicRandom();
49+
}
50+
51+
public void update(CacheUpdate update) {
52+
synchronized (updateLock) {
53+
long currentId = databaseId.get();
54+
if (currentId != update.getDatabaseId()) {
55+
if (currentId != 0) {
56+
recipeCache.clear();
57+
rangeCache.clear();
58+
}
59+
databaseId.set(update.getDatabaseId());
60+
}
61+
if (update.hasKeyRecipes()) {
62+
recipeCache.addRecipes(update.getKeyRecipes());
63+
}
64+
rangeCache.addRanges(update);
65+
}
66+
}
67+
68+
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
69+
recipeCache.computeKeys(reqBuilder);
70+
return fillRoutingHint(
71+
reqBuilder.getTransaction(),
72+
reqBuilder.getDirectedReadOptions(),
73+
KeyRangeCache.RangeMode.COVERING_SPLIT,
74+
reqBuilder.getRoutingHintBuilder());
75+
}
76+
77+
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
78+
recipeCache.computeKeys(reqBuilder);
79+
return fillRoutingHint(
80+
reqBuilder.getTransaction(),
81+
reqBuilder.getDirectedReadOptions(),
82+
KeyRangeCache.RangeMode.PICK_RANDOM,
83+
reqBuilder.getRoutingHintBuilder());
84+
}
85+
86+
private ChannelEndpoint fillRoutingHint(
87+
TransactionSelector transactionSelector,
88+
DirectedReadOptions directedReadOptions,
89+
KeyRangeCache.RangeMode rangeMode,
90+
RoutingHint.Builder hintBuilder) {
91+
long id = databaseId.get();
92+
if (id == 0) {
93+
return null;
94+
}
95+
hintBuilder.setDatabaseId(id);
96+
return rangeCache.fillRoutingHint(
97+
preferLeader(transactionSelector), rangeMode, directedReadOptions, hintBuilder);
98+
}
99+
100+
private static boolean preferLeader(TransactionSelector selector) {
101+
switch (selector.getSelectorCase()) {
102+
case BEGIN:
103+
return !selector.getBegin().hasReadOnly() || selector.getBegin().getReadOnly().getStrong();
104+
case SINGLE_USE:
105+
if (!selector.getSingleUse().hasReadOnly()) {
106+
return true;
107+
}
108+
return selector.getSingleUse().getReadOnly().getStrong();
109+
case ID:
110+
case SELECTOR_NOT_SET:
111+
default:
112+
return true;
113+
}
114+
}
115+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.google.api.gax.rpc.UnavailableException;
5858
import com.google.api.gax.rpc.WatchdogProvider;
5959
import com.google.api.pathtemplate.PathTemplate;
60+
import com.google.auth.Credentials;
6061
import com.google.cloud.RetryHelper;
6162
import com.google.cloud.RetryHelper.RetryHelperException;
6263
import com.google.cloud.grpc.GcpManagedChannel;
@@ -209,6 +210,7 @@
209210
import java.util.concurrent.ConcurrentLinkedDeque;
210211
import java.util.concurrent.ConcurrentMap;
211212
import java.util.concurrent.ExecutionException;
213+
import java.util.concurrent.Executor;
212214
import java.util.concurrent.ExecutorService;
213215
import java.util.concurrent.Executors;
214216
import java.util.concurrent.Future;
@@ -223,6 +225,8 @@
223225
public class GapicSpannerRpc implements SpannerRpc {
224226
private static final PathTemplate PROJECT_NAME_TEMPLATE =
225227
PathTemplate.create("projects/{project}");
228+
private static final String EXPERIMENTAL_LOCATION_API_ENV_VAR =
229+
"GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API";
226230
private static final PathTemplate OPERATION_NAME_TEMPLATE =
227231
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
228232
private static final int MAX_MESSAGE_SIZE = 256 * 1024 * 1024;
@@ -285,6 +289,96 @@ public class GapicSpannerRpc implements SpannerRpc {
285289

286290
private final GrpcCallContext baseGrpcCallContext;
287291

292+
private static final class KeyAwareTransportChannelProvider implements TransportChannelProvider {
293+
private final InstantiatingGrpcChannelProvider baseProvider;
294+
295+
private KeyAwareTransportChannelProvider(InstantiatingGrpcChannelProvider.Builder builder) {
296+
this.baseProvider = builder.build();
297+
}
298+
299+
private KeyAwareTransportChannelProvider(InstantiatingGrpcChannelProvider baseProvider) {
300+
this.baseProvider = baseProvider;
301+
}
302+
303+
@Override
304+
public GrpcTransportChannel getTransportChannel() throws IOException {
305+
return GrpcTransportChannel.newBuilder()
306+
.setManagedChannel(KeyAwareChannel.create(baseProvider))
307+
.build();
308+
}
309+
310+
@Override
311+
public String getTransportName() {
312+
return baseProvider.getTransportName();
313+
}
314+
315+
@Override
316+
public boolean needsEndpoint() {
317+
return baseProvider.needsEndpoint();
318+
}
319+
320+
@Override
321+
public boolean needsCredentials() {
322+
return baseProvider.needsCredentials();
323+
}
324+
325+
@Override
326+
public boolean needsExecutor() {
327+
return baseProvider.needsExecutor();
328+
}
329+
330+
@Override
331+
public boolean needsHeaders() {
332+
return baseProvider.needsHeaders();
333+
}
334+
335+
@Override
336+
public boolean shouldAutoClose() {
337+
return baseProvider.shouldAutoClose();
338+
}
339+
340+
@Override
341+
public TransportChannelProvider withEndpoint(String endpoint) {
342+
return new KeyAwareTransportChannelProvider(
343+
(InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(endpoint));
344+
}
345+
346+
@Override
347+
public TransportChannelProvider withCredentials(Credentials credentials) {
348+
return new KeyAwareTransportChannelProvider(
349+
(InstantiatingGrpcChannelProvider) baseProvider.withCredentials(credentials));
350+
}
351+
352+
@Override
353+
public TransportChannelProvider withHeaders(Map<String, String> headers) {
354+
return new KeyAwareTransportChannelProvider(
355+
(InstantiatingGrpcChannelProvider) baseProvider.withHeaders(headers));
356+
}
357+
358+
@Override
359+
public TransportChannelProvider withPoolSize(int poolSize) {
360+
return new KeyAwareTransportChannelProvider(
361+
(InstantiatingGrpcChannelProvider) baseProvider.withPoolSize(poolSize));
362+
}
363+
364+
@Override
365+
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
366+
return new KeyAwareTransportChannelProvider(
367+
(InstantiatingGrpcChannelProvider) baseProvider.withExecutor(executor));
368+
}
369+
370+
@Override
371+
public TransportChannelProvider withExecutor(Executor executor) {
372+
return new KeyAwareTransportChannelProvider(
373+
(InstantiatingGrpcChannelProvider) baseProvider.withExecutor(executor));
374+
}
375+
376+
@Override
377+
public boolean acceptsPoolSize() {
378+
return baseProvider.acceptsPoolSize();
379+
}
380+
}
381+
288382
public static GapicSpannerRpc create(SpannerOptions options) {
289383
return new GapicSpannerRpc(options);
290384
}
@@ -393,9 +487,13 @@ public GapicSpannerRpc(final SpannerOptions options) {
393487
// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
394488
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
395489

490+
boolean enableLocationApi =
491+
Boolean.parseBoolean(System.getenv(EXPERIMENTAL_LOCATION_API_ENV_VAR));
396492
TransportChannelProvider channelProvider =
397-
MoreObjects.firstNonNull(
398-
options.getChannelProvider(), defaultChannelProviderBuilder.build());
493+
enableLocationApi
494+
? new KeyAwareTransportChannelProvider(defaultChannelProviderBuilder)
495+
: MoreObjects.firstNonNull(
496+
options.getChannelProvider(), defaultChannelProviderBuilder.build());
399497

400498
CredentialsProvider credentialsProvider =
401499
GrpcTransportOptions.setUpCredentialsProvider(options);

0 commit comments

Comments
 (0)