1616
1717package com .google .cloud .spanner .spi .v1 ;
1818
19+ import com .google .api .core .InternalApi ;
1920import com .google .api .gax .grpc .GrpcTransportChannel ;
2021import com .google .api .gax .grpc .InstantiatingGrpcChannelProvider ;
22+ import com .google .api .gax .rpc .TransportChannelProvider ;
23+ import com .google .cloud .spanner .ErrorCode ;
24+ import com .google .cloud .spanner .SpannerExceptionFactory ;
25+ import com .google .common .annotations .VisibleForTesting ;
26+ import io .grpc .ConnectivityState ;
2127import io .grpc .ManagedChannel ;
2228import java .io .IOException ;
2329import java .util .Map ;
2430import java .util .concurrent .ConcurrentHashMap ;
31+ import java .util .concurrent .TimeUnit ;
2532
33+ /**
34+ * gRPC implementation of {@link ChannelFinderServerFactory}.
35+ *
36+ * <p>This factory creates and caches gRPC channels per address. It uses {@link
37+ * InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same
38+ * configuration but different endpoints, avoiding race conditions.
39+ */
40+ @ InternalApi
2641class GrpcChannelFinderServerFactory implements ChannelFinderServerFactory {
27- private final InstantiatingGrpcChannelProvider .Builder channelBuilder ;
42+
43+ /** Timeout for graceful channel shutdown. */
44+ private static final long SHUTDOWN_TIMEOUT_SECONDS = 5 ;
45+
46+ private final InstantiatingGrpcChannelProvider baseProvider ;
2847 private final Map <String , GrpcChannelFinderServer > servers = new ConcurrentHashMap <>();
2948 private final GrpcChannelFinderServer defaultServer ;
49+ private volatile boolean isShutdown = false ;
3050
31- public GrpcChannelFinderServerFactory (InstantiatingGrpcChannelProvider .Builder channelBuilder )
51+ /**
52+ * Creates a new factory with the given channel provider.
53+ *
54+ * @param channelProvider the base provider used to create channels. New channels for different
55+ * endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}.
56+ * @throws IOException if the default channel cannot be created
57+ */
58+ public GrpcChannelFinderServerFactory (InstantiatingGrpcChannelProvider channelProvider )
3259 throws IOException {
33- this .channelBuilder = channelBuilder ;
34- // The "default" server will use the original endpoint from the builder.
35- this .defaultServer =
36- new GrpcChannelFinderServer (this .channelBuilder .getEndpoint (), channelBuilder .build ());
37- this .servers .put (this .defaultServer .getAddress (), this .defaultServer );
60+ this .baseProvider = channelProvider ;
61+ String defaultEndpoint = channelProvider .getEndpoint ();
62+ this .defaultServer = new GrpcChannelFinderServer (defaultEndpoint , channelProvider );
63+ this .servers .put (defaultEndpoint , this .defaultServer );
3864 }
3965
4066 @ Override
@@ -44,37 +70,95 @@ public ChannelFinderServer defaultServer() {
4470
4571 @ Override
4672 public ChannelFinderServer create (String address ) {
73+ if (isShutdown ) {
74+ throw SpannerExceptionFactory .newSpannerException (
75+ ErrorCode .FAILED_PRECONDITION , "ChannelFinderServerFactory has been shut down" );
76+ }
77+
4778 return servers .computeIfAbsent (
4879 address ,
4980 addr -> {
5081 try {
51- // Modify the builder to use the new address
52- synchronized (channelBuilder ) {
53- InstantiatingGrpcChannelProvider .Builder newBuilder =
54- channelBuilder .setEndpoint (addr );
55- return new GrpcChannelFinderServer (addr , newBuilder .build ());
56- }
82+ // Create a new provider with the same config but different endpoint.
83+ // This is thread-safe as withEndpoint() returns a new provider instance.
84+ TransportChannelProvider newProvider = baseProvider .withEndpoint (addr );
85+ return new GrpcChannelFinderServer (addr , newProvider );
5786 } catch (IOException e ) {
58- throw new RuntimeException ("Failed to create channel for address: " + addr , e );
87+ throw SpannerExceptionFactory .newSpannerException (
88+ ErrorCode .INTERNAL , "Failed to create channel for address: " + addr , e );
5989 }
6090 });
6191 }
6292
93+ @ Override
94+ public void evict (String address ) {
95+ if (defaultServer .getAddress ().equals (address )) {
96+ return ;
97+ }
98+ GrpcChannelFinderServer server = servers .remove (address );
99+ if (server != null ) {
100+ shutdownServerGracefully (server );
101+ }
102+ }
103+
104+ @ Override
105+ public void shutdown () {
106+ isShutdown = true ;
107+ for (GrpcChannelFinderServer server : servers .values ()) {
108+ shutdownServerGracefully (server );
109+ }
110+ servers .clear ();
111+ }
112+
113+ /**
114+ * Gracefully shuts down a server's channel.
115+ *
116+ * <p>First attempts a graceful shutdown, waiting for in-flight RPCs to complete. If the timeout
117+ * is exceeded, forces immediate shutdown.
118+ */
119+ private void shutdownServerGracefully (GrpcChannelFinderServer server ) {
120+ ManagedChannel channel = server .getChannel ();
121+ if (channel .isShutdown ()) {
122+ return ;
123+ }
124+
125+ channel .shutdown ();
126+ try {
127+ if (!channel .awaitTermination (SHUTDOWN_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
128+ channel .shutdownNow ();
129+ }
130+ } catch (InterruptedException e ) {
131+ channel .shutdownNow ();
132+ Thread .currentThread ().interrupt ();
133+ }
134+ }
135+
136+ /** gRPC implementation of {@link ChannelFinderServer}. */
63137 static class GrpcChannelFinderServer implements ChannelFinderServer {
64138 private final String address ;
65139 private final ManagedChannel channel ;
66140
67- public GrpcChannelFinderServer (String address , InstantiatingGrpcChannelProvider provider )
68- throws IOException {
141+ /**
142+ * Creates a server from a channel provider.
143+ *
144+ * @param address the server address
145+ * @param provider the channel provider (must be a gRPC provider)
146+ * @throws IOException if the channel cannot be created
147+ */
148+ GrpcChannelFinderServer (String address , TransportChannelProvider provider ) throws IOException {
69149 this .address = address ;
70- // It's assumed that getTransportChannel() returns a ManagedChannel or can be cast to one.
71- // For this example, GrpcTransportChannel is used as in KeyAwareChannel.
72150 GrpcTransportChannel transportChannel = (GrpcTransportChannel ) provider .getTransportChannel ();
73151 this .channel = (ManagedChannel ) transportChannel .getChannel ();
74152 }
75153
76- // Constructor for the default server that already has a channel
77- public GrpcChannelFinderServer (String address , ManagedChannel channel ) {
154+ /**
155+ * Creates a server with an existing channel. Primarily for testing.
156+ *
157+ * @param address the server address
158+ * @param channel the managed channel
159+ */
160+ @ VisibleForTesting
161+ GrpcChannelFinderServer (String address , ManagedChannel channel ) {
78162 this .address = address ;
79163 this .channel = channel ;
80164 }
@@ -86,8 +170,12 @@ public String getAddress() {
86170
87171 @ Override
88172 public boolean isHealthy () {
89- // A simple health check. In a real scenario, this might involve a ping or other checks.
90- return !channel .isShutdown () && !channel .isTerminated ();
173+ if (channel .isShutdown () || channel .isTerminated ()) {
174+ return false ;
175+ }
176+ // Check connectivity state without triggering a connection attempt
177+ ConnectivityState state = channel .getState (false );
178+ return state != ConnectivityState .SHUTDOWN && state != ConnectivityState .TRANSIENT_FAILURE ;
91179 }
92180
93181 @ Override
0 commit comments