Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit e836a5d

Browse files
committed
incorporate suggestions
1 parent 3c99d2a commit e836a5d

4 files changed

Lines changed: 129 additions & 70 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -94,40 +94,36 @@ public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean
9494
}
9595

9696
public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
97-
if (!reqBuilder.hasMutationKey()
98-
|| !recipeCache.computeKeys(
99-
reqBuilder.getMutationKey(), reqBuilder.getRoutingHintBuilder())) {
97+
if (!reqBuilder.hasMutationKey()) {
10098
return null;
10199
}
102-
return fillRoutingHint(
100+
return routeMutation(
101+
reqBuilder.getMutationKey(),
103102
preferLeader(reqBuilder.getOptions()),
104-
KeyRangeCache.RangeMode.COVERING_SPLIT,
105-
DirectedReadOptions.getDefaultInstance(),
106103
reqBuilder.getRoutingHintBuilder());
107104
}
108105

109-
public void fillRoutingHint(CommitRequest.Builder reqBuilder) {
106+
public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) {
110107
if (reqBuilder.getMutationsCount() == 0) {
111-
return;
108+
return null;
112109
}
113110
Mutation mutation = reqBuilder.getMutations(0);
114-
if (!recipeCache.computeKeys(mutation, reqBuilder.getRoutingHintBuilder())) {
115-
return;
116-
}
117-
fillRoutingHint(
118-
/* preferLeader= */ true,
119-
KeyRangeCache.RangeMode.COVERING_SPLIT,
120-
DirectedReadOptions.getDefaultInstance(),
121-
reqBuilder.getRoutingHintBuilder());
111+
return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder());
122112
}
123113

124-
private ChannelEndpoint fillRoutingHint(
125-
TransactionSelector transactionSelector,
126-
DirectedReadOptions directedReadOptions,
127-
KeyRangeCache.RangeMode rangeMode,
128-
RoutingHint.Builder hintBuilder) {
114+
private ChannelEndpoint routeMutation(
115+
Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) {
116+
recipeCache.applySchemaGeneration(hintBuilder);
117+
TargetRange target = recipeCache.mutationToTargetRange(mutation);
118+
if (target == null) {
119+
return null;
120+
}
121+
recipeCache.applyTargetRange(hintBuilder, target);
129122
return fillRoutingHint(
130-
preferLeader(transactionSelector), rangeMode, directedReadOptions, hintBuilder);
123+
preferLeader,
124+
KeyRangeCache.RangeMode.COVERING_SPLIT,
125+
DirectedReadOptions.getDefaultInstance(),
126+
hintBuilder);
131127
}
132128

