Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
Expand All @@ -48,15 +47,40 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> {
private final ModeledFramework<T> client;
private final ModeledCacheImpl<T> cache;
private final Executor executor;
private final ModelStage<Void> init;

CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) {
this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
}

private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) {
this(
client,
new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor),
executor,
ModelStage.make());
listenable().addListener(new ModeledCacheListener<T>() {
@Override
public void accept(Type type, ZPath path, Stat stat, Object model) {
// NOP
}

@Override
public void initialized() {
init.complete(null);
ModeledCacheListener.super.initialized();
}

@Override
public void handleException(Exception e) {
init.completeExceptionally(e);
ModeledCacheListener.super.handleException(e);
}
});
}

private CachedModeledFrameworkImpl(
ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor, ModelStage<Void> init) {
this.client = client;
this.cache = cache;
this.executor = executor;
this.init = init;
}

@Override
Expand Down Expand Up @@ -106,7 +130,7 @@ public ModelSpec<T> modelSpec() {

@Override
public CachedModeledFramework<T> child(Object child) {
return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, init);
}

@Override
Expand All @@ -117,7 +141,7 @@ public ModeledFramework<T> parent() {

@Override
public CachedModeledFramework<T> withPath(ZPath path) {
return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, init);
}

@Override
Expand All @@ -142,24 +166,22 @@ public AsyncStage<String> set(T model, int version) {

@Override
public AsyncStage<T> read() {
return internalRead(ZNode::model, this::exceptionally);
return internalRead(ZNode::model);
}

@Override
public AsyncStage<T> read(Stat storingStatIn) {
return internalRead(
n -> {
if (storingStatIn != null) {
DataTree.copyStat(n.stat(), storingStatIn);
}
return n.model();
},
this::exceptionally);
return internalRead(n -> {
if (storingStatIn != null) {
DataTree.copyStat(n.stat(), storingStatIn);
}
return n.model();
});
}

@Override
public AsyncStage<ZNode<T>> readAsZNode() {
return internalRead(Function.identity(), this::exceptionally);
return internalRead(Function.identity());
}

@Override
Expand All @@ -179,9 +201,7 @@ public AsyncStage<ZNode<T>> readThroughAsZNode() {

@Override
public AsyncStage<List<T>> list() {
List<T> children =
cache.currentChildren().values().stream().map(ZNode::model).collect(Collectors.toList());
return ModelStage.completed(children);
return internalChildren(entry -> entry.getValue().model());
}

@Override
Expand All @@ -206,28 +226,17 @@ public AsyncStage<Void> delete(int version) {

@Override
public AsyncStage<Stat> checkExists() {
ZPath path = client.modelSpec().path();
Optional<ZNode<T>> data = cache.currentData(path);
return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null));
return internalRead(ZNode::stat, () -> ModelStage.completed(null));
}

@Override
public AsyncStage<List<ZPath>> children() {
List<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet().stream()
.filter(path -> !path.isRoot()
&& path.parent().equals(client.modelSpec().path()))
.collect(Collectors.toList());
return completed(paths);
return internalChildren(Map.Entry::getKey);
}

@Override
public AsyncStage<List<ZNode<T>>> childrenAsZNodes() {
List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path()).entrySet().stream()
.filter(e -> !e.getKey().isRoot()
&& e.getKey().parent().equals(client.modelSpec().path()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
return completed(nodes);
return internalChildren(Map.Entry::getValue);
}

@Override
Expand Down Expand Up @@ -270,19 +279,52 @@ public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp>
return client.inTransaction(operations);
}

private <U> AsyncStage<U> completed(U value) {
return ModelStage.completed(value);
}

private <U> AsyncStage<U> exceptionally() {
KeeperException.NoNodeException exception =
new KeeperException.NoNodeException(client.modelSpec().path().fullPath());
return ModelStage.exceptionally(exception);
}

private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<AsyncStage<U>> elseProc) {
ZPath path = client.modelSpec().path();
Optional<ZNode<T>> data = cache.currentData(path);
return data.map(node -> completed(resolver.apply(node))).orElseGet(elseProc);
private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) {
return internalRead(resolver, null);
}

