@@ -1311,3 +1311,86 @@ def test_compare_create(self):
13111311 create_compare = tx .create (key ) > - 12
13121312 assert create_compare .op == etcdrpc .Compare .GREATER
13131313 assert create_compare .build_message ().target == etcdrpc .Compare .CREATE
1314+
1315+
1316+ @pytest .mark .skipif (not os .environ .get ('ETCDCTL_ENDPOINTS' ),
1317+ reason = "Expected etcd to have been run by pifpaf" )
1318+ class TestFailoverClient (object ):
1319+ @pytest .fixture
1320+ def etcd (self ):
1321+ endpoint_urls = os .environ .get ('ETCDCTL_ENDPOINTS' ).split (',' )
1322+ timeout = 5
1323+ endpoints = []
1324+ for url in endpoint_urls :
1325+ url = urlparse (url )
1326+ endpoints .append (etcd3 .Endpoint (host = url .hostname ,
1327+ port = url .port ,
1328+ secure = False ))
1329+ with etcd3 .client (endpoints = endpoints ,
1330+ timeout = timeout ,
1331+ failover = True ) as client :
1332+ yield client
1333+
1334+ @retry (wait = wait_fixed (2 ), stop = stop_after_attempt (3 ))
1335+ def delete_keys_definitely ():
1336+ # clean up after fixture goes out of scope
1337+ etcdctl ('del' , '--prefix' , '/' )
1338+ out = etcdctl ('get' , '--prefix' , '/' )
1339+ assert 'kvs' not in out
1340+
1341+ delete_keys_definitely ()
1342+
1343+ def test_endpoint_offline (self , etcd ):
1344+ original_endpoint = etcd .endpoint_in_use
1345+ assert not original_endpoint .is_failed ()
1346+ exception = TestEtcd3 .MockedException (grpc .StatusCode .UNAVAILABLE )
1347+ kv_mock = mock .PropertyMock ()
1348+ kv_mock .Range .side_effect = exception
1349+ with mock .patch ('etcd3.Etcd3Client.kvstub' ,
1350+ new_callable = mock .PropertyMock ) as property_mock :
1351+ property_mock .return_value = kv_mock
1352+ with pytest .raises (etcd3 .exceptions .ConnectionFailedError ):
1353+ etcd .get ("foo" )
1354+ assert etcd .endpoint_in_use is original_endpoint
1355+ assert etcd .endpoint_in_use .is_failed ()
1356+ etcd .get ("foo" )
1357+ assert etcd .endpoint_in_use is not original_endpoint
1358+ assert not etcd .endpoint_in_use .is_failed ()
1359+
1360+ def test_failover_during_watch (self , etcd ):
1361+ class Interceptor (grpc .StreamStreamClientInterceptor ):
1362+ def intercept_stream_stream (self , continuation ,
1363+ client_call_details , request_iterator ):
1364+ response_iterator = continuation (client_call_details ,
1365+ request_iterator )
1366+
1367+ def new_iterator ():
1368+ yield next (response_iterator )
1369+ with etcd .watcher ._new_watch_cond :
1370+ while True :
1371+ etcd .watcher ._new_watch_cond .wait ()
1372+ if etcd .watcher ._new_watch is None :
1373+ break
1374+ with response_iterator ._state .condition :
1375+ response_iterator ._state .code = \
1376+ grpc .StatusCode .UNAVAILABLE
1377+ yield next (response_iterator )
1378+ return new_iterator ()
1379+
1380+ original_endpoint = etcd .endpoint_in_use
1381+ assert not original_endpoint .is_failed ()
1382+ failing_channel = grpc .intercept_channel (original_endpoint .channel ,
1383+ Interceptor ())
1384+ with mock .patch .object (original_endpoint , "channel" , failing_channel ):
1385+ iterator , cancel = etcd .watch ("foo" )
1386+ with pytest .raises (etcd3 .exceptions .ConnectionFailedError ):
1387+ next (iterator )
1388+ assert etcd .endpoint_in_use is original_endpoint
1389+ assert etcd .endpoint_in_use .is_failed ()
1390+ cancel ()
1391+ assert etcd .endpoint_in_use is not original_endpoint
1392+ assert not etcd .endpoint_in_use .is_failed ()
1393+ iterator , cancel = etcd .watch ("foo" )
1394+ etcd .put ("foo" , b"foo" )
1395+ assert next (iterator )
1396+ cancel ()
0 commit comments