Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.framework.api.transaction.CuratorOp;
Expand Down Expand Up @@ -171,12 +172,7 @@ public AsyncStage<T> read() {

@Override
public AsyncStage<T> read(Stat storingStatIn) {
return internalRead(n -> {
if (storingStatIn != null) {
DataTree.copyStat(n.stat(), storingStatIn);
}
return n.model();
});
return internalRead(n -> this.toModelWithStat(n, storingStatIn));
}

@Override
Expand All @@ -191,7 +187,7 @@ public AsyncStage<T> readThrough() {

@Override
public AsyncStage<T> readThrough(Stat storingStatIn) {
return internalRead(ZNode::model, () -> client.read(storingStatIn));
return internalRead(n -> this.toModelWithStat(n, storingStatIn), () -> client.read(storingStatIn));
}

@Override
Expand All @@ -201,7 +197,7 @@ public AsyncStage<ZNode<T>> readThroughAsZNode() {

@Override
public AsyncStage<List<T>> list() {
return internalChildren(entry -> entry.getValue().model());
return this.filteredCacheChildren(entry -> entry.getValue().model(), __ -> true);
}

@Override
Expand Down Expand Up @@ -231,12 +227,12 @@ public AsyncStage<Stat> checkExists() {

@Override
public AsyncStage<List<ZPath>> children() {
return internalChildren(Map.Entry::getKey);
return this.clientPathDirectChildren(Map.Entry::getKey);
}

@Override
public AsyncStage<List<ZNode<T>>> childrenAsZNodes() {
return internalChildren(Map.Entry::getValue);
return this.clientPathDirectChildren(Map.Entry::getValue);
}

@Override
Expand Down Expand Up @@ -279,6 +275,13 @@ public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp>
return client.inTransaction(operations);
}

private T toModelWithStat(ZNode<T> n, Stat storingStatIn) {
if (storingStatIn != null) {
DataTree.copyStat(n.stat(), storingStatIn);
}
return n.model();
}

private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) {
return internalRead(resolver, null);
}
Expand Down Expand Up @@ -312,13 +315,18 @@ private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<
return stage;
}

private <U> ModelStage<List<U>> internalChildren(Function<Map.Entry<ZPath, ZNode<T>>, U> resolver) {
private <U> ModelStage<List<U>> clientPathDirectChildren(Function<Map.Entry<ZPath, ZNode<T>>, U> resolver) {
return filteredCacheChildren(
resolver, e -> e.getKey().parent().equals(client.modelSpec().path()));
}

private <U> ModelStage<List<U>> filteredCacheChildren(
Function<Map.Entry<ZPath, ZNode<T>>, U> resolver, Predicate<Map.Entry<ZPath, ZNode<T>>> filter) {
ModelStage<List<U>> stage = ModelStage.make();
init.whenComplete((__, throwable) -> {
if (throwable == null) {
stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream()
.filter(e -> !e.getKey().isRoot()
&& e.getKey().parent().equals(client.modelSpec().path()))
stage.complete(cache.currentChildren().entrySet().stream()
.filter(filter)
.map(resolver)
.collect(Collectors.toList()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public Optional<ZNode<T>> currentData(ZPath path) {
return Optional.empty();
}

ZPath basePath() {
return basePath;
}

Map<ZPath, ZNode<T>> currentChildren() {
return currentChildren(basePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ public void testChildren() {
});
});

complete(
client.child("p").childrenAsZNodes(),
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2)));
complete(
client.withPath(ZPath.from(path, "p")).childrenAsZNodes(),
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2)));
complete(
client.child("p").child("c1").childrenAsZNodes(),
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild1)));
Expand All @@ -157,10 +163,34 @@ public void testChildren() {

complete(
client.child("p").child("c1").list(),
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1)));
(v, e) -> assertEquals(
toSet(v.stream(), Function.identity()),
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
complete(
client.child("p").child("c2").list(),
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2)));
(v, e) -> assertEquals(
toSet(v.stream(), Function.identity()),
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
complete(
client.child("p").list(),
(v, e) -> assertEquals(
toSet(v.stream(), Function.identity()),
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
complete(
client.child("p").child("c2").child("g2").list(),
(v, e) -> assertEquals(
toSet(v.stream(), Function.identity()),
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
}

try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec.withPath(ZPath.root)).cached()) {
client.start();
complete(
client.list(),
(v, e) -> assertEquals(
toSet(v.stream(), Function.identity()),
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
}
}

Expand Down Expand Up @@ -244,6 +274,23 @@ void testNoNodeException() throws InterruptedException, TimeoutException {
}
}

@Test
void testRead() throws InterruptedException, TimeoutException, ExecutionException {
try (CachedModeledFramework<TestModel> client =
ModeledFramework.wrap(async, modelSpec).cached()) {
TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
client.start();
assertEquals(model, timing.getFuture(client.read().toCompletableFuture()));
assertEquals(
model,
timing.getFuture(client.readAsZNode().toCompletableFuture()).model());
Stat stat = new Stat();
assertEquals(model, timing.getFuture(client.read(stat).toCompletableFuture()));
assertTrue(stat.getDataLength() > 0);
}
}

@Test
void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException {
try (CachedModeledFramework<TestModel> client =
Expand All @@ -264,6 +311,13 @@ public void initialized() {
client.start();
assertTrue(timing.acquireSemaphore(semaphore));
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
Stat stat = new Stat();
assertEquals(model, timing.getFuture(client.readThrough(stat).toCompletableFuture()));
assertTrue(stat.getDataLength() > 0);
assertEquals(
model,
timing.getFuture(client.readThroughAsZNode().toCompletableFuture())
.model());
assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture()));
}
}
Expand Down