diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 791e04523..7d836cbbf 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.curator.framework.api.transaction.CuratorOp; @@ -171,12 +172,7 @@ public AsyncStage read() { @Override public AsyncStage read(Stat storingStatIn) { - return internalRead(n -> { - if (storingStatIn != null) { - DataTree.copyStat(n.stat(), storingStatIn); - } - return n.model(); - }); + return internalRead(n -> this.toModelWithStat(n, storingStatIn)); } @Override @@ -191,7 +187,7 @@ public AsyncStage readThrough() { @Override public AsyncStage readThrough(Stat storingStatIn) { - return internalRead(ZNode::model, () -> client.read(storingStatIn)); + return internalRead(n -> this.toModelWithStat(n, storingStatIn), () -> client.read(storingStatIn)); } @Override @@ -201,7 +197,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - return internalChildren(entry -> entry.getValue().model()); + return this.filteredCacheChildren(entry -> entry.getValue().model(), __ -> true); } @Override @@ -231,12 +227,12 @@ public AsyncStage checkExists() { @Override public AsyncStage> children() { - return internalChildren(Map.Entry::getKey); + return this.clientPathDirectChildren(Map.Entry::getKey); } @Override public AsyncStage>> childrenAsZNodes() { - return internalChildren(Map.Entry::getValue); + return this.clientPathDirectChildren(Map.Entry::getValue); } @Override @@ -279,6 +275,13 @@ public AsyncStage> inTransaction(List return client.inTransaction(operations); } + private T toModelWithStat(ZNode n, Stat storingStatIn) { + if (storingStatIn != null) { + DataTree.copyStat(n.stat(), storingStatIn); + } + return n.model(); + } + private AsyncStage internalRead(Function, U> resolver) { return internalRead(resolver, null); } @@ -312,13 +315,18 @@ private AsyncStage internalRead(Function, U> resolver, Supplier< return stage; } - private ModelStage> internalChildren(Function>, U> resolver) { + private ModelStage> clientPathDirectChildren(Function>, U> resolver) { + return filteredCacheChildren( + resolver, e -> e.getKey().parent().equals(client.modelSpec().path())); + } + + private ModelStage> filteredCacheChildren( + Function>, U> resolver, Predicate>> filter) { ModelStage> stage = ModelStage.make(); init.whenComplete((__, throwable) -> { if (throwable == null) { - stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream() - .filter(e -> !e.getKey().isRoot() - && e.getKey().parent().equals(client.modelSpec().path())) + stage.complete(cache.currentChildren().entrySet().stream() + .filter(filter) .map(resolver) .collect(Collectors.toList())); } else { diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 6f23cdd3e..45b284edb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -118,10 +118,6 @@ public Optional> currentData(ZPath path) { return Optional.empty(); } - ZPath basePath() { - return basePath; - } - Map> currentChildren() { return currentChildren(basePath); } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 0e767ea37..e8444d802 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -148,6 +148,12 @@ public void testChildren() { }); }); + complete( + client.child("p").childrenAsZNodes(), + (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); + complete( + client.withPath(ZPath.from(path, "p")).childrenAsZNodes(), + (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); complete( client.child("p").child("c1").childrenAsZNodes(), (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild1))); @@ -157,10 +163,34 @@ public void testChildren() { complete( client.child("p").child("c1").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1))); + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); complete( client.child("p").child("c2").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2))); + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + complete( + client.child("p").list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + complete( + client.child("p").child("c2").child("g2").list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + } + + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec.withPath(ZPath.root)).cached()) { + client.start(); + complete( + client.list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); } } @@ -244,6 +274,23 @@ void testNoNodeException() throws InterruptedException, TimeoutException { } } + @Test + void testRead() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + client.start(); + assertEquals(model, timing.getFuture(client.read().toCompletableFuture())); + assertEquals( + model, + timing.getFuture(client.readAsZNode().toCompletableFuture()).model()); + Stat stat = new Stat(); + assertEquals(model, timing.getFuture(client.read(stat).toCompletableFuture())); + assertTrue(stat.getDataLength() > 0); + } + } + @Test void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException { try (CachedModeledFramework client = @@ -264,6 +311,13 @@ public void initialized() { client.start(); assertTrue(timing.acquireSemaphore(semaphore)); assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + Stat stat = new Stat(); + assertEquals(model, timing.getFuture(client.readThrough(stat).toCompletableFuture())); + assertTrue(stat.getDataLength() > 0); + assertEquals( + model, + timing.getFuture(client.readThroughAsZNode().toCompletableFuture()) + .model()); assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture())); } }