From 9891901d681f814f73342a842a5f345b0bacbdc1 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Fri, 21 Feb 2025 16:23:47 -0500 Subject: [PATCH 1/3] implement CachedModeledFramework which waits for cache initialization --- curator-x-async/pom.xml | 6 + .../x/async/modeled/ModeledFramework.java | 7 + .../details/CachedModeledFrameworkImpl.java | 5 + .../InitializedCachedModeledFramework.java | 289 ++++++++++++++++++ .../modeled/details/ModeledFrameworkImpl.java | 5 + .../modeled/TestCachedModeledFramework.java | 80 +++-- pom.xml | 6 + 7 files changed, 373 insertions(+), 25 deletions(-) create mode 100644 curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml index 751e4fd7a..e7b9ea7e1 100644 --- a/curator-x-async/pom.xml +++ b/curator-x-async/pom.xml @@ -64,6 +64,12 @@ test + + org.junit.jupiter + junit-jupiter-params + test + + org.slf4j slf4j-log4j12 diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java index 9a5285677..c0a2bf128 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java @@ -93,6 +93,13 @@ static ModeledFrameworkBuilder builder() { */ CachedModeledFramework cached(ExecutorService executor); + /** + * Return a cached framework which waits for cache to be initialized before accessing it. + * + * @return wrapped cached framework that waits for initialization. + */ + CachedModeledFramework initialized(); + /** * Return mutator APIs that work with {@link org.apache.curator.x.async.modeled.versioned.Versioned} containers * diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index acccbfecf..72b08a8e6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -184,6 +184,11 @@ public AsyncStage> list() { return ModelStage.completed(children); } + @Override + public CachedModeledFramework initialized() { + return new InitializedCachedModeledFramework<>(this); + } + @Override public AsyncStage update(T model) { return client.update(model); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java new file mode 100644 index 000000000..de76da0ec --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.cached.ModeledCache; +import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; +import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; +import org.apache.zookeeper.data.Stat; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +public class InitializedCachedModeledFramework implements CachedModeledFramework { + + private final CachedModeledFramework framework; + private final ModelStage init; + + private InitializedCachedModeledFramework(CachedModeledFramework framework, ModelStage init) { + this.framework = framework; + this.init = init; + } + + InitializedCachedModeledFramework(CachedModeledFramework framework) { + this.framework = framework; + init = ModelStage.make(); + listenable().addListener(new ModeledCacheListener() { + + @Override + public void accept(Type type, ZPath path, Stat stat, Object model) { + // NOP + } + + @Override + public void initialized() { + init.complete(null); + ModeledCacheListener.super.initialized(); + } + + @Override + public void handleException(Exception e) { + init.completeExceptionally(e); + ModeledCacheListener.super.handleException(e); + } + }); + } + + @Override + public ModeledCache cache() { + return framework.cache(); + } + + @Override + public void start() { + framework.start(); + } + + @Override + public void close() { + framework.close(); + } + + @Override + public Listenable> listenable() { + return framework.listenable(); + } + + @Override + public AsyncStage>> childrenAsZNodes() { + return internalRead(framework::childrenAsZNodes); + } + + @Override + public CuratorOp createOp(T model) { + return framework.createOp(model); + } + + @Override + public CuratorOp updateOp(T model) { + return framework.updateOp(model); + } + + @Override + public CuratorOp updateOp(T model, int version) { + return framework.updateOp(model, version); + } + + @Override + public CuratorOp deleteOp() { + return framework.deleteOp(); + } + + @Override + public CuratorOp deleteOp(int version) { + return framework.deleteOp(version); + } + + @Override + public CuratorOp checkExistsOp() { + return framework.checkExistsOp(); + } + + @Override + public CuratorOp checkExistsOp(int version) { + return framework.checkExistsOp(version); + } + + @Override + public AsyncStage> inTransaction(List operations) { + return framework.inTransaction(operations); + } + + @Override + public CachedModeledFramework cached() { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public CachedModeledFramework cached(ExecutorService executor) { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public VersionedModeledFramework versioned() { + return new VersionedModeledFrameworkImpl<>(this); + } + + @Override + public AsyncCuratorFramework unwrap() { + return framework.unwrap(); + } + + @Override + public ModelSpec modelSpec() { + return framework.modelSpec(); + } + + @Override + public CachedModeledFramework child(Object child) { + return new InitializedCachedModeledFramework<>(framework.child(child), init); + } + + @Override + public ModeledFramework parent() { + throw new UnsupportedOperationException( + "Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()"); + } + + @Override + public CachedModeledFramework withPath(ZPath path) { + return new InitializedCachedModeledFramework<>(framework.withPath(path), init); + } + + @Override + public AsyncStage set(T model) { + return framework.set(model); + } + + @Override + public AsyncStage set(T model, int version) { + return framework.set(model, version); + } + + @Override + public AsyncStage set(T model, Stat storingStatIn) { + return framework.set(model, storingStatIn); + } + + @Override + public AsyncStage set(T model, Stat storingStatIn, int version) { + return framework.set(model, storingStatIn, version); + } + + @Override + public AsyncStage read() { + return internalRead(framework::read); + } + + @Override + public AsyncStage read(Stat storingStatIn) { + return internalRead(() -> framework.read(storingStatIn)); + } + + @Override + public AsyncStage> readAsZNode() { + return internalRead(framework::readAsZNode); + } + + @Override + public AsyncStage update(T model) { + return framework.update(model); + } + + @Override + public AsyncStage update(T model, int version) { + return framework.update(model, version); + } + + @Override + public AsyncStage delete() { + return framework.delete(); + } + + @Override + public AsyncStage delete(int version) { + return framework.delete(version); + } + + @Override + public AsyncStage checkExists() { + return framework.checkExists(); + } + + @Override + public AsyncStage> children() { + return internalRead(framework::children); + } + + @Override + public AsyncStage readThrough() { + return internalRead(framework::readThrough); + } + + @Override + public AsyncStage readThrough(Stat storingStatIn) { + return internalRead(() -> framework.readThrough(storingStatIn)); + } + + @Override + public AsyncStage> readThroughAsZNode() { + return internalRead(framework::readThroughAsZNode); + } + + @Override + public AsyncStage> list() { + return internalRead(framework::list); + } + + @Override + public CachedModeledFramework initialized() { + throw new UnsupportedOperationException("Already an initialized instance"); + } + + private AsyncStage internalRead(Supplier> innerSupplier) { + ModelStage stage = ModelStage.make(); + init.whenComplete((data, throwable) -> { + if (throwable == null) { + innerSupplier.get().whenComplete((data1, throwable1) -> { + if (throwable1 == null) { + stage.complete(data1); + } else { + stage.completeExceptionally(throwable1); + } + } + ); + } else { + stage.completeExceptionally(throwable); + } + }); + return stage; + } +} diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index c02c93127..9a57f2f2f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -131,6 +131,11 @@ public CachedModeledFramework cached() { return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework")); } + @Override + public CachedModeledFramework initialized() { + return new InitializedCachedModeledFramework<>(cached()); + } + @Override public CachedModeledFramework cached(ExecutorService executor) { Preconditions.checkState( diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 65c68aba9..c82947107 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -32,7 +32,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -41,22 +43,30 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; @Tag(CuratorTestBase.zk35TestCompatibilityGroup) public class TestCachedModeledFramework extends TestModeledFrameworkBase { - @Test - public void testDownServer() throws IOException { + enum CachedModeledFrameworkType { + UNINITIALIZED, + INITIALIZED + } + + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testDownServer(CachedModeledFrameworkType type) throws IOException { Timing timing = new Timing(); TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); - CachedModeledFramework client = - ModeledFramework.wrap(async, modelSpec).cached(); + CachedModeledFramework client = build(type, modelSpec); Semaphore semaphore = new Semaphore(0); client.listenable().addListener((t, p, s, m) -> semaphore.release()); @@ -83,12 +93,12 @@ public void testDownServer() throws IOException { } } - @Test - public void testPostInitializedFilter() { + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testPostInitializedFilter(CachedModeledFrameworkType type) { TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.ONE); TestModel model2 = new TestModel("d", "e", "f", 1, BigInteger.ONE); - CachedModeledFramework client = - ModeledFramework.wrap(async, modelSpec).cached(); + CachedModeledFramework client = build(type, modelSpec); Semaphore semaphore = new Semaphore(0); ModeledCacheListener listener = (t, p, s, m) -> semaphore.release(); client.listenable().addListener(listener.postInitializedOnly()); @@ -105,16 +115,16 @@ public void testPostInitializedFilter() { } } - @Test - public void testChildren() { + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testChildren(CachedModeledFrameworkType type) { TestModel parent = new TestModel("a", "b", "c", 20, BigInteger.ONE); TestModel child1 = new TestModel("d", "e", "f", 1, BigInteger.ONE); TestModel child2 = new TestModel("g", "h", "i", 1, BigInteger.ONE); TestModel grandChild1 = new TestModel("j", "k", "l", 10, BigInteger.ONE); TestModel grandChild2 = new TestModel("m", "n", "0", 5, BigInteger.ONE); - try (CachedModeledFramework client = - ModeledFramework.wrap(async, modelSpec).cached()) { + try (CachedModeledFramework client = build(type, modelSpec)) { CountDownLatch latch = new CountDownLatch(5); client.listenable().addListener((t, p, s, m) -> latch.countDown()); @@ -154,11 +164,11 @@ public void testChildren() { } // note: CURATOR-546 - @Test - public void testAccessCacheDirectly() { + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testAccessCacheDirectly(CachedModeledFrameworkType type) { TestModel model = new TestModel("a", "b", "c", 20, BigInteger.ONE); - try (CachedModeledFramework client = - ModeledFramework.wrap(async, modelSpec).cached()) { + try (CachedModeledFramework client = build(type, modelSpec)) { CountDownLatch latch = new CountDownLatch(1); client.listenable().addListener((t, p, s, m) -> latch.countDown()); @@ -184,24 +194,26 @@ public void testAccessCacheDirectly() { // Verify the CachedModeledFramework does not attempt to deserialize empty ZNodes on deletion using the Jackson // model serializer. // See: CURATOR-609 - @Test - public void testEmptyNodeJacksonDeserialization() { + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testEmptyNodeJacksonDeserialization(CachedModeledFrameworkType type) { final TestModel model = new TestModel("a", "b", "c", 20, BigInteger.ONE); - verifyEmptyNodeDeserialization(model, modelSpec); + verifyEmptyNodeDeserialization(model, modelSpec, type); } // Verify the CachedModeledFramework does not attempt to deserialize empty ZNodes on deletion using the raw // model serializer. // See: CURATOR-609 - @Test - public void testEmptyNodeRawDeserialization() { + @ParameterizedTest + @EnumSource(CachedModeledFrameworkType.class) + void testEmptyNodeRawDeserialization(CachedModeledFrameworkType type) { final byte[] byteModel = {0x01, 0x02, 0x03}; final ModelSpec byteModelSpec = ModelSpec.builder(path, ModelSerializer.raw).build(); - verifyEmptyNodeDeserialization(byteModel, byteModelSpec); + verifyEmptyNodeDeserialization(byteModel, byteModelSpec, type); } - private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) { + private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec, CachedModeledFrameworkType type) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode. final String subPath = parentModelSpec.path().toString() + "/sub"; @@ -236,8 +248,7 @@ public void handleException(Exception e) { final ModelSerializer serializer = parentModelSpec.serializer(); // Create a cache client to watch the parent path. - try (CachedModeledFramework cacheClient = - ModeledFramework.wrap(async, parentModelSpec).cached()) { + try (CachedModeledFramework cacheClient = build(type, parentModelSpec)) { cacheClient.listenable().addListener(listener); ModelSpec testModelSpec = @@ -268,7 +279,26 @@ public void handleException(Exception e) { } } + @Test + void testInitializedCachedModeledFramework() throws ExecutionException, InterruptedException, TimeoutException { + try (CachedModeledFramework client = build(CachedModeledFrameworkType.INITIALIZED, modelSpec)) { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + client.set(model); + AsyncStage asyncModel = client.read(); + client.start(); + assertEquals(model, timing.getFuture(asyncModel.toCompletableFuture())); + } + } + private Set toSet(Stream stream, Function mapper) { return stream.map(mapper).collect(Collectors.toSet()); } + + private CachedModeledFramework build(CachedModeledFrameworkType type, ModelSpec modelSpec) { + if (type == CachedModeledFrameworkType.INITIALIZED) { + return ModeledFramework.wrap(async, modelSpec).initialized(); + } else { + return ModeledFramework.wrap(async, modelSpec).cached(); + } + } } diff --git a/pom.xml b/pom.xml index d83d2f1ce..a1757d572 100644 --- a/pom.xml +++ b/pom.xml @@ -591,6 +591,12 @@ ${junit-version} + + org.junit.jupiter + junit-jupiter-params + ${junit-version} + + org.junit.jupiter junit-jupiter-engine From 01f46b75eb64b25ea8a40e52948cc7f3513acce4 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Thu, 27 Feb 2025 11:52:00 -0500 Subject: [PATCH 2/3] apply spotless --- .../InitializedCachedModeledFramework.java | 496 +++++++++--------- .../modeled/TestCachedModeledFramework.java | 3 +- 2 files changed, 249 insertions(+), 250 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java index de76da0ec..8c9efe9ac 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java @@ -19,6 +19,9 @@ package org.apache.curator.x.async.modeled.details; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.framework.listen.Listenable; @@ -34,256 +37,251 @@ import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; import org.apache.zookeeper.data.Stat; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; - public class InitializedCachedModeledFramework implements CachedModeledFramework { - private final CachedModeledFramework framework; - private final ModelStage init; - - private InitializedCachedModeledFramework(CachedModeledFramework framework, ModelStage init) { - this.framework = framework; - this.init = init; - } - - InitializedCachedModeledFramework(CachedModeledFramework framework) { - this.framework = framework; - init = ModelStage.make(); - listenable().addListener(new ModeledCacheListener() { - - @Override - public void accept(Type type, ZPath path, Stat stat, Object model) { - // NOP - } - - @Override - public void initialized() { - init.complete(null); - ModeledCacheListener.super.initialized(); - } - - @Override - public void handleException(Exception e) { - init.completeExceptionally(e); - ModeledCacheListener.super.handleException(e); - } - }); - } - - @Override - public ModeledCache cache() { - return framework.cache(); - } - - @Override - public void start() { - framework.start(); - } - - @Override - public void close() { - framework.close(); - } - - @Override - public Listenable> listenable() { - return framework.listenable(); - } - - @Override - public AsyncStage>> childrenAsZNodes() { - return internalRead(framework::childrenAsZNodes); - } - - @Override - public CuratorOp createOp(T model) { - return framework.createOp(model); - } - - @Override - public CuratorOp updateOp(T model) { - return framework.updateOp(model); - } - - @Override - public CuratorOp updateOp(T model, int version) { - return framework.updateOp(model, version); - } - - @Override - public CuratorOp deleteOp() { - return framework.deleteOp(); - } - - @Override - public CuratorOp deleteOp(int version) { - return framework.deleteOp(version); - } - - @Override - public CuratorOp checkExistsOp() { - return framework.checkExistsOp(); - } - - @Override - public CuratorOp checkExistsOp(int version) { - return framework.checkExistsOp(version); - } - - @Override - public AsyncStage> inTransaction(List operations) { - return framework.inTransaction(operations); - } - - @Override - public CachedModeledFramework cached() { - throw new UnsupportedOperationException("Already a cached instance"); - } - - @Override - public CachedModeledFramework cached(ExecutorService executor) { - throw new UnsupportedOperationException("Already a cached instance"); - } - - @Override - public VersionedModeledFramework versioned() { - return new VersionedModeledFrameworkImpl<>(this); - } - - @Override - public AsyncCuratorFramework unwrap() { - return framework.unwrap(); - } - - @Override - public ModelSpec modelSpec() { - return framework.modelSpec(); - } - - @Override - public CachedModeledFramework child(Object child) { - return new InitializedCachedModeledFramework<>(framework.child(child), init); - } - - @Override - public ModeledFramework parent() { - throw new UnsupportedOperationException( - "Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()"); - } - - @Override - public CachedModeledFramework withPath(ZPath path) { - return new InitializedCachedModeledFramework<>(framework.withPath(path), init); - } - - @Override - public AsyncStage set(T model) { - return framework.set(model); - } - - @Override - public AsyncStage set(T model, int version) { - return framework.set(model, version); - } - - @Override - public AsyncStage set(T model, Stat storingStatIn) { - return framework.set(model, storingStatIn); - } - - @Override - public AsyncStage set(T model, Stat storingStatIn, int version) { - return framework.set(model, storingStatIn, version); - } - - @Override - public AsyncStage read() { - return internalRead(framework::read); - } - - @Override - public AsyncStage read(Stat storingStatIn) { - return internalRead(() -> framework.read(storingStatIn)); - } - - @Override - public AsyncStage> readAsZNode() { - return internalRead(framework::readAsZNode); - } - - @Override - public AsyncStage update(T model) { - return framework.update(model); - } - - @Override - public AsyncStage update(T model, int version) { - return framework.update(model, version); - } - - @Override - public AsyncStage delete() { - return framework.delete(); - } - - @Override - public AsyncStage delete(int version) { - return framework.delete(version); - } - - @Override - public AsyncStage checkExists() { - return framework.checkExists(); - } - - @Override - public AsyncStage> children() { - return internalRead(framework::children); - } - - @Override - public AsyncStage readThrough() { - return internalRead(framework::readThrough); - } - - @Override - public AsyncStage readThrough(Stat storingStatIn) { - return internalRead(() -> framework.readThrough(storingStatIn)); - } - - @Override - public AsyncStage> readThroughAsZNode() { - return internalRead(framework::readThroughAsZNode); - } - - @Override - public AsyncStage> list() { - return internalRead(framework::list); - } - - @Override - public CachedModeledFramework initialized() { - throw new UnsupportedOperationException("Already an initialized instance"); - } - - private AsyncStage internalRead(Supplier> innerSupplier) { - ModelStage stage = ModelStage.make(); - init.whenComplete((data, throwable) -> { - if (throwable == null) { - innerSupplier.get().whenComplete((data1, throwable1) -> { - if (throwable1 == null) { - stage.complete(data1); - } else { - stage.completeExceptionally(throwable1); - } - } - ); - } else { - stage.completeExceptionally(throwable); - } + private final CachedModeledFramework framework; + private final ModelStage init; + + private InitializedCachedModeledFramework(CachedModeledFramework framework, ModelStage init) { + this.framework = framework; + this.init = init; + } + + InitializedCachedModeledFramework(CachedModeledFramework framework) { + this.framework = framework; + init = ModelStage.make(); + listenable().addListener(new ModeledCacheListener() { + + @Override + public void accept(Type type, ZPath path, Stat stat, Object model) { + // NOP + } + + @Override + public void initialized() { + init.complete(null); + ModeledCacheListener.super.initialized(); + } + + @Override + public void handleException(Exception e) { + init.completeExceptionally(e); + ModeledCacheListener.super.handleException(e); + } + }); + } + + @Override + public ModeledCache cache() { + return framework.cache(); + } + + @Override + public void start() { + framework.start(); + } + + @Override + public void close() { + framework.close(); + } + + @Override + public Listenable> listenable() { + return framework.listenable(); + } + + @Override + public AsyncStage>> childrenAsZNodes() { + return internalRead(framework::childrenAsZNodes); + } + + @Override + public CuratorOp createOp(T model) { + return framework.createOp(model); + } + + @Override + public CuratorOp updateOp(T model) { + return framework.updateOp(model); + } + + @Override + public CuratorOp updateOp(T model, int version) { + return framework.updateOp(model, version); + } + + @Override + public CuratorOp deleteOp() { + return framework.deleteOp(); + } + + @Override + public CuratorOp deleteOp(int version) { + return framework.deleteOp(version); + } + + @Override + public CuratorOp checkExistsOp() { + return framework.checkExistsOp(); + } + + @Override + public CuratorOp checkExistsOp(int version) { + return framework.checkExistsOp(version); + } + + @Override + public AsyncStage> inTransaction(List operations) { + return framework.inTransaction(operations); + } + + @Override + public CachedModeledFramework cached() { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public CachedModeledFramework cached(ExecutorService executor) { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public VersionedModeledFramework versioned() { + return new VersionedModeledFrameworkImpl<>(this); + } + + @Override + public AsyncCuratorFramework unwrap() { + return framework.unwrap(); + } + + @Override + public ModelSpec modelSpec() { + return framework.modelSpec(); + } + + @Override + public CachedModeledFramework child(Object child) { + return new InitializedCachedModeledFramework<>(framework.child(child), init); + } + + @Override + public ModeledFramework parent() { + throw new UnsupportedOperationException( + "Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()"); + } + + @Override + public CachedModeledFramework withPath(ZPath path) { + return new InitializedCachedModeledFramework<>(framework.withPath(path), init); + } + + @Override + public AsyncStage set(T model) { + return framework.set(model); + } + + @Override + public AsyncStage set(T model, int version) { + return framework.set(model, version); + } + + @Override + public AsyncStage set(T model, Stat storingStatIn) { + return framework.set(model, storingStatIn); + } + + @Override + public AsyncStage set(T model, Stat storingStatIn, int version) { + return framework.set(model, storingStatIn, version); + } + + @Override + public AsyncStage read() { + return internalRead(framework::read); + } + + @Override + public AsyncStage read(Stat storingStatIn) { + return internalRead(() -> framework.read(storingStatIn)); + } + + @Override + public AsyncStage> readAsZNode() { + return internalRead(framework::readAsZNode); + } + + @Override + public AsyncStage update(T model) { + return framework.update(model); + } + + @Override + public AsyncStage update(T model, int version) { + return framework.update(model, version); + } + + @Override + public AsyncStage delete() { + return framework.delete(); + } + + @Override + public AsyncStage delete(int version) { + return framework.delete(version); + } + + @Override + public AsyncStage checkExists() { + return framework.checkExists(); + } + + @Override + public AsyncStage> children() { + return internalRead(framework::children); + } + + @Override + public AsyncStage readThrough() { + return internalRead(framework::readThrough); + } + + @Override + public AsyncStage readThrough(Stat storingStatIn) { + return internalRead(() -> framework.readThrough(storingStatIn)); + } + + @Override + public AsyncStage> readThroughAsZNode() { + return internalRead(framework::readThroughAsZNode); + } + + @Override + public AsyncStage> list() { + return internalRead(framework::list); + } + + @Override + public CachedModeledFramework initialized() { + throw new UnsupportedOperationException("Already an initialized instance"); + } + + private AsyncStage internalRead(Supplier> innerSupplier) { + ModelStage stage = ModelStage.make(); + init.whenComplete((data, throwable) -> { + if (throwable == null) { + innerSupplier.get().whenComplete((data1, throwable1) -> { + if (throwable1 == null) { + stage.complete(data1); + } else { + stage.completeExceptionally(throwable1); + } + }); + } else { + stage.completeExceptionally(throwable); + } }); - return stage; - } + return stage; + } } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index c82947107..160b75f0b 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -213,7 +213,8 @@ void testEmptyNodeRawDeserialization(CachedModeledFrameworkType type) { verifyEmptyNodeDeserialization(byteModel, byteModelSpec, type); } - private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec, CachedModeledFrameworkType type) { + private void verifyEmptyNodeDeserialization( + T model, ModelSpec parentModelSpec, CachedModeledFrameworkType type) { // The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no // attempt to deserialize this empty ZNode. final String subPath = parentModelSpec.path().toString() + "/sub"; From 5fd441ba60b93381ce24299a19a0c44134b98f33 Mon Sep 17 00:00:00 2001 From: lkotzaniewsk Date: Thu, 27 Feb 2025 16:38:25 -0500 Subject: [PATCH 3/3] framework -> client --- .../InitializedCachedModeledFramework.java | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java index 8c9efe9ac..144476e1a 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java @@ -39,16 +39,16 @@ public class InitializedCachedModeledFramework implements CachedModeledFramework { - private final CachedModeledFramework framework; + private final CachedModeledFramework client; private final ModelStage init; - private InitializedCachedModeledFramework(CachedModeledFramework framework, ModelStage init) { - this.framework = framework; + private InitializedCachedModeledFramework(CachedModeledFramework client, ModelStage init) { + this.client = client; this.init = init; } - InitializedCachedModeledFramework(CachedModeledFramework framework) { - this.framework = framework; + InitializedCachedModeledFramework(CachedModeledFramework client) { + this.client = client; init = ModelStage.make(); listenable().addListener(new ModeledCacheListener() { @@ -73,67 +73,67 @@ public void handleException(Exception e) { @Override public ModeledCache cache() { - return framework.cache(); + return client.cache(); } @Override public void start() { - framework.start(); + client.start(); } @Override public void close() { - framework.close(); + client.close(); } @Override public Listenable> listenable() { - return framework.listenable(); + return client.listenable(); } @Override public AsyncStage>> childrenAsZNodes() { - return internalRead(framework::childrenAsZNodes); + return internalRead(client::childrenAsZNodes); } @Override public CuratorOp createOp(T model) { - return framework.createOp(model); + return client.createOp(model); } @Override public CuratorOp updateOp(T model) { - return framework.updateOp(model); + return client.updateOp(model); } @Override public CuratorOp updateOp(T model, int version) { - return framework.updateOp(model, version); + return client.updateOp(model, version); } @Override public CuratorOp deleteOp() { - return framework.deleteOp(); + return client.deleteOp(); } @Override public CuratorOp deleteOp(int version) { - return framework.deleteOp(version); + return client.deleteOp(version); } @Override public CuratorOp checkExistsOp() { - return framework.checkExistsOp(); + return client.checkExistsOp(); } @Override public CuratorOp checkExistsOp(int version) { - return framework.checkExistsOp(version); + return client.checkExistsOp(version); } @Override public AsyncStage> inTransaction(List operations) { - return framework.inTransaction(operations); + return client.inTransaction(operations); } @Override @@ -153,17 +153,17 @@ public VersionedModeledFramework versioned() { @Override public AsyncCuratorFramework unwrap() { - return framework.unwrap(); + return client.unwrap(); } @Override public ModelSpec modelSpec() { - return framework.modelSpec(); + return client.modelSpec(); } @Override public CachedModeledFramework child(Object child) { - return new InitializedCachedModeledFramework<>(framework.child(child), init); + return new InitializedCachedModeledFramework<>(client.child(child), init); } @Override @@ -174,92 +174,92 @@ public ModeledFramework parent() { @Override public CachedModeledFramework withPath(ZPath path) { - return new InitializedCachedModeledFramework<>(framework.withPath(path), init); + return new InitializedCachedModeledFramework<>(client.withPath(path), init); } @Override public AsyncStage set(T model) { - return framework.set(model); + return client.set(model); } @Override public AsyncStage set(T model, int version) { - return framework.set(model, version); + return client.set(model, version); } @Override public AsyncStage set(T model, Stat storingStatIn) { - return framework.set(model, storingStatIn); + return client.set(model, storingStatIn); } @Override public AsyncStage set(T model, Stat storingStatIn, int version) { - return framework.set(model, storingStatIn, version); + return client.set(model, storingStatIn, version); } @Override public AsyncStage read() { - return internalRead(framework::read); + return internalRead(client::read); } @Override public AsyncStage read(Stat storingStatIn) { - return internalRead(() -> framework.read(storingStatIn)); + return internalRead(() -> client.read(storingStatIn)); } @Override public AsyncStage> readAsZNode() { - return internalRead(framework::readAsZNode); + return internalRead(client::readAsZNode); } @Override public AsyncStage update(T model) { - return framework.update(model); + return client.update(model); } @Override public AsyncStage update(T model, int version) { - return framework.update(model, version); + return client.update(model, version); } @Override public AsyncStage delete() { - return framework.delete(); + return client.delete(); } @Override public AsyncStage delete(int version) { - return framework.delete(version); + return client.delete(version); } @Override public AsyncStage checkExists() { - return framework.checkExists(); + return client.checkExists(); } @Override public AsyncStage> children() { - return internalRead(framework::children); + return internalRead(client::children); } @Override public AsyncStage readThrough() { - return internalRead(framework::readThrough); + return internalRead(client::readThrough); } @Override public AsyncStage readThrough(Stat storingStatIn) { - return internalRead(() -> framework.readThrough(storingStatIn)); + return internalRead(() -> client.readThrough(storingStatIn)); } @Override public AsyncStage> readThroughAsZNode() { - return internalRead(framework::readThroughAsZNode); + return internalRead(client::readThroughAsZNode); } @Override public AsyncStage> list() { - return internalRead(framework::list); + return internalRead(client::list); } @Override