Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit c20f4de

Browse files
rahul2393claude
andcommitted
feat: add KeyAwareChannel and location-aware routing integration
This commit adds the KeyAwareChannel class and integrates all components for location-aware routing in the Spanner client. Key components: - ChannelFinder: Orchestrates routing decisions using caches - KeyAwareChannel: Custom ManagedChannel that intercepts key-aware methods - SpannerOptions integration: Enables feature via setExperimentalHost() - GapicSpannerRpc modifications: Wire in KeyAwareChannel Activation requires: - SpannerOptions.Builder.setExperimentalHost() with location-aware endpoint - GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API=true environment variable This is part of the experimental location-aware routing for improved latency. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9765d6b commit c20f4de

15 files changed

Lines changed: 840 additions & 12 deletions

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
257257
private final boolean enableEndToEndTracing;
258258
private final String monitoringHost;
259259
private final TransactionOptions defaultTransactionOptions;
260+
private final boolean isExperimentalHost;
260261

261262
enum TracingFramework {
262263
OPEN_CENSUS,
@@ -922,6 +923,12 @@ protected SpannerOptions(Builder builder) {
922923
enableEndToEndTracing = builder.enableEndToEndTracing;
923924
monitoringHost = builder.monitoringHost;
924925
defaultTransactionOptions = builder.defaultTransactionOptions;
926+
isExperimentalHost = builder.isExperimentalHost;
927+
}
928+
929+
/** Returns true if an experimental host is configured. */
930+
public boolean isExperimentalHost() {
931+
return isExperimentalHost;
925932
}
926933

927934
private String getResolvedUniverseDomain() {
@@ -987,6 +994,15 @@ default String getMonitoringHost() {
987994
default GoogleCredentials getDefaultExperimentalHostCredentials() {
988995
return null;
989996
}
997+
998+
/**
999+
* Returns true if the experimental location API (SpanFE bypass) should be enabled. When
1000+
* enabled, the client will use location-aware routing to send requests directly to the
1001+
* appropriate Spanner server.
1002+
*/
1003+
default boolean isEnableLocationApi() {
1004+
return false;
1005+
}
9901006
}
9911007

9921008
static final String DEFAULT_SPANNER_EXPERIMENTAL_HOST_CREDENTIALS =
@@ -1011,6 +1027,8 @@ private static class SpannerEnvironmentImpl implements SpannerEnvironment {
10111027
private static final String SPANNER_DISABLE_DIRECT_ACCESS_GRPC_BUILTIN_METRICS =
10121028
"SPANNER_DISABLE_DIRECT_ACCESS_GRPC_BUILTIN_METRICS";
10131029
private static final String SPANNER_MONITORING_HOST = "SPANNER_MONITORING_HOST";
1030+
private static final String GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API =
1031+
"GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API";
10141032

10151033
private SpannerEnvironmentImpl() {}
10161034

@@ -1069,6 +1087,11 @@ public String getMonitoringHost() {
10691087
public GoogleCredentials getDefaultExperimentalHostCredentials() {
10701088
return getOAuthTokenFromFile(System.getenv(DEFAULT_SPANNER_EXPERIMENTAL_HOST_CREDENTIALS));
10711089
}
1090+
1091+
@Override
1092+
public boolean isEnableLocationApi() {
1093+
return Boolean.parseBoolean(System.getenv(GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API));
1094+
}
10721095
}
10731096

10741097
/** Builder for {@link SpannerOptions} instances. */
@@ -1141,6 +1164,7 @@ public static class Builder
11411164
private SslContext mTLSContext = null;
11421165
private String experimentalHost = null;
11431166
private boolean usePlainText = false;
1167+
private boolean isExperimentalHost = false;
11441168
private TransactionOptions defaultTransactionOptions = TransactionOptions.getDefaultInstance();
11451169

11461170
private static String createCustomClientLibToken(String token) {
@@ -1689,6 +1713,7 @@ public Builder setExperimentalHost(String host) {
16891713
super.setProjectId(EXPERIMENTAL_HOST_PROJECT_ID);
16901714
setSessionPoolOption(SessionPoolOptions.newBuilder().setExperimentalHost().build());
16911715
this.experimentalHost = host;
1716+
this.isExperimentalHost = true;
16921717
return this;
16931718
}
16941719

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.spanner.v1.CacheUpdate;
20+
import com.google.spanner.v1.ReadRequest;
21+
import com.google.spanner.v1.RoutingHint;
22+
import java.util.concurrent.locks.ReadWriteLock;
23+
import java.util.concurrent.locks.ReentrantReadWriteLock;
24+
import javax.annotation.Nullable;
25+
26+
/**
27+
* ChannelFinder is responsible for finding the correct Spanner server to route RPCs to.
28+
*
29+
* <p>It uses a {@link KeyRecipeCache} and a {@link KeyRangeCache} to store metadata about the
30+
* database, including key recipes and range information. This metadata is updated through the
31+
* {@link #update(CacheUpdate)} method.
32+
*
33+
* <p>The {@link #findServer(ReadRequest.Builder)} method is used to determine the appropriate
34+
* server for a given read request.
35+
*/
36+
public final class ChannelFinder {
37+
private final String deployment;
38+
private final String databaseUri;
39+
private final KeyRangeCache rangeCache;
40+
private final KeyRecipeCache recipeCache;
41+
private final ReadWriteLock lock = new ReentrantReadWriteLock();
42+
private long databaseId = 0L;
43+
private final ChannelFinderServerFactory serverFactory;
44+
45+
public ChannelFinder(
46+
ChannelFinderServerFactory serverFactory, String deployment, String databaseUri) {
47+
this.serverFactory = serverFactory;
48+
this.deployment = deployment;
49+
this.databaseUri = databaseUri;
50+
this.rangeCache = new KeyRangeCache(serverFactory);
51+
this.recipeCache = new KeyRecipeCache();
52+
}
53+
54+
/**
55+
* Updates the cache with new metadata.
56+
*
57+
* @param cacheUpdate The cache update information.
58+
*/
59+
public void update(CacheUpdate cacheUpdate) {
60+
lock.writeLock().lock();
61+
try {
62+
if (databaseId != cacheUpdate.getDatabaseId()) {
63+
System.out.println("DEBUG [BYPASS]: Database ID changed from " + databaseId
64+
+ " to " + cacheUpdate.getDatabaseId() + ", clearing caches");
65+
recipeCache.clear();
66+
rangeCache.clear();
67+
databaseId = cacheUpdate.getDatabaseId();
68+
}
69+
recipeCache.addRecipes(cacheUpdate.getKeyRecipes());
70+
rangeCache.addRanges(cacheUpdate);
71+
System.out.println("DEBUG [BYPASS]: Cache updated. Current state:\n" + rangeCache.debugString());
72+
} finally {
73+
lock.writeLock().unlock();
74+
}
75+
}
76+
77+
/**
78+
* Finds the server for a given ReadRequest.
79+
*
80+
* @param reqBuilder The ReadRequest builder.
81+
* @return The server to route the request to, or null if an error occurs.
82+
*/
83+
@Nullable
84+
public ChannelFinderServer findServer(ReadRequest.Builder reqBuilder) {
85+
RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder();
86+
lock.readLock().lock();
87+
try {
88+
if (databaseId != 0) {
89+
hintBuilder.setDatabaseId(databaseId);
90+
}
91+
System.out.println("DEBUG [BYPASS]: findServer - computing keys for table: "
92+
+ reqBuilder.getTable());
93+
recipeCache.computeKeys(reqBuilder); // Modifies hintBuilder within reqBuilder
94+
System.out.println("DEBUG [BYPASS]: findServer - after computeKeys, key: "
95+
+ hintBuilder.getKey().toStringUtf8());
96+
ChannelFinderServer server = rangeCache.fillRoutingInfo(
97+
reqBuilder.getSession(), false, hintBuilder);
98+
System.out.println("DEBUG [BYPASS]: findServer - fillRoutingInfo returned server: "
99+
+ (server != null ? server.getAddress() : "null"));
100+
return server;
101+
} finally {
102+
lock.readLock().unlock();
103+
}
104+
}
105+
106+
/**
107+
* Returns a debug string representation of the cache.
108+
*
109+
* @return A string containing debug information.
110+
*/
111+
public String debugString() {
112+
lock.readLock().lock();
113+
try {
114+
return rangeCache.debugString();
115+
} finally {
116+
lock.readLock().unlock();
117+
}
118+
}
119+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.google.cloud.spanner.spi.v1;
218

319
import io.grpc.ManagedChannel;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.google.cloud.spanner.spi.v1;
218

319
public interface ChannelFinderServerFactory {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.google.api.gax.rpc.UnavailableException;
5858
import com.google.api.gax.rpc.WatchdogProvider;
5959
import com.google.api.pathtemplate.PathTemplate;
60+
import com.google.auth.Credentials;
6061
import com.google.cloud.RetryHelper;
6162
import com.google.cloud.RetryHelper.RetryHelperException;
6263
import com.google.cloud.grpc.GcpManagedChannel;
@@ -209,12 +210,15 @@
209210
import java.util.concurrent.ConcurrentLinkedDeque;
210211
import java.util.concurrent.ConcurrentMap;
211212
import java.util.concurrent.ExecutionException;
213+
import java.util.concurrent.Executor;
212214
import java.util.concurrent.ExecutorService;
213215
import java.util.concurrent.Executors;
214216
import java.util.concurrent.Future;
215217
import java.util.concurrent.ScheduledExecutorService;
216218
import java.util.concurrent.TimeUnit;
217219
import java.util.stream.Collectors;
220+
import java.util.logging.Level;
221+
import java.util.logging.Logger;
218222
import java.util.stream.Stream;
219223
import javax.annotation.Nullable;
220224

@@ -223,6 +227,7 @@
223227
public class GapicSpannerRpc implements SpannerRpc {
224228
private static final PathTemplate PROJECT_NAME_TEMPLATE =
225229
PathTemplate.create("projects/{project}");
230+
private static final Logger logger = Logger.getLogger(GapicSpannerRpc.class.getName());
226231
private static final PathTemplate OPERATION_NAME_TEMPLATE =
227232
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
228233
private static final int MAX_MESSAGE_SIZE = 256 * 1024 * 1024;
@@ -285,6 +290,89 @@ public class GapicSpannerRpc implements SpannerRpc {
285290

286291
private final GrpcCallContext baseGrpcCallContext;
287292

293+
private static class KeyAwareTransportChannelProvider implements TransportChannelProvider {
294+
private final InstantiatingGrpcChannelProvider.Builder delegateBuilder;
295+
private final TransportChannelProvider delegate;
296+
297+
public KeyAwareTransportChannelProvider(
298+
InstantiatingGrpcChannelProvider.Builder delegateBuilder) {
299+
this.delegateBuilder = delegateBuilder;
300+
this.delegate = delegateBuilder.build();
301+
}
302+
303+
@Override
304+
public GrpcTransportChannel getTransportChannel() throws IOException {
305+
return GrpcTransportChannel.newBuilder()
306+
.setManagedChannel(KeyAwareChannel.create(delegateBuilder))
307+
.build();
308+
}
309+
310+
@Override
311+
public String getTransportName() {
312+
return delegate.getTransportName();
313+
}
314+
315+
@Override
316+
public boolean needsEndpoint() {
317+
return delegate.needsEndpoint();
318+
}
319+
320+
@Override
321+
public boolean needsCredentials() {
322+
return delegate.needsCredentials();
323+
}
324+
325+
@Override
326+
public boolean needsExecutor() {
327+
return delegate.needsExecutor();
328+
}
329+
330+
@Override
331+
public boolean needsHeaders() {
332+
return delegate.needsHeaders();
333+
}
334+
335+
@Override
336+
public boolean shouldAutoClose() {
337+
return delegate.shouldAutoClose();
338+
}
339+
340+
@Override
341+
public TransportChannelProvider withEndpoint(String endpoint) {
342+
return delegate.withEndpoint(endpoint);
343+
}
344+
345+
@Override
346+
public TransportChannelProvider withCredentials(Credentials credentials) {
347+
return delegate.withCredentials(credentials);
348+
}
349+
350+
@Override
351+
public TransportChannelProvider withHeaders(java.util.Map<String, String> headers) {
352+
return delegate.withHeaders(headers);
353+
}
354+
355+
@Override
356+
public TransportChannelProvider withPoolSize(int poolSize) {
357+
return delegate.withPoolSize(poolSize);
358+
}
359+
360+
@Override
361+
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
362+
return delegate.withExecutor(executor);
363+
}
364+
365+
@Override
366+
public TransportChannelProvider withExecutor(Executor executor) {
367+
return delegate.withExecutor(executor);
368+
}
369+
370+
@Override
371+
public boolean acceptsPoolSize() {
372+
return delegate.acceptsPoolSize();
373+
}
374+
}
375+
288376
public static GapicSpannerRpc create(SpannerOptions options) {
289377
return new GapicSpannerRpc(options);
290378
}
@@ -393,9 +481,33 @@ public GapicSpannerRpc(final SpannerOptions options) {
393481
// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
394482
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
395483

396-
TransportChannelProvider channelProvider =
397-
MoreObjects.firstNonNull(
398-
options.getChannelProvider(), defaultChannelProviderBuilder.build());
484+
TransportChannelProvider channelProvider;
485+
// Enable KeyAwareChannel (SpanFE bypass / location API) only when BOTH conditions are met:
486+
// 1. Using experimental host (setExperimentalHost was called)
487+
// 2. GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API env var is set to "true"
488+
// Default is DISABLED even for experimental host.
489+
String locationApiEnvVar = System.getenv("GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API");
490+
boolean isExperimentalHost = options.isExperimentalHost();
491+
boolean envVarEnabled = "true".equalsIgnoreCase(locationApiEnvVar);
492+
493+
// Both conditions must be true to enable bypass
494+
boolean enableLocationApi = isExperimentalHost && envVarEnabled;
495+
496+
logger.log(
497+
Level.INFO,
498+
"SpanFE bypass (KeyAwareChannel) configuration: "
499+
+ "GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API={0}, "
500+
+ "isExperimentalHost={1}, "
501+
+ "enableLocationApi={2}",
502+
new Object[] {locationApiEnvVar, isExperimentalHost, enableLocationApi});
503+
504+
if (enableLocationApi) {
505+
channelProvider = new KeyAwareTransportChannelProvider(defaultChannelProviderBuilder);
506+
} else {
507+
channelProvider =
508+
MoreObjects.firstNonNull(
509+
options.getChannelProvider(), defaultChannelProviderBuilder.build());
510+
}
399511

400512
CredentialsProvider credentialsProvider =
401513
GrpcTransportOptions.setUpCredentialsProvider(options);

0 commit comments

Comments
 (0)