133129
private ChannelEndpoint fillRoutingHint(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,19 +371,24 @@ public void sendMessage(RequestT message) {
371371
}
372372
message = (RequestT) reqBuilder.build();
373373
} else if (message instanceof CommitRequest) {
374-
CommitRequest.Builder reqBuilder = ((CommitRequest) message).toBuilder();
375-
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
374+
CommitRequest request = (CommitRequest) message;
375+
String databaseId = parentChannel.extractDatabaseIdFromSession(request.getSession());
376376
if (databaseId != null) {
377377
finder = parentChannel.getOrCreateChannelFinder(databaseId);
378378
}
379-
if (finder != null) {
380-
finder.fillRoutingHint(reqBuilder);
379+
CommitRequest.Builder reqBuilder = null;
380+
if (finder != null && request.getMutationsCount() > 0) {
381+
reqBuilder = request.toBuilder();
382+
endpoint = finder.fillRoutingHint(reqBuilder);
383+
request = reqBuilder.build();
384+
}
385+
if (!request.getTransactionId().isEmpty()) {
386+
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
387+
transactionIdToClear = request.getTransactionId();
381388
}
382-
if (!reqBuilder.getTransactionId().isEmpty()) {
383-
endpoint = parentChannel.affinityEndpoint(reqBuilder.getTransactionId());
384-
transactionIdToClear = reqBuilder.getTransactionId();
389+
if (reqBuilder != null) {
390+
message = (RequestT) request;
385391
}
386-
message = (RequestT) reqBuilder.build();
387392
} else if (message instanceof RollbackRequest) {
388393
RollbackRequest request = (RollbackRequest) message;
389394
if (!request.getTransactionId().isEmpty()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRecipeCache.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) {
158158
long reqFp = fingerprint(reqBuilder.buildPartial());
159159

160160
RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder();
161-
if (!schemaGeneration.isEmpty()) {
162-
hintBuilder.setSchemaGeneration(schemaGeneration);
163-
}
161+
applySchemaGeneration(hintBuilder);
164162

165163
PreparedRead preparedRead = getIfPresent(preparedReads, reqFp);
166164
if (preparedRead == null) {
@@ -186,10 +184,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) {
186184

187185
try {
188186
TargetRange target = recipe.keySetToTargetRange(reqBuilder.getKeySet());
189-
hintBuilder.setKey(target.start);
190-
if (!target.limit.isEmpty()) {
191-
hintBuilder.setLimitKey(target.limit);
192-
}
187+
applyTargetRange(hintBuilder, target);
193188
} catch (IllegalArgumentException e) {
194189
logger.fine("Failed key encoding: " + e.getMessage());
195190
}
@@ -199,9 +194,7 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) {
199194
long reqFp = fingerprint(reqBuilder.buildPartial());
200195

201196
RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder();
202-
if (!schemaGeneration.isEmpty()) {
203-
hintBuilder.setSchemaGeneration(schemaGeneration);
204-
}
197+
applySchemaGeneration(hintBuilder);
205198

206199
PreparedQuery preparedQuery = getIfPresent(preparedQueries, reqFp);
207200
if (preparedQuery == null) {
@@ -221,30 +214,23 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) {
221214

222215
try {
223216
TargetRange target = recipe.queryParamsToTargetRange(reqBuilder.getParams());
224-
hintBuilder.setKey(target.start);
225-
if (!target.limit.isEmpty()) {
226-
hintBuilder.setLimitKey(target.limit);
227-
}
217+
applyTargetRange(hintBuilder, target);
228218
} catch (IllegalArgumentException e) {
229219
logger.fine("Failed query param encoding: " + e.getMessage());
230220
}
231221
}
232222

233-
boolean computeKeys(Mutation mutation, RoutingHint.Builder hintBuilder) {
223+
void applySchemaGeneration(RoutingHint.Builder hintBuilder) {
234224
if (!schemaGeneration.isEmpty()) {
235225
hintBuilder.setSchemaGeneration(schemaGeneration);
236226
}
227+
}
237228

238-
TargetRange target = mutationToTargetRange(mutation);
239-
if (target == null) {
240-
return false;
241-
}
242-
229+
void applyTargetRange(RoutingHint.Builder hintBuilder, TargetRange target) {
243230
hintBuilder.setKey(target.start);
244231
if (!target.limit.isEmpty()) {
245232
hintBuilder.setLimitKey(target.limit);
246233
}
247-
return true;
248234
}
249235

250236
public TargetRange mutationToTargetRange(Mutation mutation) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package com.google.cloud.spanner.spi.v1;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotNull;
2023
import static org.junit.Assert.assertThrows;
2124

2225
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -296,11 +299,11 @@ public void beginTransactionWithMutationKeyAddsRoutingHint() throws Exception {
296299
(RecordingClientCall<BeginTransactionRequest, Transaction>)
297300
harness.defaultManagedChannel.latestCall();
298301

299-
assertThat(beginDelegate.lastMessage).isNotNull();
300-
assertThat(beginDelegate.lastMessage.getRoutingHint().getDatabaseId()).isEqualTo(7L);
301-
assertThat(beginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8())
302-
.isEqualTo("1");
303-
assertThat(beginDelegate.lastMessage.getRoutingHint().getKey().isEmpty()).isFalse();
302+
assertNotNull(beginDelegate.lastMessage);
303+
assertEquals(7L, beginDelegate.lastMessage.getRoutingHint().getDatabaseId());
304+
assertEquals(
305+
"1", beginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8());
306+
assertFalse(beginDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
304307
}
305308

306309
@Test
@@ -339,11 +342,66 @@ public void transactionCacheUpdateEnablesCommitRoutingHint() throws Exception {
339342
(RecordingClientCall<CommitRequest, CommitResponse>)
340343
harness.defaultManagedChannel.latestCall();
341344

342-
assertThat(commitDelegate.lastMessage).isNotNull();
343-
assertThat(commitDelegate.lastMessage.getRoutingHint().getDatabaseId()).isEqualTo(7L);
344-
assertThat(commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8())
345-
.isEqualTo("1");
346-
assertThat(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty()).isFalse();
345+
assertNotNull(commitDelegate.lastMessage);
346+
assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId());
347+
assertEquals(
348+
"1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8());
349+
assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
350+
}
351+
352+
@Test
353+
public void singleUseCommitWithMutationsRoutesUsingRoutingHint() throws Exception {
354+
TestHarness harness = createHarness();
355+
seedCache(harness, createMutationRecipeCacheUpdate());
356+
357+
ClientCall<CommitRequest, CommitResponse> firstCommitCall =
358+
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
359+
firstCommitCall.start(new CapturingListener<CommitResponse>(), new Metadata());
360+
firstCommitCall.sendMessage(
361+
CommitRequest.newBuilder()
362+
.setSession(SESSION)
363+
.setSingleUseTransaction(
364+
TransactionOptions.newBuilder()
365+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
366+
.addMutations(createInsertMutation("b"))
367+
.build());
368+
369+
@SuppressWarnings("unchecked")
370+
RecordingClientCall<CommitRequest, CommitResponse> firstCommitDelegate =
371+
(RecordingClientCall<CommitRequest, CommitResponse>)
372+
harness.defaultManagedChannel.latestCall();
373+
374+
assertNotNull(firstCommitDelegate.lastMessage);
375+
RoutingHint routingHint = firstCommitDelegate.lastMessage.getRoutingHint();
376+
assertFalse(routingHint.getKey().isEmpty());
377+
378+
seedCache(harness, createRangeCacheUpdateForHint(routingHint));
379+
380+
ClientCall<CommitRequest, CommitResponse> secondCommitCall =
381+
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
382+
secondCommitCall.start(new CapturingListener<CommitResponse>(), new Metadata());
383+
secondCommitCall.sendMessage(
384+
CommitRequest.newBuilder()
385+
.setSession(SESSION)
386+
.setSingleUseTransaction(
387+
TransactionOptions.newBuilder()
388+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
389+
.addMutations(createInsertMutation("b"))
390+
.build());
391+
392+
assertThat(harness.endpointCache.callCountForAddress(DEFAULT_ADDRESS)).isEqualTo(3);
393+
assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1);
394+
395+
@SuppressWarnings("unchecked")
396+
RecordingClientCall<CommitRequest, CommitResponse> commitDelegate =
397+
(RecordingClientCall<CommitRequest, CommitResponse>)
398+
harness.endpointCache.latestCallForAddress("server-a:1234");
399+
400+
assertNotNull(commitDelegate.lastMessage);
401+
assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId());
402+
assertEquals(
403+
"1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8());
404+
assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
347405
}
348406

349407
@Test
@@ -389,12 +447,11 @@ public void commitResponseCacheUpdateEnablesSubsequentBeginRoutingHint() throws
389447
(RecordingClientCall<BeginTransactionRequest, Transaction>)
390448
harness.defaultManagedChannel.latestCall();
391449

392-
assertThat(routedBeginDelegate.lastMessage).isNotNull();
393-
assertThat(routedBeginDelegate.lastMessage.getRoutingHint().getDatabaseId()).isEqualTo(7L);
394-
assertThat(
395-
routedBeginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8())
396-
.isEqualTo("1");
397-
assertThat(routedBeginDelegate.lastMessage.getRoutingHint().getKey().isEmpty()).isFalse();
450+
assertNotNull(routedBeginDelegate.lastMessage);
451+
assertEquals(7L, routedBeginDelegate.lastMessage.getRoutingHint().getDatabaseId());
452+
assertEquals(
453+
"1", routedBeginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8());
454+
assertFalse(routedBeginDelegate.lastMessage.getRoutingHint().getKey().isEmpty());
398455
}
399456

400457
@Test
@@ -759,6 +816,13 @@ private static CacheUpdate createTwoRangeCacheUpdate() {
759816
}
760817

761818
private static CacheUpdate createMutationRoutingCacheUpdate() throws TextFormat.ParseException {
819+
return createMutationRecipeCacheUpdate().toBuilder()
820+
.mergeFrom(
821+
createRangeCacheUpdateForHint(RoutingHint.newBuilder().setKey(bytes("a")).build()))
822+
.build();
823+
}
824+
825+
private static CacheUpdate createMutationRecipeCacheUpdate() throws TextFormat.ParseException {
762826
RecipeList keyRecipes =
763827
parseRecipeList(
764828
"schema_generation: \"1\"\n"
@@ -772,13 +836,21 @@ private static CacheUpdate createMutationRoutingCacheUpdate() throws TextFormat.
772836
+ " identifier: \"k\"\n"
773837
+ " }\n"
774838
+ "}\n");
839+
return CacheUpdate.newBuilder().setDatabaseId(7L).setKeyRecipes(keyRecipes).build();
840+
}
841+
842+
private static CacheUpdate createRangeCacheUpdateForHint(RoutingHint hint) {
843+
ByteString key = hint.getKey();
844+
ByteString limitKey =
845+
hint.getLimitKey().isEmpty()
846+
? key.concat(ByteString.copyFrom(new byte[] {0}))
847+
: hint.getLimitKey();
775848
return CacheUpdate.newBuilder()
776849
.setDatabaseId(7L)
777-
.setKeyRecipes(keyRecipes)
778850
.addRange(
779851
Range.newBuilder()
780-
.setStartKey(bytes("a"))
781-
.setLimitKey(bytes("m"))
852+
.setStartKey(key)
853+
.setLimitKey(limitKey)
782854
.setGroupUid(1L)
783855
.setSplitId(1L)
784856
.setGeneration(bytes("1")))

0 commit comments

Comments
 (0)