@@ -31,23 +31,38 @@ public class SharedClusterManager {
3131 // Shared ClusterEnvironment with optimized connection settings
3232 private static ClusterEnvironment sharedEnvironment ;
3333
34+ // Track whether environment has been shutdown
35+ private static volatile boolean environmentShutdown = false ;
36+
37+ private static final Object environmentLock = new Object ();
38+
3439 // Store cluster instances per server connection string
3540 private static ConcurrentHashMap <String , ClusterWrapper > clusterMap = new ConcurrentHashMap <>();
3641
37- // Initialize the shared environment once (lazy initialization)
42+ // Initialize the shared environment (lazy initialization with recreation )
3843 private static void initializeSharedEnvironment () {
39- if (sharedEnvironment == null ) {
40- try {
41- sharedEnvironment = ClusterEnvironment .builder ()
42- .timeoutConfig (TimeoutConfig .builder ().kvTimeout (Duration .ofSeconds (10 )))
43- .securityConfig (SecurityConfig .enableTls (true )
44- .trustManagerFactory (InsecureTrustManagerFactory .INSTANCE ))
45- .ioConfig (IoConfig .enableDnsSrv (true ))
46- .ioConfig (IoConfig .numKvConnections (DEFAULT_KV_CONNECTIONS ))
47- .build ();
48- logger .info ("Shared Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections for massively parallel collection loads" );
49- } catch (Exception e ) {
50- logger .error ("Failed to initialize shared Cluster Environment" , e );
44+ if (sharedEnvironment == null || environmentShutdown ) {
45+ synchronized (environmentLock ) {
46+ // Double-check under lock
47+ if (sharedEnvironment == null || environmentShutdown ) {
48+ try {
49+ if (sharedEnvironment != null && environmentShutdown ) {
50+ logger .info ("Shared Cluster Environment was shutdown, recreating" );
51+ }
52+
53+ sharedEnvironment = ClusterEnvironment .builder ()
54+ .timeoutConfig (TimeoutConfig .builder ().kvTimeout (Duration .ofSeconds (10 )))
55+ .securityConfig (SecurityConfig .enableTls (true )
56+ .trustManagerFactory (InsecureTrustManagerFactory .INSTANCE ))
57+ .ioConfig (IoConfig .enableDnsSrv (true ))
58+ .ioConfig (IoConfig .numKvConnections (DEFAULT_KV_CONNECTIONS ))
59+ .build ();
60+ environmentShutdown = false ;
61+ logger .info ("Shared Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections for massively parallel collection loads" );
62+ } catch (Exception e ) {
63+ logger .error ("Failed to initialize shared Cluster Environment" , e );
64+ }
65+ }
5166 }
5267 }
5368 }
@@ -108,6 +123,7 @@ public static synchronized void shutdownAll() {
108123
109124 if (sharedEnvironment != null ) {
110125 sharedEnvironment .shutdown ();
126+ environmentShutdown = true ;
111127 logger .info ("Shared Cluster Environment shutdown complete" );
112128 }
113129 }
0 commit comments