Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 0a4d906

Browse files
committed
all rpcs on explicit begin started transactions should use same server/channel conncetion
1 parent a487b8e commit 0a4d906

4 files changed

Lines changed: 59 additions & 4 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,15 @@ ByteString getTransactionId() {
404404
}
405405
}
406406

407+
@Override
408+
public void close() {
409+
ByteString id = getTransactionId();
410+
if (id != null && !id.isEmpty()) {
411+
rpc.clearTransactionAffinity(id);
412+
}
413+
super.close();
414+
}
415+
407416
/**
408417
* Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction.
409418
* This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.google.api.gax.rpc.StatusCode;
5252
import com.google.api.gax.rpc.StatusCode.Code;
5353
import com.google.api.gax.rpc.StreamController;
54+
import com.google.api.gax.rpc.TransportChannel;
5455
import com.google.api.gax.rpc.TransportChannelProvider;
5556
import com.google.api.gax.rpc.UnaryCallSettings;
5657
import com.google.api.gax.rpc.UnaryCallable;
@@ -105,6 +106,7 @@
105106
import com.google.longrunning.GetOperationRequest;
106107
import com.google.longrunning.Operation;
107108
import com.google.longrunning.OperationsGrpc;
109+
import com.google.protobuf.ByteString;
108110
import com.google.protobuf.Empty;
109111
import com.google.protobuf.FieldMask;
110112
import com.google.protobuf.InvalidProtocolBufferException;
@@ -187,6 +189,7 @@
187189
import com.google.spanner.v1.Transaction;
188190
import io.grpc.CallCredentials;
189191
import io.grpc.Context;
192+
import io.grpc.ManagedChannel;
190193
import io.grpc.ManagedChannelBuilder;
191194
import io.grpc.MethodDescriptor;
192195
import java.io.IOException;
@@ -286,6 +289,7 @@ public class GapicSpannerRpc implements SpannerRpc {
286289
private final int numChannels;
287290
private final boolean isGrpcGcpExtensionEnabled;
288291
private final boolean isDynamicChannelPoolEnabled;
292+
@Nullable private final KeyAwareChannel keyAwareChannel;
289293

290294
private final GrpcCallContext baseGrpcCallContext;
291295

@@ -539,6 +543,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
539543
/* isAdminClient= */ false, isEmulatorEnabled(options, emulatorHost)))
540544
.build();
541545
ClientContext clientContext = ClientContext.create(spannerStubSettings);
546+
this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel());
542547
this.spannerStub =
543548
GrpcSpannerStubWithStubSettingsAndClientContext.create(
544549
spannerStubSettings, clientContext);
@@ -658,6 +663,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
658663
throw asSpannerException(e);
659664
}
660665
} else {
666+
this.keyAwareChannel = null;
661667
this.databaseAdminStub = null;
662668
this.instanceAdminStub = null;
663669
this.spannerStub = null;
@@ -675,6 +681,23 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
675681
}
676682
}
677683

