@@ -50,25 +50,40 @@ def getInformationBeforeDispose(self):
5050 def getInformationAfterDispose (self ):
5151 return [shard .getInformationAfterDispose () for shard in self .shards ]
5252
53+ def _agreeOk (self ):
54+ ok = 0
55+ for shard in self .shards :
56+ con = shard .getConnection ()
57+ try :
58+ status = con .execute_command ('CLUSTER' , 'INFO' )
59+ except Exception as e :
60+ print ('got error on cluster slots, will try again, %s' % str (e ))
61+ continue
62+ if 'cluster_state:ok' in str (status ):
63+ ok += 1
64+ return ok == len (self .shards )
65+
66+ def _agreeSlots (self ):
67+ ok = 0
68+ first_view = None
69+ for shard in self .shards :
70+ con = shard .getConnection ()
71+ try :
72+ slots_view = con .execute_command ('CLUSTER' , 'SLOTS' )
73+ except Exception as e :
74+ print ('got error on cluster slots, will try again, %s' % str (e ))
75+ continue
76+ if first_view is None :
77+ first_view = slots_view
78+ if slots_view == first_view :
79+ ok += 1
80+ return ok == len (self .shards )
81+
5382 def waitCluster (self , timeout_sec = 40 ):
5483 st = time .time ()
55- ok = 0
5684
5785 while st + timeout_sec > time .time ():
58- ok = 0
59- first_view = None
60- for shard in self .shards :
61- con = shard .getConnection ()
62- try :
63- slots_pov = con .execute_command ('CLUSTER' , 'SLOTS' )
64- except Exception as e :
65- print ('got error on cluster info, will try again, %s' % str (e ))
66- continue
67- if first_view is None :
68- first_view = slots_pov
69- if slots_pov == first_view :
70- ok += 1
71- if ok == len (self .shards ):
86+ if self ._agreeOk () and self ._agreeSlots ():
7287 for shard in self .shards :
7388 try :
7489 shard .getConnection ().execute_command ('SEARCH.CLUSTERREFRESH' )
0 commit comments