diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java index cc98a0957..c3009ac85 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java @@ -58,6 +58,11 @@ public abstract class CuratorFrameworkBase implements CuratorFramework { abstract NamespaceImpl getNamespaceImpl(); + /** + * Return the underlying client which is the one constructed from {@link org.apache.curator.framework.CuratorFrameworkFactory}. + */ + public abstract CuratorFramework client(); + @Override public final CuratorFramework nonNamespaceView() { return usingNamespace(null); diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 0f1680f3e..ae4ba3a81 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -254,6 +254,11 @@ public void clearWatcherReferences(Watcher watcher) { // NOP } + @Override + public CuratorFramework client() { + return this; + } + @Override public CuratorFrameworkState getState() { return state.get(); diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java index 5850242bf..df9ff0b61 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java @@ -47,6 +47,11 @@ public DelegatingCuratorFramework(CuratorFrameworkBase client) { this.client = client; } + @Override + public final CuratorFramework client() { + return client.client(); + } + @Override public CuratorFrameworkState getState() { return client.getState(); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index e8d191c46..38487d237 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -27,6 +27,7 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorClosedException; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.imps.CuratorFrameworkBase; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.state.ConnectionStateListener; @@ -80,7 +81,8 @@ public PersistentWatcher(CuratorFramework client, String basePath, boolean recur public void start() { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); client.getConnectionStateListenable().addListener(connectionStateListener); - client.getCuratorListenable().addListener(((ignored, event) -> { + // This could be a namespaced facade which does not support getCuratorListenable. + ((CuratorFrameworkBase) client).client().getCuratorListenable().addListener(((ignored, event) -> { if (event.getType() == CuratorEventType.CLOSING) { onClientClosed(); } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java index 902b18a0a..a94a99281 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -48,6 +48,45 @@ public void testConnectionLost() throws Exception { internalTest(false); } + @Test + public void testNamespacedWatching() throws Exception { + BlockingQueue events = new LinkedBlockingQueue<>(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient( + server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) { + client.start(); + // given: connected curator client + client.blockUntilConnected(); + + // given: started persistent watcher under namespaced facade + PersistentWatcher persistentWatcher = new PersistentWatcher(client.usingNamespace("top"), "/main", true); + persistentWatcher.getListenable().addListener(events::add); + persistentWatcher.start(); + + // when: create paths + client.create().forPath("/top/main"); + client.create().forPath("/top/main/a"); + + // then: receive node watch events + WatchedEvent event1 = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event1); + assertEquals(Watcher.Event.EventType.NodeCreated, event1.getType()); + assertEquals("/main", event1.getPath()); + + WatchedEvent event2 = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event2); + assertEquals(Watcher.Event.EventType.NodeCreated, event2.getType()); + assertEquals("/main/a", event2.getPath()); + } + + // when: curator client closed + // then: listener get Closed notification + WatchedEvent event = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event); + assertEquals(Watcher.Event.EventType.None, event.getType()); + assertEquals(Watcher.Event.KeeperState.Closed, event.getState()); + } + @Test public void testConcurrentClientClose() throws Exception { BlockingQueue events = new LinkedBlockingQueue<>();