@@ -446,6 +446,48 @@ def callback(event):
446446 assert events [1 ].key .decode () == '/doot/watch/prefix/callback/1'
447447 assert events [1 ].value .decode () == '1'
448448
449+ def test_watch_prefix_callback_with_filter (self , etcd ):
450+ def update_etcd (v ):
451+ etcdctl ('put' , '/doot/watch/prefix/callback/' + v , v )
452+ out = etcdctl ('get' , '/doot/watch/prefix/callback/' + v )
453+ assert base64 .b64decode (out ['kvs' ][0 ]['value' ]) == \
454+ utils .to_bytes (v )
455+
456+ def delete_etcd (v ):
457+ etcdctl ('del' , '/doot/watch/prefix/callback/' + v )
458+
459+ def update_key ():
460+ time .sleep (3 )
461+ update_etcd ('0' )
462+ time .sleep (1 )
463+ update_etcd ('1' )
464+ time .sleep (1 )
465+ delete_etcd ('1' )
466+ time .sleep (1 )
467+
468+ events = []
469+
470+ def callback (event ):
471+ events .extend (event .events )
472+
473+ t = threading .Thread (name = "update_key_prefix" , target = update_key )
474+ t .start ()
475+
476+ watch_id = etcd .add_watch_prefix_callback (
477+ '/doot/watch/prefix/callback/' ,
478+ callback ,
479+ filters = [etcdrpc .WatchCreateRequest .FilterType .NODELETE ]
480+ )
481+
482+ t .join ()
483+ etcd .cancel_watch (watch_id )
484+
485+ assert len (events ) == 2
486+ assert events [0 ].key .decode () == '/doot/watch/prefix/callback/0'
487+ assert events [0 ].value .decode () == '0'
488+ assert events [1 ].key .decode () == '/doot/watch/prefix/callback/1'
489+ assert events [1 ].value .decode () == '1'
490+
449491 def test_sequential_watch_prefix_once (self , etcd ):
450492 try :
451493 etcd .watch_prefix_once ('/doot/' , 1 )
0 commit comments