Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
Expand All @@ -47,11 +46,11 @@
public class CuratorZookeeperClient implements Closeable {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConnectionState state;
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<>();
private final int connectionTimeoutMs;
private final int waitForShutdownTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
private final AtomicReference<TracerDriver> tracer = new AtomicReference<>(new DefaultTracerDriver());

/**
*
Expand Down Expand Up @@ -155,13 +154,12 @@ public CuratorZookeeperClient(
RetryPolicy retryPolicy,
boolean canBeReadOnly) {
if (sessionTimeoutMs < connectionTimeoutMs) {
log.warn(String.format(
"session timeout [%d] is less than connection timeout [%d]",
sessionTimeoutMs, connectionTimeoutMs));
log.warn(
"session timeout [{}] is less than connection timeout [{}]", sessionTimeoutMs, connectionTimeoutMs);
}

retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

this.connectionTimeoutMs = connectionTimeoutMs;
this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
Expand Down Expand Up @@ -213,7 +211,7 @@ public boolean isConnected() {

/**
* This method blocks until the connection to ZK succeeds. Use with caution. The block
* will timeout after the connection timeout (as passed to the constructor) has elapsed
* will time out after the connection timeout (as passed to the constructor) has elapsed
*
* @return true if the connection succeeded, false if not
* @throws InterruptedException interrupted while waiting
Expand All @@ -229,7 +227,7 @@ public boolean blockUntilConnectedOrTimedOut() throws InterruptedException {
trace.commit();

boolean localIsConnected = state.isConnected();
log.debug("blockUntilConnectedOrTimedOut() end. isConnected: " + localIsConnected);
log.debug("blockUntilConnectedOrTimedOut() end. isConnected: {}", localIsConnected);

return localIsConnected;
}
Expand All @@ -252,7 +250,7 @@ public void start() throws Exception {
/**
* Close the client.
*
* Same as {@link #close(int) } using the timeout set at construction time.
* <p>Same as {@link #close(int) } using the timeout set at construction time.</p>
*
* @see #close(int)
*/
Expand Down Expand Up @@ -403,12 +401,7 @@ public void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
throw new IllegalStateException("Client is not started or has been closed");
}
final CountDownLatch latch = new CountDownLatch(1);
Watcher tempWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
latch.countDown();
}
};
final Watcher tempWatcher = event -> latch.countDown();

state.addParentWatcher(tempWatcher);
long startTimeMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ protected long getSleepTimeMs(int retryCount, long elapsedTimeMs) {
// copied from Hadoop's RetryPolicies.java
long sleepMs = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
if (sleepMs > maxSleepMs) {
log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
log.warn("Sleep extension too large ({}). Pinning to {}", sleepMs, maxSleepMs);
sleepMs = maxSleepMs;
}
return sleepMs;
}

private static int validateMaxRetries(int maxRetries) {
if (maxRetries > MAX_RETRIES_LIMIT) {
log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
log.warn("maxRetries too large ({}). Pinning to {}", maxRetries, MAX_RETRIES_LIMIT);
maxRetries = MAX_RETRIES_LIMIT;
}
return maxRetries;
Expand Down
9 changes: 4 additions & 5 deletions curator-examples/src/main/java/cache/CuratorCacheExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ public static void main(String[] args) throws Exception {
// there are several ways to set a listener on a CuratorCache. You can watch for individual events
// or for all events. Here, we'll use the builder to log individual cache actions
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> System.out.println(String.format("Node created: [%s]", node)))
.forChanges((oldNode, node) -> System.out.println(
String.format("Node changed. Old: [%s] New: [%s]", oldNode, node)))
.forDeletes(oldNode ->
System.out.println(String.format("Node deleted. Old value: [%s]", oldNode)))
.forCreates(node -> System.out.printf("Node created: [%s]%n", node))
.forChanges((oldNode, node) ->
System.out.printf("Node changed. Old: [%s] New: [%s]%n", oldNode, node))
.forDeletes(oldNode -> System.out.printf("Node deleted. Old value: [%s]%n", oldNode))
.forInitialized(() -> System.out.println("Cache initialized"))
.build();

Expand Down
10 changes: 5 additions & 5 deletions curator-examples/src/main/java/pubsub/SubPubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void publishSomething(Publisher publisher) {
.mapToObj(__ ->
new Instance(nextId(), random(InstanceType.values()), random(hostnames), random(ports)))
.collect(Collectors.toList());
System.out.println(String.format("Publishing %d instances", instances.size()));
System.out.printf("Publishing %d instances%n", instances.size());
publisher.publishInstances(instances);
break;
}
Expand All @@ -161,7 +161,7 @@ private void publishSomething(Publisher publisher) {
.mapToObj(__ -> new LocationAvailable(
nextId(), random(Priority.values()), random(locations), random(durations)))
.collect(Collectors.toList());
System.out.println(String.format("Publishing %d locationsAvailable", locationsAvailable.size()));
System.out.printf("Publishing %d locationsAvailable%n", locationsAvailable.size());
publisher.publishLocationsAvailable(random(groups), locationsAvailable);
break;
}
Expand All @@ -179,16 +179,16 @@ private void publishSomething(Publisher publisher) {
.mapToObj(__ -> new UserCreated(
nextId(), random(Priority.values()), random(locations), random(positions)))
.collect(Collectors.toList());
System.out.println(String.format("Publishing %d usersCreated", usersCreated.size()));
System.out.printf("Publishing %d usersCreated%n", usersCreated.size());
publisher.publishUsersCreated(random(groups), usersCreated);
break;
}
}
}

private <T> ModeledCacheListener<T> generalListener() {
return (type, path, stat, model) -> System.out.println(
String.format("Subscribed %s @ %s", model.getClass().getSimpleName(), path));
return (type, path, stat, model) ->
System.out.printf("Subscribed %s @ %s%n", model.getClass().getSimpleName(), path);
}

@SafeVarargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ final ZooKeeper getZooKeeper() throws Exception {

protected final void internalSync(CuratorFrameworkBase impl, String path, Object context) {
BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
processBackgroundOperation(new OperationAndData(operation, path, null, null, context, null), null);
processBackgroundOperation(new OperationAndData<>(operation, path, null, null, context, null), null);
}

abstract byte[] getDefaultData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void forEach(Consumer<V> function) {
function.accept(entry.listener);
} catch (Throwable e) {
ThreadUtils.checkInterrupted(e);
log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
log.error("Listener ({}) threw an exception", entry.listener, e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.Closeable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -57,13 +56,13 @@ public class ConnectionStateManager implements Closeable {
}

private final Logger log = LoggerFactory.getLogger(getClass());
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
private final CuratorFramework client;
private final int sessionTimeoutMs;
private final int sessionExpirationPercent;
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final UnaryListenerManager<ConnectionStateListener> listeners;

// guarded by sync
Expand All @@ -80,9 +79,9 @@ private enum State {
}

/**
* @param client the client
* @param threadFactory thread factory to use or null for a default
* @param sessionTimeoutMs the ZK session timeout in milliseconds
* @param client the client
* @param threadFactory thread factory to use or null for a default
* @param sessionTimeoutMs the ZK session timeout in milliseconds
* @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
*/
public ConnectionStateManager(
Expand All @@ -96,11 +95,11 @@ public ConnectionStateManager(
}

/**
* @param client the client
* @param threadFactory thread factory to use or null for a default
* @param sessionTimeoutMs the ZK session timeout in milliseconds
* @param client the client
* @param threadFactory thread factory to use or null for a default
* @param sessionTimeoutMs the ZK session timeout in milliseconds
* @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
* @param managerFactory manager factory to use
* @param managerFactory manager factory to use
*/
public ConnectionStateManager(
CuratorFramework client,
Expand All @@ -124,12 +123,9 @@ public ConnectionStateManager(
public void start() {
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

service.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
processEvents();
return null;
}
service.submit(() -> {
processEvents();
return null;
});
}

Expand Down Expand Up @@ -173,7 +169,7 @@ public synchronized boolean setToSuspended() {

/**
* Post a state change. If the manager is already in that state the change
* is ignored. Otherwise the change is queued for listeners.
* is ignored. Otherwise, the change is queued for listeners.
*
* @param newConnectionState new state
* @return true if the state actually changed, false if it was already at that state
Expand Down Expand Up @@ -228,7 +224,7 @@ public synchronized boolean isConnected() {
}

private void postState(ConnectionState state) {
log.info("State change: " + state);
log.info("State change: {}", state);

notifyAll();

Expand Down Expand Up @@ -265,7 +261,7 @@ private void processEvents() {
&& client.getZookeeperClient().isConnected()) {
// CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been
// repaired
// this "hack" fixes it by forcing the state to RECONNECTED
// this "hack" fixes it by forcing the state to "RECONNECTED"
log.warn("ConnectionState is LOST but isConnected() is true. Forcing RECONNECTED.");
addStateChange(ConnectionState.RECONNECTED);
}
Expand All @@ -286,9 +282,10 @@ private void checkSessionExpiration() {
startOfSuspendedEpoch =
System.currentTimeMillis(); // reset startOfSuspendedEpoch to avoid spinning on this session
// expiration injection CURATOR-405
log.warn(String.format(
"Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d",
elapsedMs, useSessionTimeoutMs));
log.warn(
"Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: {}. Adjusted session timeout ms: {}",
elapsedMs,
useSessionTimeoutMs);
try {
if (lastExpiredInstanceIndex == client.getZookeeperClient().getInstanceIndex()) {
// last expiration didn't work for this instance, so event thread is dead and a reset is needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public DistributedAtomicValue(
* @throws Exception ZooKeeper errors
*/
public AtomicValue<byte[]> get() throws Exception {
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MutableAtomicValue<byte[]> result = new MutableAtomicValue<>(null, null, false);
getCurrentValue(result, new Stat());
result.postValue = result.preValue;
result.succeeded = true;
Expand Down Expand Up @@ -119,16 +119,14 @@ public void forceSet(byte[] newValue) throws Exception {
*/
public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception {
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MutableAtomicValue<byte[]> result = new MutableAtomicValue<>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if (!createIt && Arrays.equals(expectedValue, result.preValue)) {
try {
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
} catch (KeeperException.BadVersionException dummy) {
result.succeeded = false;
} catch (KeeperException.NoNodeException dummy) {
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException dummy) {
result.succeeded = false;
}
} else {
Expand All @@ -146,14 +144,9 @@ public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue)
* @throws Exception ZooKeeper errors
*/
public AtomicValue<byte[]> trySet(final byte[] newValue) throws Exception {
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MutableAtomicValue<byte[]> result = new MutableAtomicValue<>(null, null, false);

MakeValue makeValue = new MakeValue() {
@Override
public byte[] makeFrom(byte[] previous) {
return newValue;
}
};
MakeValue makeValue = previous -> newValue;
tryOptimistic(result, makeValue);
if (!result.succeeded() && (mutex != null)) {
tryWithMutex(result, makeValue);
Expand Down Expand Up @@ -181,7 +174,7 @@ public boolean initialize(byte[] value) throws Exception {
}

AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception {
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MutableAtomicValue<byte[]> result = new MutableAtomicValue<>(null, null, false);

tryOptimistic(result, makeValue);
if (!result.succeeded() && (mutex != null)) {
Expand All @@ -204,7 +197,7 @@ RuntimeException createCorruptionException(byte[] bytes) {
str.append("0x").append(Integer.toHexString((b & 0xff)));
}
str.append(']');
return new RuntimeException(String.format("Corrupted data for node \"%s\": %s", path, str.toString()));
return new RuntimeException(String.format("Corrupted data for node \"%s\": %s", path, str));
}

private boolean getCurrentValue(MutableAtomicValue<byte[]> result, Stat stat) throws Exception {
Expand Down Expand Up @@ -284,11 +277,9 @@ private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue)
}
result.postValue = Arrays.copyOf(newValue, newValue.length);
success = true;
} catch (KeeperException.NodeExistsException e) {
// do Retry
} catch (KeeperException.BadVersionException e) {
// do Retry
} catch (KeeperException.NoNodeException e) {
} catch (KeeperException.NodeExistsException
| KeeperException.BadVersionException
| KeeperException.NoNodeException e) {
// do Retry
}

Expand Down
Loading