diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 751e4fd7a..e7b9ea7e1 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -64,6 +64,12 @@
test
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
org.slf4j
slf4j-log4j12
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index 9a5285677..c0a2bf128 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -93,6 +93,13 @@ static ModeledFrameworkBuilder builder() {
*/
CachedModeledFramework cached(ExecutorService executor);
+ /**
+ * Return a cached framework which waits for cache to be initialized before accessing it.
+ *
+ * @return wrapped cached framework that waits for initialization.
+ */
+ CachedModeledFramework initialized();
+
/**
* Return mutator APIs that work with {@link org.apache.curator.x.async.modeled.versioned.Versioned} containers
*
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index acccbfecf..72b08a8e6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -184,6 +184,11 @@ public AsyncStage> list() {
return ModelStage.completed(children);
}
+ @Override
+ public CachedModeledFramework initialized() {
+ return new InitializedCachedModeledFramework<>(this);
+ }
+
@Override
public AsyncStage update(T model) {
return client.update(model);
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java
new file mode 100644
index 000000000..144476e1a
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/InitializedCachedModeledFramework.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.modeled.details;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.cached.ModeledCache;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.data.Stat;
+
+public class InitializedCachedModeledFramework implements CachedModeledFramework {
+
+ private final CachedModeledFramework client;
+ private final ModelStage init;
+
+ private InitializedCachedModeledFramework(CachedModeledFramework client, ModelStage init) {
+ this.client = client;
+ this.init = init;
+ }
+
+ InitializedCachedModeledFramework(CachedModeledFramework client) {
+ this.client = client;
+ init = ModelStage.make();
+ listenable().addListener(new ModeledCacheListener() {
+
+ @Override
+ public void accept(Type type, ZPath path, Stat stat, Object model) {
+ // NOP
+ }
+
+ @Override
+ public void initialized() {
+ init.complete(null);
+ ModeledCacheListener.super.initialized();
+ }
+
+ @Override
+ public void handleException(Exception e) {
+ init.completeExceptionally(e);
+ ModeledCacheListener.super.handleException(e);
+ }
+ });
+ }
+
+ @Override
+ public ModeledCache cache() {
+ return client.cache();
+ }
+
+ @Override
+ public void start() {
+ client.start();
+ }
+
+ @Override
+ public void close() {
+ client.close();
+ }
+
+ @Override
+ public Listenable> listenable() {
+ return client.listenable();
+ }
+
+ @Override
+ public AsyncStage>> childrenAsZNodes() {
+ return internalRead(client::childrenAsZNodes);
+ }
+
+ @Override
+ public CuratorOp createOp(T model) {
+ return client.createOp(model);
+ }
+
+ @Override
+ public CuratorOp updateOp(T model) {
+ return client.updateOp(model);
+ }
+
+ @Override
+ public CuratorOp updateOp(T model, int version) {
+ return client.updateOp(model, version);
+ }
+
+ @Override
+ public CuratorOp deleteOp() {
+ return client.deleteOp();
+ }
+
+ @Override
+ public CuratorOp deleteOp(int version) {
+ return client.deleteOp(version);
+ }
+
+ @Override
+ public CuratorOp checkExistsOp() {
+ return client.checkExistsOp();
+ }
+
+ @Override
+ public CuratorOp checkExistsOp(int version) {
+ return client.checkExistsOp(version);
+ }
+
+ @Override
+ public AsyncStage> inTransaction(List operations) {
+ return client.inTransaction(operations);
+ }
+
+ @Override
+ public CachedModeledFramework cached() {
+ throw new UnsupportedOperationException("Already a cached instance");
+ }
+
+ @Override
+ public CachedModeledFramework cached(ExecutorService executor) {
+ throw new UnsupportedOperationException("Already a cached instance");
+ }
+
+ @Override
+ public VersionedModeledFramework versioned() {
+ return new VersionedModeledFrameworkImpl<>(this);
+ }
+
+ @Override
+ public AsyncCuratorFramework unwrap() {
+ return client.unwrap();
+ }
+
+ @Override
+ public ModelSpec modelSpec() {
+ return client.modelSpec();
+ }
+
+ @Override
+ public CachedModeledFramework child(Object child) {
+ return new InitializedCachedModeledFramework<>(client.child(child), init);
+ }
+
+ @Override
+ public ModeledFramework parent() {
+ throw new UnsupportedOperationException(
+ "Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()");
+ }
+
+ @Override
+ public CachedModeledFramework withPath(ZPath path) {
+ return new InitializedCachedModeledFramework<>(client.withPath(path), init);
+ }
+
+ @Override
+ public AsyncStage set(T model) {
+ return client.set(model);
+ }
+
+ @Override
+ public AsyncStage set(T model, int version) {
+ return client.set(model, version);
+ }
+
+ @Override
+ public AsyncStage set(T model, Stat storingStatIn) {
+ return client.set(model, storingStatIn);
+ }
+
+ @Override
+ public AsyncStage set(T model, Stat storingStatIn, int version) {
+ return client.set(model, storingStatIn, version);
+ }
+
+ @Override
+ public AsyncStage read() {
+ return internalRead(client::read);
+ }
+
+ @Override
+ public AsyncStage read(Stat storingStatIn) {
+ return internalRead(() -> client.read(storingStatIn));
+ }
+
+ @Override
+ public AsyncStage> readAsZNode() {
+ return internalRead(client::readAsZNode);
+ }
+
+ @Override
+ public AsyncStage update(T model) {
+ return client.update(model);
+ }
+
+ @Override
+ public AsyncStage update(T model, int version) {
+ return client.update(model, version);
+ }
+
+ @Override
+ public AsyncStage delete() {
+ return client.delete();
+ }
+
+ @Override
+ public AsyncStage delete(int version) {
+ return client.delete(version);
+ }
+
+ @Override
+ public AsyncStage checkExists() {
+ return client.checkExists();
+ }
+
+ @Override
+ public AsyncStage> children() {
+ return internalRead(client::children);
+ }
+
+ @Override
+ public AsyncStage readThrough() {
+ return internalRead(client::readThrough);
+ }
+
+ @Override
+ public AsyncStage readThrough(Stat storingStatIn) {
+ return internalRead(() -> client.readThrough(storingStatIn));
+ }
+
+ @Override
+ public AsyncStage> readThroughAsZNode() {
+ return internalRead(client::readThroughAsZNode);
+ }
+
+ @Override
+ public AsyncStage> list() {
+ return internalRead(client::list);
+ }
+
+ @Override
+ public CachedModeledFramework initialized() {
+ throw new UnsupportedOperationException("Already an initialized instance");
+ }
+
+ private AsyncStage internalRead(Supplier> innerSupplier) {
+ ModelStage stage = ModelStage.make();
+ init.whenComplete((data, throwable) -> {
+ if (throwable == null) {
+ innerSupplier.get().whenComplete((data1, throwable1) -> {
+ if (throwable1 == null) {
+ stage.complete(data1);
+ } else {
+ stage.completeExceptionally(throwable1);
+ }
+ });
+ } else {
+ stage.completeExceptionally(throwable);
+ }
+ });
+ return stage;
+ }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index c02c93127..9a57f2f2f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -131,6 +131,11 @@ public CachedModeledFramework cached() {
return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
}
+ @Override
+ public CachedModeledFramework initialized() {
+ return new InitializedCachedModeledFramework<>(cached());
+ }
+
@Override
public CachedModeledFramework cached(ExecutorService executor) {
Preconditions.checkState(
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 65c68aba9..160b75f0b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -32,7 +32,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -41,22 +43,30 @@
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
public class TestCachedModeledFramework extends TestModeledFrameworkBase {
- @Test
- public void testDownServer() throws IOException {
+ enum CachedModeledFrameworkType {
+ UNINITIALIZED,
+ INITIALIZED
+ }
+
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testDownServer(CachedModeledFrameworkType type) throws IOException {
Timing timing = new Timing();
TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
- CachedModeledFramework client =
- ModeledFramework.wrap(async, modelSpec).cached();
+ CachedModeledFramework client = build(type, modelSpec);
Semaphore semaphore = new Semaphore(0);
client.listenable().addListener((t, p, s, m) -> semaphore.release());
@@ -83,12 +93,12 @@ public void testDownServer() throws IOException {
}
}
- @Test
- public void testPostInitializedFilter() {
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testPostInitializedFilter(CachedModeledFrameworkType type) {
TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.ONE);
TestModel model2 = new TestModel("d", "e", "f", 1, BigInteger.ONE);
- CachedModeledFramework client =
- ModeledFramework.wrap(async, modelSpec).cached();
+ CachedModeledFramework client = build(type, modelSpec);
Semaphore semaphore = new Semaphore(0);
ModeledCacheListener listener = (t, p, s, m) -> semaphore.release();
client.listenable().addListener(listener.postInitializedOnly());
@@ -105,16 +115,16 @@ public void testPostInitializedFilter() {
}
}
- @Test
- public void testChildren() {
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testChildren(CachedModeledFrameworkType type) {
TestModel parent = new TestModel("a", "b", "c", 20, BigInteger.ONE);
TestModel child1 = new TestModel("d", "e", "f", 1, BigInteger.ONE);
TestModel child2 = new TestModel("g", "h", "i", 1, BigInteger.ONE);
TestModel grandChild1 = new TestModel("j", "k", "l", 10, BigInteger.ONE);
TestModel grandChild2 = new TestModel("m", "n", "0", 5, BigInteger.ONE);
- try (CachedModeledFramework client =
- ModeledFramework.wrap(async, modelSpec).cached()) {
+ try (CachedModeledFramework client = build(type, modelSpec)) {
CountDownLatch latch = new CountDownLatch(5);
client.listenable().addListener((t, p, s, m) -> latch.countDown());
@@ -154,11 +164,11 @@ public void testChildren() {
}
// note: CURATOR-546
- @Test
- public void testAccessCacheDirectly() {
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testAccessCacheDirectly(CachedModeledFrameworkType type) {
TestModel model = new TestModel("a", "b", "c", 20, BigInteger.ONE);
- try (CachedModeledFramework client =
- ModeledFramework.wrap(async, modelSpec).cached()) {
+ try (CachedModeledFramework client = build(type, modelSpec)) {
CountDownLatch latch = new CountDownLatch(1);
client.listenable().addListener((t, p, s, m) -> latch.countDown());
@@ -184,24 +194,27 @@ public void testAccessCacheDirectly() {
// Verify the CachedModeledFramework does not attempt to deserialize empty ZNodes on deletion using the Jackson
// model serializer.
// See: CURATOR-609
- @Test
- public void testEmptyNodeJacksonDeserialization() {
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testEmptyNodeJacksonDeserialization(CachedModeledFrameworkType type) {
final TestModel model = new TestModel("a", "b", "c", 20, BigInteger.ONE);
- verifyEmptyNodeDeserialization(model, modelSpec);
+ verifyEmptyNodeDeserialization(model, modelSpec, type);
}
// Verify the CachedModeledFramework does not attempt to deserialize empty ZNodes on deletion using the raw
// model serializer.
// See: CURATOR-609
- @Test
- public void testEmptyNodeRawDeserialization() {
+ @ParameterizedTest
+ @EnumSource(CachedModeledFrameworkType.class)
+ void testEmptyNodeRawDeserialization(CachedModeledFrameworkType type) {
final byte[] byteModel = {0x01, 0x02, 0x03};
final ModelSpec byteModelSpec =
ModelSpec.builder(path, ModelSerializer.raw).build();
- verifyEmptyNodeDeserialization(byteModel, byteModelSpec);
+ verifyEmptyNodeDeserialization(byteModel, byteModelSpec, type);
}
- private void verifyEmptyNodeDeserialization(T model, ModelSpec parentModelSpec) {
+ private void verifyEmptyNodeDeserialization(
+ T model, ModelSpec parentModelSpec, CachedModeledFrameworkType type) {
// The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no
// attempt to deserialize this empty ZNode.
final String subPath = parentModelSpec.path().toString() + "/sub";
@@ -236,8 +249,7 @@ public void handleException(Exception e) {
final ModelSerializer serializer = parentModelSpec.serializer();
// Create a cache client to watch the parent path.
- try (CachedModeledFramework cacheClient =
- ModeledFramework.wrap(async, parentModelSpec).cached()) {
+ try (CachedModeledFramework cacheClient = build(type, parentModelSpec)) {
cacheClient.listenable().addListener(listener);
ModelSpec testModelSpec =
@@ -268,7 +280,26 @@ public void handleException(Exception e) {
}
}
+ @Test
+ void testInitializedCachedModeledFramework() throws ExecutionException, InterruptedException, TimeoutException {
+ try (CachedModeledFramework client = build(CachedModeledFrameworkType.INITIALIZED, modelSpec)) {
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ client.set(model);
+ AsyncStage asyncModel = client.read();
+ client.start();
+ assertEquals(model, timing.getFuture(asyncModel.toCompletableFuture()));
+ }
+ }
+
private Set toSet(Stream stream, Function super T, ? extends R> mapper) {
return stream.map(mapper).collect(Collectors.toSet());
}
+
+ private CachedModeledFramework build(CachedModeledFrameworkType type, ModelSpec modelSpec) {
+ if (type == CachedModeledFrameworkType.INITIALIZED) {
+ return ModeledFramework.wrap(async, modelSpec).initialized();
+ } else {
+ return ModeledFramework.wrap(async, modelSpec).cached();
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index d83d2f1ce..a1757d572 100644
--- a/pom.xml
+++ b/pom.xml
@@ -591,6 +591,12 @@
${junit-version}
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${junit-version}
+
+
org.junit.jupiter
junit-jupiter-engine