diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index acccbfecf..791e04523 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -48,15 +47,40 @@ class CachedModeledFrameworkImpl implements CachedModeledFramework { private final ModeledFramework client; private final ModeledCacheImpl cache; private final Executor executor; + private final ModelStage init; CachedModeledFrameworkImpl(ModeledFramework client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); - } - - private CachedModeledFrameworkImpl(ModeledFramework client, ModeledCacheImpl cache, Executor executor) { + this( + client, + new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), + executor, + ModelStage.make()); + listenable().addListener(new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, Object model) { + // NOP + } + + @Override + public void initialized() { + init.complete(null); + ModeledCacheListener.super.initialized(); + } + + @Override + public void handleException(Exception e) { + init.completeExceptionally(e); + ModeledCacheListener.super.handleException(e); + } + }); + } + + private CachedModeledFrameworkImpl( + ModeledFramework client, ModeledCacheImpl cache, Executor executor, ModelStage init) { this.client = client; this.cache = cache; this.executor = executor; + this.init = init; } @Override @@ -106,7 +130,7 @@ public ModelSpec modelSpec() { @Override public CachedModeledFramework child(Object child) { - return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, init); } @Override @@ -117,7 +141,7 @@ public ModeledFramework parent() { @Override public CachedModeledFramework withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, init); } @Override @@ -142,24 +166,22 @@ public AsyncStage set(T model, int version) { @Override public AsyncStage read() { - return internalRead(ZNode::model, this::exceptionally); + return internalRead(ZNode::model); } @Override public AsyncStage read(Stat storingStatIn) { - return internalRead( - n -> { - if (storingStatIn != null) { - DataTree.copyStat(n.stat(), storingStatIn); - } - return n.model(); - }, - this::exceptionally); + return internalRead(n -> { + if (storingStatIn != null) { + DataTree.copyStat(n.stat(), storingStatIn); + } + return n.model(); + }); } @Override public AsyncStage> readAsZNode() { - return internalRead(Function.identity(), this::exceptionally); + return internalRead(Function.identity()); } @Override @@ -179,9 +201,7 @@ public AsyncStage> readThroughAsZNode() { @Override public AsyncStage> list() { - List children = - cache.currentChildren().values().stream().map(ZNode::model).collect(Collectors.toList()); - return ModelStage.completed(children); + return internalChildren(entry -> entry.getValue().model()); } @Override @@ -206,28 +226,17 @@ public AsyncStage delete(int version) { @Override public AsyncStage checkExists() { - ZPath path = client.modelSpec().path(); - Optional> data = cache.currentData(path); - return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null)); + return internalRead(ZNode::stat, () -> ModelStage.completed(null)); } @Override public AsyncStage> children() { - List paths = cache.currentChildren(client.modelSpec().path()).keySet().stream() - .filter(path -> !path.isRoot() - && path.parent().equals(client.modelSpec().path())) - .collect(Collectors.toList()); - return completed(paths); + return internalChildren(Map.Entry::getKey); } @Override public AsyncStage>> childrenAsZNodes() { - List> nodes = cache.currentChildren(client.modelSpec().path()).entrySet().stream() - .filter(e -> !e.getKey().isRoot() - && e.getKey().parent().equals(client.modelSpec().path())) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); - return completed(nodes); + return internalChildren(Map.Entry::getValue); } @Override @@ -270,19 +279,52 @@ public AsyncStage> inTransaction(List return client.inTransaction(operations); } - private AsyncStage completed(U value) { - return ModelStage.completed(value); - } - - private AsyncStage exceptionally() { - KeeperException.NoNodeException exception = - new KeeperException.NoNodeException(client.modelSpec().path().fullPath()); - return ModelStage.exceptionally(exception); - } - - private AsyncStage internalRead(Function, U> resolver, Supplier> elseProc) { - ZPath path = client.modelSpec().path(); - Optional> data = cache.currentData(path); - return data.map(node -> completed(resolver.apply(node))).orElseGet(elseProc); + private AsyncStage internalRead(Function, U> resolver) { + return internalRead(resolver, null); + } + + private AsyncStage internalRead(Function, U> resolver, Supplier> defaultSupplier) { + ModelStage stage = ModelStage.make(); + init.whenComplete((__, throwable) -> { + if (throwable == null) { + ZPath path = client.modelSpec().path(); + ZNode zNode = cache.currentData(path).orElse(null); + if (zNode == null) { + if (defaultSupplier == null) { + stage.completeExceptionally(new KeeperException.NoNodeException( + client.modelSpec().path().fullPath())); + } else { + defaultSupplier.get().whenComplete((elseData, elseThrowable) -> { + if (elseThrowable == null) { + stage.complete(elseData); + } else { + stage.completeExceptionally(elseThrowable); + } + }); + } + } else { + stage.complete(resolver.apply(zNode)); + } + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; + } + + private ModelStage> internalChildren(Function>, U> resolver) { + ModelStage> stage = ModelStage.make(); + init.whenComplete((__, throwable) -> { + if (throwable == null) { + stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream() + .filter(e -> !e.getKey().isRoot() + && e.getKey().parent().equals(client.modelSpec().path())) + .map(resolver) + .collect(Collectors.toList())); + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; } } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 65c68aba9..0e767ea37 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -19,11 +19,7 @@ package org.apache.curator.x.async.modeled; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import com.google.common.collect.Sets; import java.io.IOException; import java.math.BigInteger; @@ -32,7 +28,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -41,9 +39,11 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -78,6 +78,10 @@ public void testDownServer() throws IOException { assertNotNull(value); assertNull(e); })); + complete(client.child(model).checkExists().whenComplete((value, e) -> { + assertNotNull(value); + assertNull(e); + })); } finally { client.close(); } @@ -150,6 +154,13 @@ public void testChildren() { complete( client.child("p").child("c2").childrenAsZNodes(), (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild2))); + + complete( + client.child("p").child("c1").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1))); + complete( + client.child("p").child("c2").list(), + (v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2))); } } @@ -201,6 +212,78 @@ public void testEmptyNodeRawDeserialization() { verifyEmptyNodeDeserialization(byteModel, byteModelSpec); } + @Test + void testInitializedCachedModeledFramework() throws ExecutionException, InterruptedException, TimeoutException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + AsyncStage asyncModel = client.read(); + client.start(); + assertEquals(model, timing.getFuture(asyncModel.toCompletableFuture())); + } + } + + @Test + void testNoNodeException() throws InterruptedException, TimeoutException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + AsyncStage asyncModel = client.read(); + client.start(); + try { + timing.getFuture(asyncModel.toCompletableFuture()); + fail("This test should result in a NoNodeException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KeeperException.NoNodeException); + } + + complete(client.checkExists().whenComplete((value, e) -> { + assertNull(value); + assertNull(e); + })); + } + } + + @Test + void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + Semaphore semaphore = new Semaphore(0); + client.listenable().addListener(new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, TestModel model) { + // NOP + } + + @Override + public void initialized() { + semaphore.release(); + } + }); + client.start(); + assertTrue(timing.acquireSemaphore(semaphore)); + assertNotNull(timing.getFuture(client.set(model).toCompletableFuture())); + assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture())); + } + } + + @Test + void testReadThroughFailure() throws InterruptedException, TimeoutException, ExecutionException { + try (CachedModeledFramework client = + ModeledFramework.wrap(async, modelSpec).cached()) { + client.start(); + assertNull(timing.getFuture(client.delete().toCompletableFuture())); + complete(client.readThrough(), (d, e) -> { + if (e == null) { + fail("This test should result in a NoNodeException"); + } else { + assertEquals(KeeperException.NoNodeException.class, e.getClass()); + } + }); + } + } + private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode.