Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.curator.framework.recipes.nodes;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
Expand All @@ -37,7 +38,7 @@

/**
* <p>
* Manages a {@link PersistentNode} that uses {@link CreateMode#CONTAINER}. Asynchronously
* Manages a {@link PersistentNode} that uses {@link CreateMode#PERSISTENT_WITH_TTL}. Asynchronously
* it creates or updates a child on the persistent node that is marked with a provided TTL.
* </p>
*
Expand All @@ -46,7 +47,7 @@
* a method of having the parent node deleted if the TTL expires. i.e. if the process
* that is running the PersistentTtlNode crashes and the TTL elapses, first the child node
* will be deleted due to the TTL expiration and then the parent node will be deleted as it's
* a container node with no children.
* a TTL node with no children.
* </p>
*
* <p>
Expand Down Expand Up @@ -159,46 +160,46 @@ public PersistentTtlNode(
this.client = Objects.requireNonNull(client, "client cannot be null");
this.ttlMs = ttlMs;
this.touchScheduleFactor = touchScheduleFactor;
node = new PersistentNode(client, CreateMode.CONTAINER, false, path, initData, useParentCreation) {
@Override
protected void deleteNode() {
// NOP
}
};
node =
new PersistentNode(
client, CreateMode.PERSISTENT_WITH_TTL, false, path, initData, ttlMs, useParentCreation) {
@Override
protected void deleteNode() {
// NOP
}
};
this.executorService = Objects.requireNonNull(executorService, "executorService cannot be null");
childPath = ZKPaths.makePath(Objects.requireNonNull(path, "path cannot be null"), childNodeName);
}

@VisibleForTesting
void touch() {
try {
try {
client.setData().forPath(childPath);
} catch (KeeperException.NoNodeException e) {
client.create()
.orSetData()
.withTtl(ttlMs)
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(childPath);
}
} catch (KeeperException.NoNodeException ignore) {
// ignore
} catch (Exception e) {
if (!ThreadUtils.checkInterrupted(e)) {
log.debug("Could not touch child node", e);
}
}
}

/**
* You must call start() to initiate the persistent ttl node
*/
public void start() {
node.start();

Runnable touchTask = new Runnable() {
@Override
public void run() {
try {
try {
client.setData().forPath(childPath);
} catch (KeeperException.NoNodeException e) {
client.create()
.orSetData()
.withTtl(ttlMs)
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.forPath(childPath);
}
} catch (KeeperException.NoNodeException ignore) {
// ignore
} catch (Exception e) {
if (!ThreadUtils.checkInterrupted(e)) {
log.debug("Could not touch child node", e);
}
}
}
};
Future<?> future = executorService.scheduleAtFixedRate(
touchTask, ttlMs / touchScheduleFactor, ttlMs / touchScheduleFactor, TimeUnit.MILLISECONDS);
this::touch, ttlMs / touchScheduleFactor, ttlMs / touchScheduleFactor, TimeUnit.MILLISECONDS);
futureRef.set(future);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@
import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -187,4 +193,48 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
assertNull(client.checkExists().forPath("/test"));
}
}

@Test
public void testTouchNodeNotCreated() throws Exception {
final String mainPath = "/parent/main";
final String touchPath = ZKPaths.makePath(mainPath, PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
final long testTtlMs = 500L;
final CountDownLatch mainCreatedLatch = new CountDownLatch(1);
final CountDownLatch mainDeletedLatch = new CountDownLatch(1);
final AtomicBoolean touchCreated = new AtomicBoolean();
try (CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
client.start();
assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
try (PersistentWatcher watcher = new PersistentWatcher(client, mainPath, true)) {
final Watcher listener = event -> {
final String path = event.getPath();
if (mainPath.equals(path)) {
final EventType type = event.getType();
if (EventType.NodeCreated.equals(type)) {
mainCreatedLatch.countDown();
} else if (EventType.NodeDeleted.equals(type)) {
mainDeletedLatch.countDown();
}
} else if (touchPath.equals(path)) {
touchCreated.set(true);
}
};
watcher.getListenable().addListener(listener);
watcher.start();
try (PersistentTtlNode node = new PersistentTtlNode(client, mainPath, testTtlMs, new byte[0]) {
@Override
void touch() {
// NOP
}
}) {
node.start();
assertTrue(mainCreatedLatch.await(1L, TimeUnit.SECONDS));
}
assertNull(client.checkExists().forPath(touchPath));
assertTrue(mainDeletedLatch.await(3L * testTtlMs, TimeUnit.MILLISECONDS));
assertFalse(touchCreated.get()); // Just to control that touch ZNode never created
}
}
}
}