684+
private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transportChannel) {
685+
if (transportChannel instanceof GrpcTransportChannel) {
686+
ManagedChannel channel = ((GrpcTransportChannel) transportChannel).getChannel();
687+
if (channel instanceof KeyAwareChannel) {
688+
return (KeyAwareChannel) channel;
689+
}
690+
}
691+
return null;
692+
}
693+
694+
@Override
695+
public void clearTransactionAffinity(ByteString transactionId) {
696+
if (keyAwareChannel != null) {
697+
keyAwareChannel.clearTransactionAffinity(transactionId);
698+
}
699+
}
700+
678701
private static String parseGrpcGcpApiConfig() {
679702
try {
680703
return Resources.toString(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2121
import com.google.protobuf.ByteString;
22+
import com.google.spanner.v1.BeginTransactionRequest;
2223
import com.google.spanner.v1.CommitRequest;
2324
import com.google.spanner.v1.ExecuteSqlRequest;
2425
import com.google.spanner.v1.PartialResultSet;
@@ -52,6 +53,8 @@ final class KeyAwareChannel extends ManagedChannel {
5253
private static final String STREAMING_SQL_METHOD =
5354
"google.spanner.v1.Spanner/ExecuteStreamingSql";
5455
private static final String UNARY_SQL_METHOD = "google.spanner.v1.Spanner/ExecuteSql";
56+
private static final String BEGIN_TRANSACTION_METHOD =
57+
"google.spanner.v1.Spanner/BeginTransaction";
5558
private static final String COMMIT_METHOD = "google.spanner.v1.Spanner/Commit";
5659
private static final String ROLLBACK_METHOD = "google.spanner.v1.Spanner/Rollback";
5760

@@ -162,6 +165,7 @@ private static boolean isKeyAware(MethodDescriptor<?, ?> methodDescriptor) {
162165
return STREAMING_READ_METHOD.equals(method)
163166
|| STREAMING_SQL_METHOD.equals(method)
164167
|| UNARY_SQL_METHOD.equals(method)
168+
|| BEGIN_TRANSACTION_METHOD.equals(method)
165169
|| COMMIT_METHOD.equals(method)
166170
|| ROLLBACK_METHOD.equals(method);
167171
}
@@ -185,12 +189,17 @@ private void clearAffinity(ByteString transactionId) {
185189
transactionAffinities.remove(transactionId);
186190
}
187191

188-
private void recordAffinity(ByteString transactionId, @Nullable ChannelEndpoint endpoint) {
192+
void clearTransactionAffinity(ByteString transactionId) {
193+
clearAffinity(transactionId);
194+
}
195+
196+
private void recordAffinity(
197+
ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) {
189198
if (transactionId == null || transactionId.isEmpty() || endpoint == null) {
190199
return;
191200
}
192201
String address = endpoint.getAddress();
193-
if (defaultEndpointAddress.equals(address)) {
202+
if (!allowDefault && defaultEndpointAddress.equals(address)) {
194203
return;
195204
}
196205
transactionAffinities.put(transactionId, address);
@@ -238,6 +247,7 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
238247
private ChannelFinder channelFinder;
239248
@Nullable private ChannelEndpoint selectedEndpoint;
240249
@Nullable private ByteString transactionIdToClear;
250+
private boolean allowDefaultAffinity;
241251

242252
KeyAwareClientCall(
243253
KeyAwareChannel parentChannel,
@@ -295,6 +305,8 @@ public void sendMessage(RequestT message) {
295305
}
296306
}
297307
message = (RequestT) reqBuilder.build();
308+
} else if (message instanceof BeginTransactionRequest) {
309+
allowDefaultAffinity = true;
298310
} else if (message instanceof CommitRequest) {
299311
CommitRequest request = (CommitRequest) message;
300312
if (!request.getTransactionId().isEmpty()) {
@@ -309,7 +321,8 @@ public void sendMessage(RequestT message) {
309321
}
310322
} else {
311323
throw new IllegalStateException(
312-
"Only read, query, commit, and rollback requests are supported for key-aware calls.");
324+
"Only read, query, begin transaction, commit, and rollback requests are supported for"
325+
+ " key-aware calls.");
313326
}
314327

315328
if (endpoint == null) {
@@ -343,7 +356,7 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
343356
}
344357

345358
void maybeRecordAffinity(ByteString transactionId) {
346-
parentChannel.recordAffinity(transactionId, selectedEndpoint);
359+
parentChannel.recordAffinity(transactionId, selectedEndpoint, allowDefaultAffinity);
347360
}
348361

349362
void maybeClearAffinity() {
@@ -378,6 +391,12 @@ public void onMessage(ResponseT message) {
378391
if (transactionId != null) {
379392
call.maybeRecordAffinity(transactionId);
380393
}
394+
} else if (message instanceof Transaction) {
395+
Transaction response = (Transaction) message;
396+
ByteString transactionId = transactionIdFromTransaction(response);
397+
if (transactionId != null) {
398+
call.maybeRecordAffinity(transactionId);
399+
}
381400
}
382401
super.onMessage(message);
383402
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.iam.v1.Policy;
4040
import com.google.iam.v1.TestIamPermissionsResponse;
4141
import com.google.longrunning.Operation;
42+
import com.google.protobuf.ByteString;
4243
import com.google.protobuf.Empty;
4344
import com.google.protobuf.FieldMask;
4445
import com.google.spanner.admin.database.v1.Backup;
@@ -194,6 +195,9 @@ default RequestIdCreator getRequestIdCreator() {
194195
throw new UnsupportedOperationException("Not implemented");
195196
}
196197

198+
/** Clears any client-side affinity associated with the given transaction id. */
199+
default void clearTransactionAffinity(ByteString transactionId) {}
200+
197201
// Instance admin APIs.
198202
Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
199203
throws SpannerException;

0 commit comments

Comments
 (0)