private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<AsyncStage<U>> defaultSupplier) {
ModelStage<U> stage = ModelStage.make();
init.whenComplete((__, throwable) -> {
if (throwable == null) {
ZPath path = client.modelSpec().path();
ZNode<T> zNode = cache.currentData(path).orElse(null);
if (zNode == null) {
if (defaultSupplier == null) {
stage.completeExceptionally(new KeeperException.NoNodeException(
client.modelSpec().path().fullPath()));
} else {
defaultSupplier.get().whenComplete((elseData, elseThrowable) -> {
if (elseThrowable == null) {
stage.complete(elseData);
} else {
stage.completeExceptionally(elseThrowable);
}
});
}
} else {
stage.complete(resolver.apply(zNode));
}
} else {
stage.completeExceptionally(throwable);
}
});
return stage;
}

private <U> ModelStage<List<U>> internalChildren(Function<Map.Entry<ZPath, ZNode<T>>, U> resolver) {
ModelStage<List<U>> stage = ModelStage.make();
init.whenComplete((__, throwable) -> {
if (throwable == null) {
stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream()
.filter(e -> !e.getKey().isRoot()
&& e.getKey().parent().equals(client.modelSpec().path()))
.map(resolver)
.collect(Collectors.toList()));
} else {
stage.completeExceptionally(throwable);
}
});
return stage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

package org.apache.curator.x.async.modeled;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.math.BigInteger;
Expand All @@ -32,7 +28,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand All @@ -41,9 +39,11 @@
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -78,6 +78,10 @@ public void testDownServer() throws IOException {
assertNotNull(value);
assertNull(e);
}));
complete(client.child(model).checkExists().whenComplete((value, e) -> {
assertNotNull(value);
assertNull(e);
}));
} finally {
client.close();
}
Expand Down Expand Up @@ -150,6 +154,13 @@ public void testChildren() {
complete(
client.child("p").child("c2").childrenAsZNodes(),
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild2)));

complete(
client.child("p").child("c1").list(),
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1)));
complete(
client.child("p").child("c2").list(),
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2)));
}
}

Expand Down Expand Up @@ -201,6 +212,78 @@ public void testEmptyNodeRawDeserialization() {
verifyEmptyNodeDeserialization(byteModel, byteModelSpec);
}

@Test
void testInitializedCachedModeledFramework() throws ExecutionException, InterruptedException, TimeoutException {
try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec).cached()) {
TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
AsyncStage<TestModel> asyncModel = client.read();
client.start();
assertEquals(model, timing.getFuture(asyncModel.toCompletableFuture()));
}
}

@Test
void testNoNodeException() throws InterruptedException, TimeoutException {
try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec).cached()) {
AsyncStage<TestModel> asyncModel = client.read();
client.start();
try {
timing.getFuture(asyncModel.toCompletableFuture());
fail("This test should result in a NoNodeException");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof KeeperException.NoNodeException);
}

complete(client.checkExists().whenComplete((value, e) -> {
assertNull(value);
assertNull(e);
}));
}
}

@Test
void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException {
try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec).cached()) {
TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
Semaphore semaphore = new Semaphore(0);
client.listenable().addListener(new ModeledCacheListener<TestModel>() {
@Override
public void accept(Type type, ZPath path, Stat stat, TestModel model) {
// NOP
}

@Override
public void initialized() {
semaphore.release();
}
});
client.start();
assertTrue(timing.acquireSemaphore(semaphore));
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture()));
}
}

@Test
void testReadThroughFailure() throws InterruptedException, TimeoutException, ExecutionException {
try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec).cached()) {
client.start();
assertNull(timing.getFuture(client.delete().toCompletableFuture()));
complete(client.readThrough(), (d, e) -> {
if (e == null) {
fail("This test should result in a NoNodeException");
} else {
assertEquals(KeeperException.NoNodeException.class, e.getClass());
}
});
}
}

private <T> void verifyEmptyNodeDeserialization(T model, ModelSpec<T> parentModelSpec) {
// The sub-path is the ZNode that will be removed that does not contain any model data. Their should be no
// attempt to deserialize this empty ZNode.
Expand Down