Skip to content

Commit e4a4756

Browse files
authored
GH 1256. Return all descendants of basePath for CachedModeledFramework::list (#1257)
#1250 introduced a bug where CachedModeledFramework::list is no longer returning all descendants of the basePath of the underlying cache and is instead returning the direct children of the current path of the client.
1 parent 190cd65 commit e4a4756

3 files changed

Lines changed: 78 additions & 20 deletions

File tree

curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.Executor;
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.function.Function;
27+
import java.util.function.Predicate;
2728
import java.util.function.Supplier;
2829
import java.util.stream.Collectors;
2930
import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -171,12 +172,7 @@ public AsyncStage<T> read() {
171172

172173
@Override
173174
public AsyncStage<T> read(Stat storingStatIn) {
174-
return internalRead(n -> {
175-
if (storingStatIn != null) {
176-
DataTree.copyStat(n.stat(), storingStatIn);
177-
}
178-
return n.model();
179-
});
175+
return internalRead(n -> this.toModelWithStat(n, storingStatIn));
180176
}
181177

182178
@Override
@@ -191,7 +187,7 @@ public AsyncStage<T> readThrough() {
191187

192188
@Override
193189
public AsyncStage<T> readThrough(Stat storingStatIn) {
194-
return internalRead(ZNode::model, () -> client.read(storingStatIn));
190+
return internalRead(n -> this.toModelWithStat(n, storingStatIn), () -> client.read(storingStatIn));
195191
}
196192

197193
@Override
@@ -201,7 +197,7 @@ public AsyncStage<ZNode<T>> readThroughAsZNode() {
201197

202198
@Override
203199
public AsyncStage<List<T>> list() {
204-
return internalChildren(entry -> entry.getValue().model());
200+
return this.filteredCacheChildren(entry -> entry.getValue().model(), __ -> true);
205201
}
206202

207203
@Override
@@ -231,12 +227,12 @@ public AsyncStage<Stat> checkExists() {
231227

232228
@Override
233229
public AsyncStage<List<ZPath>> children() {
234-
return internalChildren(Map.Entry::getKey);
230+
return this.clientPathDirectChildren(Map.Entry::getKey);
235231
}
236232

237233
@Override
238234
public AsyncStage<List<ZNode<T>>> childrenAsZNodes() {
239-
return internalChildren(Map.Entry::getValue);
235+
return this.clientPathDirectChildren(Map.Entry::getValue);
240236
}
241237

242238
@Override
@@ -279,6 +275,13 @@ public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp>
279275
return client.inTransaction(operations);
280276
}
281277

278+
private T toModelWithStat(ZNode<T> n, Stat storingStatIn) {
279+
if (storingStatIn != null) {
280+
DataTree.copyStat(n.stat(), storingStatIn);
281+
}
282+
return n.model();
283+
}
284+
282285
private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) {
283286
return internalRead(resolver, null);
284287
}
@@ -312,13 +315,18 @@ private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<
312315
return stage;
313316
}
314317

315-
private <U> ModelStage<List<U>> internalChildren(Function<Map.Entry<ZPath, ZNode<T>>, U> resolver) {
318+
private <U> ModelStage<List<U>> clientPathDirectChildren(Function<Map.Entry<ZPath, ZNode<T>>, U> resolver) {
319+
return filteredCacheChildren(
320+
resolver, e -> e.getKey().parent().equals(client.modelSpec().path()));
321+
}
322+
323+
private <U> ModelStage<List<U>> filteredCacheChildren(
324+
Function<Map.Entry<ZPath, ZNode<T>>, U> resolver, Predicate<Map.Entry<ZPath, ZNode<T>>> filter) {
316325
ModelStage<List<U>> stage = ModelStage.make();
317326
init.whenComplete((__, throwable) -> {
318327
if (throwable == null) {
319-
stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream()
320-
.filter(e -> !e.getKey().isRoot()
321-
&& e.getKey().parent().equals(client.modelSpec().path()))
328+
stage.complete(cache.currentChildren().entrySet().stream()
329+
.filter(filter)
322330
.map(resolver)
323331
.collect(Collectors.toList()));
324332
} else {

curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,6 @@ public Optional<ZNode<T>> currentData(ZPath path) {
118118
return Optional.empty();
119119
}
120120

121-
ZPath basePath() {
122-
return basePath;
123-
}
124-
125121
Map<ZPath, ZNode<T>> currentChildren() {
126122
return currentChildren(basePath);
127123
}

curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ public void testChildren() {
148148
});
149149
});
150150

151+
complete(
152+
client.child("p").childrenAsZNodes(),
153+
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2)));
154+
complete(
155+
client.withPath(ZPath.from(path, "p")).childrenAsZNodes(),
156+
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(child1, child2)));
151157
complete(
152158
client.child("p").child("c1").childrenAsZNodes(),
153159
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model), Sets.newHashSet(grandChild1)));
@@ -157,10 +163,34 @@ public void testChildren() {
157163

158164
complete(
159165
client.child("p").child("c1").list(),
160-
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild1)));
166+
(v, e) -> assertEquals(
167+
toSet(v.stream(), Function.identity()),
168+
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
161169
complete(
162170
client.child("p").child("c2").list(),
163-
(v, e) -> assertEquals(toSet(v.stream(), Function.identity()), Sets.newHashSet(grandChild2)));
171+
(v, e) -> assertEquals(
172+
toSet(v.stream(), Function.identity()),
173+
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
174+
complete(
175+
client.child("p").list(),
176+
(v, e) -> assertEquals(
177+
toSet(v.stream(), Function.identity()),
178+
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
179+
complete(
180+
client.child("p").child("c2").child("g2").list(),
181+
(v, e) -> assertEquals(
182+
toSet(v.stream(), Function.identity()),
183+
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
184+
}
185+
186+
try (CachedModeledFramework<TestModel> client =
187+
ModeledFramework.wrap(async, modelSpec.withPath(ZPath.root)).cached()) {
188+
client.start();
189+
complete(
190+
client.list(),
191+
(v, e) -> assertEquals(
192+
toSet(v.stream(), Function.identity()),
193+
Sets.newHashSet(parent, child1, child2, grandChild1, grandChild2)));
164194
}
165195
}
166196

@@ -244,6 +274,23 @@ void testNoNodeException() throws InterruptedException, TimeoutException {
244274
}
245275
}
246276

277+
@Test
278+
void testRead() throws InterruptedException, TimeoutException, ExecutionException {
279+
try (CachedModeledFramework<TestModel> client =
280+
ModeledFramework.wrap(async, modelSpec).cached()) {
281+
TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
282+
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
283+
client.start();
284+
assertEquals(model, timing.getFuture(client.read().toCompletableFuture()));
285+
assertEquals(
286+
model,
287+
timing.getFuture(client.readAsZNode().toCompletableFuture()).model());
288+
Stat stat = new Stat();
289+
assertEquals(model, timing.getFuture(client.read(stat).toCompletableFuture()));
290+
assertTrue(stat.getDataLength() > 0);
291+
}
292+
}
293+
247294
@Test
248295
void testReadThrough() throws InterruptedException, TimeoutException, ExecutionException {
249296
try (CachedModeledFramework<TestModel> client =
@@ -264,6 +311,13 @@ public void initialized() {
264311
client.start();
265312
assertTrue(timing.acquireSemaphore(semaphore));
266313
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
314+
Stat stat = new Stat();
315+
assertEquals(model, timing.getFuture(client.readThrough(stat).toCompletableFuture()));
316+
assertTrue(stat.getDataLength() > 0);
317+
assertEquals(
318+
model,
319+
timing.getFuture(client.readThroughAsZNode().toCompletableFuture())
320+
.model());
267321
assertEquals(model, timing.getFuture(client.readThrough().toCompletableFuture()));
268322
}
269323
}

0 commit comments

Comments
 (0)