From 74844ef0f7f88719320fcb7f9fb500cffcd49c3e Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Wed, 5 Mar 2025 14:40:26 -0500 Subject: [PATCH 1/5] wait for cache initialization in CachedModeledFrameworkImpl --- .../details/CachedModeledFrameworkImpl.java | 107 +++++++++++++----- .../modeled/TestCachedModeledFramework.java | 26 +++++ 2 files changed, 102 insertions(+), 31 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index acccbfecf..b4afa5976 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -48,15 +47,40 @@ class CachedModeledFrameworkImpl implements CachedModeledFramework { private final ModeledFramework client; private final ModeledCacheImpl cache; private final Executor executor; + private final ModelStage init; CachedModeledFrameworkImpl(ModeledFramework client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); - } - - private CachedModeledFrameworkImpl(ModeledFramework client, ModeledCacheImpl cache, Executor executor) { + this( + client, + new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), + executor, + ModelStage.make()); + listenable().addListener(new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, Object model) { + // NOP + } + + @Override + public void initialized() { + init.complete(null); + ModeledCacheListener.super.initialized(); + } + + @Override + public void handleException(Exception e) { + init.completeExceptionally(e); + ModeledCacheListener.super.handleException(e); + } + }); + } + + private CachedModeledFrameworkImpl( + ModeledFramework client, ModeledCacheImpl cache, Executor executor, ModelStage init) { this.client = client; this.cache = cache; this.executor = executor; + this.init = init; } @Override @@ -106,7 +130,7 @@ public ModelSpec modelSpec() { @Override public CachedModeledFramework child(Object child) { - return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, init); } @Override @@ -117,7 +141,7 @@ public ModeledFramework parent() { @Override public CachedModeledFramework withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, init); } @Override @@ -179,9 +203,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - List children = - cache.currentChildren().values().stream().map(ZNode::model).collect(Collectors.toList()); - return ModelStage.completed(children); + return internalChildren(entry -> entry.getValue().model()); } @Override @@ -206,28 +228,27 @@ public AsyncStage delete(int version) { @Override public AsyncStage checkExists() { - ZPath path = client.modelSpec().path(); - Optional> data = cache.currentData(path); - return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null)); + ModelStage stage = ModelStage.make(); + init.whenComplete((__, throwable) -> { + if (throwable == null) { + ZPath path = client.modelSpec().path(); + Stat stat = cache.currentData(path).map(ZNode::stat).orElse(null); + stage.complete(stat); + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; } @Override public AsyncStage> children() { - List paths = cache.currentChildren(client.modelSpec().path()).keySet().stream() - .filter(path -> !path.isRoot() - && path.parent().equals(client.modelSpec().path())) - .collect(Collectors.toList()); - return completed(paths); + return internalChildren(Map.Entry::getKey); } @Override public AsyncStage>> childrenAsZNodes() { - List> nodes = cache.currentChildren(client.modelSpec().path()).entrySet().stream() - .filter(e -> !e.getKey().isRoot() - && e.getKey().parent().equals(client.modelSpec().path())) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); - return completed(nodes); + return internalChildren(Map.Entry::getValue); } @Override @@ -270,10 +291,6 @@ public AsyncStage> inTransaction(List return client.inTransaction(operations); } - private AsyncStage completed(U value) { - return ModelStage.completed(value); - } - private AsyncStage exceptionally() { KeeperException.NoNodeException exception = new KeeperException.NoNodeException(client.modelSpec().path().fullPath()); @@ -281,8 +298,36 @@ private AsyncStage exceptionally() { } private AsyncStage internalRead(Function, U> resolver, Supplier> elseProc) { - ZPath path = client.modelSpec().path(); - Optional> data = cache.currentData(path); - return data.map(node -> completed(resolver.apply(node))).orElseGet(elseProc); + ModelStage stage = ModelStage.make(); + init.whenComplete((__, throwable) -> { + if (throwable == null) { + ZPath path = client.modelSpec().path(); + ZNode zNode = cache.currentData(path).orElse(null); + if (zNode == null) { + stage.acceptEitherAsync(elseProc.get(), stage::complete); + } else { + stage.complete(resolver.apply(zNode)); + } + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; + } + + private ModelStage> internalChildren(Function>, U> resolver) { + ModelStage> stage = ModelStage.make(); + init.whenComplete((__, throwable) -> { + if (throwable == null) { + stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream() + .filter(e -> !e.getKey().isRoot() + && e.getKey().parent().equals(client.modelSpec().path())) + .map(resolver) + .collect(Collectors.toList())); + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; } } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 65c68aba9..e0d157b80 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -32,7 +32,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -41,6 +43,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; @@ -78,6 +81,10 @@ public void testDownServer() throws IOException { assertNotNull(value); assertNull(e); })); + complete(client.child(model).checkExists().whenComplete((value, e) -> { + assertNotNull(value); + assertNull(e); + })); } finally { client.close(); } @@ -150,6 +157,13 @@ public void testChildren() { complete( client.child("p").child("c2").childrenAsZNodes(), (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild2))); + + complete( + client.child("p").child("c1").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1))); + complete( + client.child("p").child("c2").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2))); } } @@ -201,6 +215,18 @@ public void testEmptyNodeRawDeserialization() { verifyEmptyNodeDeserialization(byteModel, byteModelSpec); } + @Test + void testInitializedCachedModeledFramework() throws ExecutionException, InterruptedException, TimeoutException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + AsyncStage asyncModel = client.read(); + client.start(); + assertEquals(model, timing.getFuture(asyncModel.toCompletableFuture())); + } + } + private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode. From dd64ccec636e7afc8bccec51ddd327a5dc57b01b Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Wed, 5 Mar 2025 22:12:15 -0500 Subject: [PATCH 2/5] fix bugs in cache miss scenario --- .../details/CachedModeledFrameworkImpl.java | 39 +++++++++------- .../modeled/TestCachedModeledFramework.java | 46 +++++++++++++++++-- 2 files changed, 64 insertions(+), 21 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index b4afa5976..29eae0e3d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -166,24 +166,22 @@ public AsyncStage set(T model, int version) { @Override public AsyncStage read() { - return internalRead(ZNode::model, this::exceptionally); + return internalRead(ZNode::model); } @Override public AsyncStage read(Stat storingStatIn) { - return internalRead( - n -> { - if (storingStatIn != null) { - DataTree.copyStat(n.stat(), storingStatIn); - } - return n.model(); - }, - this::exceptionally); + return internalRead(n -> { + if (storingStatIn != null) { + DataTree.copyStat(n.stat(), storingStatIn); + } + return n.model(); + }); } @Override public AsyncStage> readAsZNode() { - return internalRead(Function.identity(), this::exceptionally); + return internalRead(Function.identity()); } @Override @@ -291,20 +289,29 @@ public AsyncStage> inTransaction(List return client.inTransaction(operations); } - private AsyncStage exceptionally() { - KeeperException.NoNodeException exception = - new KeeperException.NoNodeException(client.modelSpec().path().fullPath()); - return ModelStage.exceptionally(exception); + private AsyncStage internalRead(Function, U> resolver) { + return internalRead(resolver, null); } - private AsyncStage internalRead(Function, U> resolver, Supplier> elseProc) { + private AsyncStage internalRead(Function, U> resolver, Supplier> defaultSupplier) { ModelStage stage = ModelStage.make(); init.whenComplete((__, throwable) -> { if (throwable == null) { ZPath path = client.modelSpec().path(); ZNode zNode = cache.currentData(path).orElse(null); if (zNode == null) { - stage.acceptEitherAsync(elseProc.get(), stage::complete); + if (defaultSupplier == null) { + stage.completeExceptionally(new KeeperException.NoNodeException( + client.modelSpec().path().fullPath())); + } else { + defaultSupplier.get().whenComplete((elseData, elseThrowable) -> { + if (elseThrowable == null) { + stage.complete(elseData); + } else { + stage.completeExceptionally(elseThrowable); + } + }); + } } else { stage.complete(resolver.apply(zNode)); } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index e0d157b80..0f08592c4 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -19,11 +19,7 @@ package org.apache.curator.x.async.modeled; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import com.google.common.collect.Sets; import java.io.IOException; import java.math.BigInteger; @@ -47,6 +43,7 @@ import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -227,6 +224,45 @@ void testInitializedCachedModeledFramework() throws ExecutionException, Interrup } } + @Test + void testNoNodeException() throws InterruptedException, TimeoutException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + AsyncStage asyncModel = client.read(); + client.start(); + try { + timing.getFuture(asyncModel.toCompletableFuture()); + fail("This test should result in a NoNodeException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KeeperException.NoNodeException); + } + } + } + + @Test + void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + Semaphore semaphore = new Semaphore(0); + client.listenable().addListener(new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, TestModel model) { + // NOP + } + + @Override + public void initialized() { + semaphore.release(); + } + }); + client.start(); + assertTrue(timing.acquireSemaphore(semaphore)); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture())); + } + } + private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode. From 3fba8a7b6446b1c39f71e4b56c2e46e19ef80701 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Thu, 6 Mar 2025 09:54:49 -0500 Subject: [PATCH 3/5] call internalRead from checkExists --- .../modeled/details/CachedModeledFrameworkImpl.java | 12 +----------- .../x/async/modeled/TestCachedModeledFramework.java | 5 +++++ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 29eae0e3d..791e04523 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -226,17 +226,7 @@ public AsyncStage delete(int version) { @Override public AsyncStage checkExists() { - ModelStage stage = ModelStage.make(); - init.whenComplete((__, throwable) -> { - if (throwable == null) { - ZPath path = client.modelSpec().path(); - Stat stat = cache.currentData(path).map(ZNode::stat).orElse(null); - stage.complete(stat); - } else { - stage.completeExceptionally(throwable); - } - }); - return stage; + return internalRead(ZNode::stat, () -> ModelStage.completed(null)); } @Override diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 0f08592c4..d04fb8de8 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -236,6 +236,11 @@ void testNoNodeException() throws InterruptedException, TimeoutException { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof KeeperException.NoNodeException); } + + complete(client.child(asyncModel).checkExists().whenComplete((value, e) -> { + assertNull(value); + assertNull(e); + })); } } From 7534c280828c74e925f409091ec6dfd877c6834d Mon Sep 17 00:00:00 2001 From: Luke Kot-Zaniewski Date: Wed, 19 Mar 2025 11:35:15 -0400 Subject: [PATCH 4/5] Update curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java Co-authored-by: Kezhu Wang --- .../curator/x/async/modeled/TestCachedModeledFramework.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index d04fb8de8..1c6a3f38c 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -237,7 +237,7 @@ void testNoNodeException() throws InterruptedException, TimeoutException { assertTrue(e.getCause() instanceof KeeperException.NoNodeException); } - complete(client.child(asyncModel).checkExists().whenComplete((value, e) -> { + complete(client.checkExists().whenComplete((value, e) -> { assertNull(value); assertNull(e); })); From 36c0d92a110d9dc6dadd956a0bb74e658f0a5882 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Wed, 19 Mar 2025 14:42:17 -0400 Subject: [PATCH 5/5] test failure condition --- .../modeled/TestCachedModeledFramework.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 1c6a3f38c..0e767ea37 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -268,6 +268,22 @@ public void initialized() { } } + @Test + void testReadThroughFailure() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + client.start(); + assertNull(timing.getFuture(client.delete().toCompletableFuture())); + complete(client.readThrough(), (d, e) -> { + if (e == null) { + fail("This test should result in a NoNodeException"); + } else { + assertEquals(KeeperException.NoNodeException.class, e.getClass()); + } + }); + } + } + private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode.