1818
1919import com .google .api .core .InternalApi ;
2020import com .google .api .gax .grpc .InstantiatingGrpcChannelProvider ;
21+ import com .google .common .cache .Cache ;
22+ import com .google .common .cache .CacheBuilder ;
2123import com .google .protobuf .ByteString ;
2224import com .google .spanner .v1 .BeginTransactionRequest ;
2325import com .google .spanner .v1 .CommitRequest ;
5153 */
5254@ InternalApi
5355final class KeyAwareChannel extends ManagedChannel {
56+ private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L ;
5457 private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead" ;
5558 private static final String STREAMING_SQL_METHOD =
5659 "google.spanner.v1.Spanner/ExecuteStreamingSql" ;
@@ -67,6 +70,11 @@ final class KeyAwareChannel extends ManagedChannel {
6770 private final Map <String , SoftReference <ChannelFinder >> channelFinders =
6871 new ConcurrentHashMap <>();
6972 private final Map <ByteString , String > transactionAffinities = new ConcurrentHashMap <>();
73+ // Maps read-only transaction IDs to their preferLeader value.
74+ // Strong reads → true (prefer leader), Stale reads → false (any replica).
75+ // Bounded to prevent unbounded growth if application code does not close read-only transactions.
76+ private final Cache <ByteString , Boolean > readOnlyTxPreferLeader =
77+ CacheBuilder .newBuilder ().maximumSize (MAX_TRACKED_READ_ONLY_TRANSACTIONS ).build ();
7078
7179 private KeyAwareChannel (
7280 InstantiatingGrpcChannelProvider channelProvider ,
@@ -184,12 +192,34 @@ private void clearAffinity(ByteString transactionId) {
184192 return ;
185193 }
186194 transactionAffinities .remove (transactionId );
195+ readOnlyTxPreferLeader .invalidate (transactionId );
187196 }
188197
189198 void clearTransactionAffinity (ByteString transactionId ) {
190199 clearAffinity (transactionId );
191200 }
192201
202+ private boolean isReadOnlyTransaction (ByteString transactionId ) {
203+ return transactionId != null
204+ && !transactionId .isEmpty ()
205+ && readOnlyTxPreferLeader .getIfPresent (transactionId ) != null ;
206+ }
207+
208+ @ Nullable
209+ private Boolean readOnlyPreferLeader (ByteString transactionId ) {
210+ if (transactionId == null || transactionId .isEmpty ()) {
211+ return null ;
212+ }
213+ return readOnlyTxPreferLeader .getIfPresent (transactionId );
214+ }
215+
216+ private void trackReadOnlyTransaction (ByteString transactionId , boolean preferLeader ) {
217+ if (transactionId == null || transactionId .isEmpty ()) {
218+ return ;
219+ }
220+ readOnlyTxPreferLeader .put (transactionId , preferLeader );
221+ }
222+
193223 private void recordAffinity (
194224 ByteString transactionId , @ Nullable ChannelEndpoint endpoint , boolean allowDefault ) {
195225 if (transactionId == null || transactionId .isEmpty () || endpoint == null ) {
@@ -250,6 +280,8 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
250280 @ Nullable private Boolean pendingMessageCompression ;
251281 @ Nullable private io .grpc .Status cancelledStatus ;
252282 @ Nullable private Metadata cancelledTrailers ;
283+ private boolean isReadOnlyBegin ;
284+ private boolean readOnlyIsStrong ;
253285 private final Object lock = new Object ();
254286
255287 KeyAwareClientCall (
@@ -307,12 +339,14 @@ public void sendMessage(RequestT message) {
307339
308340 if (message instanceof ReadRequest ) {
309341 ReadRequest .Builder reqBuilder = ((ReadRequest ) message ).toBuilder ();
342+ maybeTrackReadOnlyBegin (reqBuilder .getTransaction ());
310343 RoutingDecision routing = routeFromRequest (reqBuilder );
311344 finder = routing .finder ;
312345 endpoint = routing .endpoint ;
313346 message = (RequestT ) reqBuilder .build ();
314347 } else if (message instanceof ExecuteSqlRequest ) {
315348 ExecuteSqlRequest .Builder reqBuilder = ((ExecuteSqlRequest ) message ).toBuilder ();
349+ maybeTrackReadOnlyBegin (reqBuilder .getTransaction ());
316350 RoutingDecision routing = routeFromRequest (reqBuilder );
317351 finder = routing .finder ;
318352 endpoint = routing .endpoint ;
@@ -325,7 +359,12 @@ public void sendMessage(RequestT message) {
325359 finder = parentChannel .getOrCreateChannelFinder (databaseId );
326360 endpoint = finder .findServer (reqBuilder );
327361 }
328- allowDefaultAffinity = true ;
362+ if (reqBuilder .hasOptions () && reqBuilder .getOptions ().hasReadOnly ()) {
363+ isReadOnlyBegin = true ;
364+ readOnlyIsStrong = reqBuilder .getOptions ().getReadOnly ().getStrong ();
365+ } else {
366+ allowDefaultAffinity = true ;
367+ }
329368 message = (RequestT ) reqBuilder .build ();
330369 } else if (message instanceof CommitRequest ) {
331370 CommitRequest request = (CommitRequest ) message ;
@@ -483,32 +522,52 @@ void maybeClearAffinity() {
483522 parentChannel .clearAffinity (transactionIdToClear );
484523 }
485524
525+ private void maybeTrackReadOnlyBegin (TransactionSelector selector ) {
526+ if (selector .getSelectorCase () == TransactionSelector .SelectorCase .BEGIN
527+ && selector .getBegin ().hasReadOnly ()) {
528+ isReadOnlyBegin = true ;
529+ readOnlyIsStrong = selector .getBegin ().getReadOnly ().getStrong ();
530+ }
531+ }
532+
486533 private RoutingDecision routeFromRequest (ReadRequest .Builder reqBuilder ) {
487534 String databaseId = parentChannel .extractDatabaseIdFromSession (reqBuilder .getSession ());
488535 ByteString transactionId = transactionIdFromSelector (reqBuilder .getTransaction ());
489- ChannelEndpoint endpoint = parentChannel .affinityEndpoint (transactionId );
536+ // Skip affinity for read-only transactions so each read routes independently.
537+ boolean isReadOnly = parentChannel .isReadOnlyTransaction (transactionId );
538+ ChannelEndpoint endpoint = isReadOnly ? null : parentChannel .affinityEndpoint (transactionId );
490539 ChannelFinder finder = null ;
491540 if (databaseId != null ) {
492541 finder = parentChannel .getOrCreateChannelFinder (databaseId );
493- ChannelEndpoint routed = finder .findServer (reqBuilder );
494- if (endpoint == null ) {
495- endpoint = routed ;
496- }
542+ }
543+ if (databaseId != null && endpoint == null ) {
544+ Boolean preferLeaderOverride = parentChannel .readOnlyPreferLeader (transactionId );
545+ ChannelEndpoint routed =
546+ preferLeaderOverride != null
547+ ? finder .findServer (reqBuilder , preferLeaderOverride )
548+ : finder .findServer (reqBuilder );
549+ endpoint = routed ;
497550 }
498551 return new RoutingDecision (finder , endpoint );
499552 }
500553
501554 private RoutingDecision routeFromRequest (ExecuteSqlRequest .Builder reqBuilder ) {
502555 String databaseId = parentChannel .extractDatabaseIdFromSession (reqBuilder .getSession ());
503556 ByteString transactionId = transactionIdFromSelector (reqBuilder .getTransaction ());
504- ChannelEndpoint endpoint = parentChannel .affinityEndpoint (transactionId );
557+ // Skip affinity for read-only transactions so each query routes independently.
558+ boolean isReadOnly = parentChannel .isReadOnlyTransaction (transactionId );
559+ ChannelEndpoint endpoint = isReadOnly ? null : parentChannel .affinityEndpoint (transactionId );
505560 ChannelFinder finder = null ;
506561 if (databaseId != null ) {
507562 finder = parentChannel .getOrCreateChannelFinder (databaseId );
508- ChannelEndpoint routed = finder .findServer (reqBuilder );
509- if (endpoint == null ) {
510- endpoint = routed ;
511- }
563+ }
564+ if (databaseId != null && endpoint == null ) {
565+ Boolean preferLeaderOverride = parentChannel .readOnlyPreferLeader (transactionId );
566+ ChannelEndpoint routed =
567+ preferLeaderOverride != null
568+ ? finder .findServer (reqBuilder , preferLeaderOverride )
569+ : finder .findServer (reqBuilder );
570+ endpoint = routed ;
512571 }
513572 return new RoutingDecision (finder , endpoint );
514573 }
@@ -554,7 +613,13 @@ public void onMessage(ResponseT message) {
554613 transactionId = transactionIdFromTransaction (response );
555614 }
556615 if (transactionId != null ) {
557- call .maybeRecordAffinity (transactionId );
616+ if (call .isReadOnlyBegin ) {
617+ // Track the read-only transaction so subsequent reads skip affinity
618+ // and route independently based on key-based routing.
619+ call .parentChannel .trackReadOnlyTransaction (transactionId , call .readOnlyIsStrong );
620+ } else if (!call .parentChannel .isReadOnlyTransaction (transactionId )) {
621+ call .maybeRecordAffinity (transactionId );
622+ }
558623 }
559624 super .onMessage (message );
560625 }
0 commit comments