From 7e77ea6e36b8522f1ec560110f5984ba45d016ce Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Fri, 21 Mar 2025 10:10:36 -0400 Subject: [PATCH 1/4] make CachedModeledFramework:list return all cache children --- .../details/CachedModeledFrameworkImpl.java | 21 +++++++++++++------ .../modeled/details/ModeledCacheImpl.java | 4 ---- .../modeled/TestCachedModeledFramework.java | 14 +++++++++++-- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 791e04523..c3c1d9e36 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.curator.framework.api.transaction.CuratorOp; @@ -201,7 +202,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - return internalChildren(entry -> entry.getValue().model()); + return this.internalList(entry -> entry.getValue().model()); } @Override @@ -231,12 +232,12 @@ public AsyncStage checkExists() { @Override public AsyncStage> children() { - return internalChildren(Map.Entry::getKey); + return this.internalChildren(Map.Entry::getKey); } @Override public AsyncStage>> childrenAsZNodes() { - return internalChildren(Map.Entry::getValue); + return this.internalChildren(Map.Entry::getValue); } @Override @@ -312,13 +313,21 @@ private AsyncStage internalRead(Function, U> resolver, Supplier< return stage; } + private ModelStage> internalList(Function>, U> resolver) { + return internalChildren(resolver, __ -> true); + } + private ModelStage> internalChildren(Function>, U> resolver) { + return internalChildren(resolver, e -> e.getKey().parent().equals(client.modelSpec().path())); + } + + private ModelStage> internalChildren(Function>, U> resolver, + Predicate>> filter) { ModelStage> stage = ModelStage.make(); init.whenComplete((__, throwable) -> { if (throwable == null) { - stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream() - .filter(e -> !e.getKey().isRoot() - && e.getKey().parent().equals(client.modelSpec().path())) + stage.complete(cache.currentChildren().entrySet().stream() + .filter(filter) .map(resolver) .collect(Collectors.toList())); } else { diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 6f23cdd3e..45b284edb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -118,10 +118,6 @@ public Optional> currentData(ZPath path) { return Optional.empty(); } - ZPath basePath() { - return basePath; - } - Map> currentChildren() { return currentChildren(basePath); } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 0e767ea37..b0693c10d 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -148,6 +148,10 @@ public void testChildren() { }); }); + + complete( + client.child("p").childrenAsZNodes(), + (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); complete( client.child("p").child("c1").childrenAsZNodes(), (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild1))); @@ -157,10 +161,16 @@ public void testChildren() { complete( client.child("p").child("c1").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1))); + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); complete( client.child("p").child("c2").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2))); + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + complete( + client.child("p").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + complete( + client.child("p").child("c2").child("g2").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); } } From a2ab7dccd0498343aa10bd2e64876b753a29155a Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Fri, 21 Mar 2025 12:07:10 -0400 Subject: [PATCH 2/4] fix bug in readThrough(Stat) + tests --- .../details/CachedModeledFrameworkImpl.java | 35 +++++---- .../modeled/TestCachedModeledFramework.java | 75 ++++++++++++++++--- 2 files changed, 85 insertions(+), 25 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index c3c1d9e36..656373096 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -172,12 +172,7 @@ public AsyncStage read() { @Override public AsyncStage read(Stat storingStatIn) { - return internalRead(n -> { - if (storingStatIn != null) { - DataTree.copyStat(n.stat(), storingStatIn); - } - return n.model(); - }); + return internalRead(n -> this.toModelWithStat(n, storingStatIn)); } @Override @@ -192,7 +187,7 @@ public AsyncStage readThrough() { @Override public AsyncStage readThrough(Stat storingStatIn) { - return internalRead(ZNode::model, () -> client.read(storingStatIn)); + return internalRead(n -> this.toModelWithStat(n, storingStatIn), () -> client.read(storingStatIn)); } @Override @@ -202,7 +197,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - return this.internalList(entry -> entry.getValue().model()); + return this.allCacheChildren(entry -> entry.getValue().model()); } @Override @@ -232,12 +227,12 @@ public AsyncStage checkExists() { @Override public AsyncStage> children() { - return this.internalChildren(Map.Entry::getKey); + return this.clientPathDirectChildren(Map.Entry::getKey); } @Override public AsyncStage>> childrenAsZNodes() { - return this.internalChildren(Map.Entry::getValue); + return this.clientPathDirectChildren(Map.Entry::getValue); } @Override @@ -280,6 +275,13 @@ public AsyncStage> inTransaction(List return client.inTransaction(operations); } + private T toModelWithStat(ZNode n, Stat storingStatIn) { + if (storingStatIn != null) { + DataTree.copyStat(n.stat(), storingStatIn); + } + return n.model(); + } + private AsyncStage internalRead(Function, U> resolver) { return internalRead(resolver, null); } @@ -313,16 +315,17 @@ private AsyncStage internalRead(Function, U> resolver, Supplier< return stage; } - private ModelStage> internalList(Function>, U> resolver) { - return internalChildren(resolver, __ -> true); + private ModelStage> allCacheChildren(Function>, U> resolver) { + return filteredCacheChildren(resolver, __ -> true); } - private ModelStage> internalChildren(Function>, U> resolver) { - return internalChildren(resolver, e -> e.getKey().parent().equals(client.modelSpec().path())); + private ModelStage> clientPathDirectChildren(Function>, U> resolver) { + return filteredCacheChildren( + resolver, e -> e.getKey().parent().equals(client.modelSpec().path())); } - private ModelStage> internalChildren(Function>, U> resolver, - Predicate>> filter) { + private ModelStage> filteredCacheChildren( + Function>, U> resolver, Predicate>> filter) { ModelStage> stage = ModelStage.make(); init.whenComplete((__, throwable) -> { if (throwable == null) { diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index b0693c10d..3543e9279 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -148,10 +148,12 @@ public void testChildren() { }); }); - complete( - client.child("p").childrenAsZNodes(), - (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); + client.child("p").childrenAsZNodes(), + (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); + complete( + client.withPath(ZPath.from(path, "p")).childrenAsZNodes(), + (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2))); complete( client.child("p").child("c1").childrenAsZNodes(), (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild1))); @@ -161,16 +163,34 @@ public void testChildren() { complete( client.child("p").child("c1").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); complete( client.child("p").child("c2").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); complete( - client.child("p").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + client.child("p").list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + complete( + client.child("p").child("c2").child("g2").list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + } + + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec.withPath(ZPath.root)).cached()) { + client.start(); complete( - client.child("p").child("c2").child("g2").list(), - (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); + client.list(), + (v, e) -> assertEquals( + toSet(v.stream(), Function.identity()), + Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2))); } } @@ -254,6 +274,36 @@ void testNoNodeException() throws InterruptedException, TimeoutException { } } + @Test + void testRead() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + Semaphore semaphore = new Semaphore(0); + client.listenable().addListener(new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, TestModel model) { + // NOP + } + + @Override + public void initialized() { + semaphore.release(); + } + }); + client.start(); + assertTrue(timing.acquireSemaphore(semaphore)); + assertEquals(model, timing.getFuture(client.read().toCompletableFuture())); + assertEquals( + model, + timing.getFuture(client.readAsZNode().toCompletableFuture()).model()); + Stat stat = new Stat(); + assertEquals(model, timing.getFuture(client.read(stat).toCompletableFuture())); + assertTrue(stat.getDataLength() > 0); + } + } + @Test void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException { try (CachedModeledFramework client = @@ -274,6 +324,13 @@ public void initialized() { client.start(); assertTrue(timing.acquireSemaphore(semaphore)); assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + Stat stat = new Stat(); + assertEquals(model, timing.getFuture(client.readThrough(stat).toCompletableFuture())); + assertTrue(stat.getDataLength() > 0); + assertEquals( + model, + timing.getFuture(client.readThroughAsZNode().toCompletableFuture()) + .model()); assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture())); } } From a7c84dd7178e70a202494c6ab9ef8a83b2a3d68d Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Fri, 21 Mar 2025 12:10:10 -0400 Subject: [PATCH 3/4] remove redundant indirection --- .../x/async/modeled/details/CachedModeledFrameworkImpl.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 656373096..7d836cbbf 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -197,7 +197,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - return this.allCacheChildren(entry -> entry.getValue().model()); + return this.filteredCacheChildren(entry -> entry.getValue().model(), __ -> true); } @Override @@ -315,10 +315,6 @@ private AsyncStage internalRead(Function, U> resolver, Supplier< return stage; } - private ModelStage> allCacheChildren(Function>, U> resolver) { - return filteredCacheChildren(resolver, __ -> true); - } - private ModelStage> clientPathDirectChildren(Function>, U> resolver) { return filteredCacheChildren( resolver, e -> e.getKey().parent().equals(client.modelSpec().path())); From ad61a8c13b4d12297f949db9d717bf501dbe6520 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Fri, 21 Mar 2025 15:32:34 -0400 Subject: [PATCH 4/4] simplify testRead --- .../x/async/modeled/TestCachedModeledFramework.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 3543e9279..e8444d802 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -280,20 +280,7 @@ void testRead() throws InterruptedException, TimeoutException, ExecutionExceptio ModeledFramework.wrap(async, modelSpec).cached()) { TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); - Semaphore semaphore = new Semaphore(0); - client.listenable().addListener(new ModeledCacheListener() { - @Override - public void accept(Type type, ZPath path, Stat stat, TestModel model) { - // NOP - } - - @Override - public void initialized() { - semaphore.release(); - } - }); client.start(); - assertTrue(timing.acquireSemaphore(semaphore)); assertEquals(model, timing.getFuture(client.read().toCompletableFuture())); assertEquals( model,