From d6474f837b937b03139a1ab619a24c8c80618a79 Mon Sep 17 00:00:00 2001 From: pmuthari_LinkedIn Date: Thu, 7 Aug 2025 12:42:47 +0530 Subject: [PATCH 1/2] [LIZK-3903] Optimizing snapshot creation --- .../org/apache/jute/BinaryOutputArchive.java | 9 ++ .../java/org/apache/jute/OutputArchive.java | 2 + .../apache/jute/ToStringOutputArchive.java | 5 + .../org/apache/zookeeper/server/DataTree.java | 122 ++++++++++++++---- .../zookeeper/server/SegmentedList.java | 73 +++++++++++ .../server/SerializationPerfTest.java | 25 +++- .../zookeeper/test/InvalidSnapshotTest.java | 52 ++++++++ .../src/test/resources/logback.xml | 2 +- 8 files changed, 260 insertions(+), 30 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/SegmentedList.java diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index 1a1749d8645..4821f0ca8a0 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -135,6 +135,15 @@ public void writeString(String s, String tag) throws IOException { out.write(bb.array(), bb.position(), bb.limit()); } + public void writeBytes(byte[] b, String string) throws IOException { + if (b == null) { + // raw bytes are written without a length prefix, so null emits nothing + return; + } + out.write(b, 0, b.length); + } + + public void writeBuffer(byte[] barr, String tag) throws IOException { if (barr == null) { diff --git a/zookeeper-jute/src/main/java/org/apache/jute/OutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/OutputArchive.java index b076ca18b2e..703193b5667 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/OutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/OutputArchive.java @@ -42,6 +42,8 @@ public interface OutputArchive { void writeString(String s, String tag) throws IOException; + public void writeBytes(byte[] b, String string) throws 
IOException; + void writeBuffer(byte[] buf, String tag) throws IOException; diff --git a/zookeeper-jute/src/main/java/org/apache/jute/ToStringOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/ToStringOutputArchive.java index a144011d8df..c383a71954f 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/ToStringOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/ToStringOutputArchive.java @@ -90,6 +90,11 @@ public void writeString(String s, String tag) throws IOException { throwExceptionOnError(tag); } + public void writeBytes(byte[] b, String string) throws IOException { + writeBuffer(b, string); + } + + public void writeBuffer(byte[] buf, String tag) throws IOException { printCommaUnlessFirst(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 1a5d1304120..79f782093cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.PrintWriter; @@ -35,6 +36,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.jute.BinaryInputArchive; @@ -1583,33 +1587,103 @@ private void setupQuota() { * @throws IOException */ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { - String pathString = path.toString(); - DataNode node = getNode(pathString); - if (node == null) { - return; + long startTime = System.currentTimeMillis(); + ForkJoinPool 
customThreadPool = new ForkJoinPool(8); + + Map<Integer, SegmentedList<Map.Entry<String, DataNode>>> sortMap = new ConcurrentHashMap<>(); + // Submit task and wait for its completion + try { + customThreadPool.submit(() -> + nodes.entrySet().parallelStream().forEach(entry -> { + int keyCount = countOccurrences(entry.getKey(), '/'); + // computeIfAbsent is atomic on ConcurrentHashMap; SegmentedList.add locks per segment + sortMap.computeIfAbsent(keyCount, k -> new SegmentedList<>(64)) + .add(entry); + }) + ).get(); // wait for completion + + long midTime = System.currentTimeMillis(); + + ByteArrayOutputStream[] baos = new ByteArrayOutputStream[8]; + BinaryOutputArchive[] localArchive = new BinaryOutputArchive[8]; + for(int i = 0; i < baos.length; i++) { + baos[i] = new ByteArrayOutputStream(256 * 1024); + localArchive[i] = BinaryOutputArchive.getArchive(baos[i]); + } + + for(int j = 0; j < sortMap.size(); j++) { + List<Map.Entry<String, DataNode>> nodeList = sortMap.get(j).toListSnapshot(); + + ArrayList<ForkJoinTask<?>> tasks = new ArrayList<>(); + for (int i = 0; i < 8; i++) { + int taskId = i; + tasks.add(customThreadPool.submit(() -> { + // Task logic for thread + // System.out.println("Executing task " + taskId + " on thread " + Thread.currentThread().getName()); + for(int k= taskId; k < nodeList.size(); k += 8) { + DataNode nodeCopy; + DataNode node = nodeList.get(k).getValue(); + synchronized (node) { + StatPersisted statCopy = new StatPersisted(); + copyStatPersisted(node.stat, statCopy); + //we do not need to make a copy of node.data because the contents + //are never changed + nodeCopy = new DataNode(node.data, node.acl, statCopy); + } + if(nodeList.get(k).getKey().compareTo("/") != 0) { + try { + serializeNodeData(localArchive[taskId], nodeList.get(k).getKey(), nodeCopy); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if(baos[taskId].size() > 128 * 1024) { + try { + flushBuffer(oa, baos[taskId]); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + try { + flushBuffer(oa, baos[taskId]); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + })); + } + // Wait for all tasks to complete + for (ForkJoinTask<?> task : tasks) { + task.join(); + } +// System.out.println("Size of level " + i + " = " + sortMap.get(i).size()); + } + + long endTime = System.currentTimeMillis(); + LOG.info("serialize took: " + (midTime - startTime) + " ms, writing: " + (endTime - midTime) + " ms"); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + customThreadPool.shutdown(); } - String[] children = null; - DataNode nodeCopy; - synchronized (node) { - StatPersisted statCopy = new StatPersisted(); - copyStatPersisted(node.stat, statCopy); - //we do not need to make a copy of node.data because the contents - //are never changed - nodeCopy = new DataNode(node.data, node.acl, statCopy); - Set childs = node.getChildren(); - children = childs.toArray(new String[childs.size()]); + } + + public synchronized void flushBuffer(OutputArchive oa, ByteArrayOutputStream baos) throws IOException { + if(baos.size() > 0) { + oa.writeBytes(baos.toByteArray(), "batch"); + baos.reset(); } - serializeNodeData(oa, pathString, nodeCopy); - path.append('/'); - int off = path.length(); - for (String child : children) { - // since this is single buffer being resused - // we need - // to truncate the previous bytes of string. 
- path.delete(off, Integer.MAX_VALUE); - path.append(child); - serializeNode(oa, path); + } + + public static int countOccurrences(String haystack, char needle) { + int count = 0; + char[] chars = haystack.toCharArray(); + for (char c : chars) { + if (c == needle) { + count++; + } } + return count; } // visiable for test diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SegmentedList.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SegmentedList.java new file mode 100644 index 00000000000..c06a010d203 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SegmentedList.java @@ -0,0 +1,73 @@ +package org.apache.zookeeper.server; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +public class SegmentedList<E> { + + private final int segmentCount; + private final List<E>[] segments; + private final ReentrantLock[] locks; + + @SuppressWarnings("unchecked") + public SegmentedList(int segmentCount) { + this.segmentCount = segmentCount; + this.segments = (List<E>[]) new List[segmentCount]; + this.locks = new ReentrantLock[segmentCount]; + + for (int i = 0; i < segmentCount; i++) { + segments[i] = new ArrayList<>(); + locks[i] = new ReentrantLock(); + } + } + + private int segmentIndex(Object element) { + return (element == null ? 
0 : (element.hashCode() & 0x7fffffff) % segmentCount); + } + + public void add(E element) { + int seg = segmentIndex(element); + locks[seg].lock(); + try { + segments[seg].add(element); + } finally { + locks[seg].unlock(); + } + } + + public List<E> toListSnapshot() { + // snapshot copied under all locks to ensure consistency + List<E> snapshot = new ArrayList<>(); + for (int i = 0; i < segmentCount; i++) { + locks[i].lock(); + } + try { + for (List<E> segment : segments) { + snapshot.addAll(segment); + } + } finally { + for (int i = segmentCount - 1; i >= 0; i--) { + locks[i].unlock(); + } + } + return snapshot; + } + + public int size() { + int size = 0; + for (int i = 0; i < segmentCount; i++) { + locks[i].lock(); + } + try { + for (List<E> segment : segments) { + size += segment.size(); + } + } finally { + for (int i = segmentCount - 1; i >= 0; i--) { + locks[i].unlock(); + } + } + return size; + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java index 1ff0a76f6d4..5c85c74bc33 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java @@ -18,11 +18,21 @@ package org.apache.zookeeper.server; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.persistence.FileSnap; +import org.apache.zookeeper.server.util.SerializeUtils; import org.junit.Test; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,14 +68,19 @@ static int createNodes(DataTree tree, String path, int depth, int childcount, in } private static void serializeTree(int depth, int width, int len) throws InterruptedException, IOException, KeeperException.NodeExistsException, KeeperException.NoNodeException { - DataTree tree = new DataTree(); - createNodes(tree, "/", depth, width, tree.getNode("/").stat.getCversion(), new byte[len]); - int count = tree.getNodeCount(); + BinaryInputArchive ia = BinaryInputArchive.getArchive( + Files.newInputStream(Paths.get("src/test/resources/data/snapshot.31a6097741"))); + FileSnap snap = new FileSnap(new File("src/test/resources/data/snapshot.31a6097741")); + ConcurrentHashMap sessionsWithTimeouts = new ConcurrentHashMap<>(); + DataTree dataTree = new DataTree(); + snap.deserialize(dataTree, sessionsWithTimeouts, ia); + int count = dataTree.getNodeCount(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(new NullOutputStream()); System.gc(); + LOG.info("Created the tree, now serializing"); long start = System.nanoTime(); - tree.serialize(oa, "test"); + dataTree.serialize(oa, "test"); long end = System.nanoTime(); long durationms = (end - start) / 1000000L; long pernodeus = ((end - start) / 1000L) / count; @@ -116,7 +131,7 @@ public void test40Wide4DeepSerialize() throws InterruptedException, IOException, @Test public void test300Wide3DeepSerialize() throws InterruptedException, IOException, KeeperException.NodeExistsException, KeeperException.NoNodeException { - serializeTree(3, 300, 20); + serializeTree(3, 300, 1024); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java index 01ccefdef96..828b804c17b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java +++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.util.Arrays; import org.apache.commons.io.FileUtils; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZKTestCase; @@ -128,4 +129,55 @@ public void testSnapshot() throws Exception { } + @Test + public void testSnapshotBenchmark() throws Exception { + File origSnapDir = new File(testData, "prod-test"); + + // This test otherwise updates the resources directory. + File snapDir = ClientBase.createEmptyTestDir(); + FileUtils.copyDirectory(origSnapDir, snapDir); + + LOG.info("Snapshot directory contents: {}", Arrays.toString(snapDir.listFiles())); + ZooKeeperServer zks = new ZooKeeperServer(snapDir, snapDir, 3000); + SyncRequestProcessor.setSnapCount(100); + final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); + ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); + f.startup(zks); + LOG.info("starting up the zookeeper server .. 
waiting"); + assertTrue("waiting for server being up", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); + ZooKeeper zk = ClientBase.createZKClient(HOSTPORT); + + long total = 0L; + + try { + for (int i=0;i<10;i++) { + long start = System.currentTimeMillis(); + zks.takeSnapshot(); + long end = System.currentTimeMillis(); + //System.out.printf("\n=======\nSnapshot file count: {}", snapDir.listFiles().length); + LOG.error("Snapshot file count: {}", snapDir.listFiles().length); + total += (end - start); + LOG.error("Snapshot took {} ms", (end - start)); + } + + LOG.error("Average snapshot time: {} ms", (total / 10)); + + } catch (Exception e) { + LOG.error("Failed to take snapshot", e); + fail("Snapshot failed: " + e.getMessage()); + } + + try { + // we know this from the data files + // this node is the last node in the snapshot + + assertTrue(zk.exists("/kafka-kac", false) != null); + } finally { + zk.close(); + } + f.shutdown(); + zks.shutdown(); + assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT)); + + } } diff --git a/zookeeper-server/src/test/resources/logback.xml b/zookeeper-server/src/test/resources/logback.xml index a1e627724a5..fe7c60d038e 100644 --- a/zookeeper-server/src/test/resources/logback.xml +++ b/zookeeper-server/src/test/resources/logback.xml @@ -30,7 +30,7 @@ - + From 7d1516d736ad798dcf91854e103e415368f567ec Mon Sep 17 00:00:00 2001 From: pmuthari_LinkedIn Date: Thu, 7 Aug 2025 22:52:50 +0530 Subject: [PATCH 2/2] Separating regular snapshots and leader boot up snapshot --- .../org/apache/zookeeper/server/DataTree.java | 56 +++++++++++++++++-- .../server/SyncRequestProcessor.java | 2 +- .../apache/zookeeper/server/ZKDatabase.java | 2 +- .../zookeeper/server/ZooKeeperServer.java | 10 ++-- .../server/persistence/FileSnap.java | 10 ++-- .../server/persistence/FileTxnSnapLog.java | 6 +- .../server/persistence/SnapShot.java | 2 +- .../zookeeper/server/quorum/Learner.java | 4 +- 
.../zookeeper/server/util/SerializeUtils.java | 4 +- .../apache/zookeeper/server/DataTreeTest.java | 10 ++-- .../server/DeserializationPerfTest.java | 2 +- .../apache/zookeeper/server/PurgeTxnTest.java | 2 +- .../server/SerializationPerfTest.java | 2 +- .../zookeeper/server/SnapshotDigestTest.java | 8 +-- .../server/backup/RestorationToolTest.java | 2 +- .../server/persistence/EmptySnapshotTest.java | 4 +- .../persistence/FileTxnSnapLogTest.java | 6 +- .../zookeeper/server/quorum/Zab1_0Test.java | 8 +-- .../zookeeper/test/InvalidSnapshotTest.java | 2 +- .../zookeeper/test/LoadFromLogTest.java | 6 +- .../apache/zookeeper/test/TruncateTest.java | 2 +- 21 files changed, 98 insertions(+), 52 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 79f782093cb..d40e562230b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -1586,7 +1586,7 @@ private void setupQuota() { * a string builder. * @throws IOException */ - void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { + void serializeNode(OutputArchive oa) throws IOException { long startTime = System.currentTimeMillis(); ForkJoinPool customThreadPool = new ForkJoinPool(8); @@ -1686,7 +1686,47 @@ public static int countOccurrences(String haystack, char needle) { return count; } - // visiable for test + /** + * this method uses a stringbuilder to create a new path for children. This + * is faster than string appends ( str1 + str2). + * + * @param oa + * OutputArchive to write to. + * @param path + * a string builder. 
+ * @throws IOException + */ + void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { + String pathString = path.toString(); + DataNode node = getNode(pathString); + if (node == null) { + return; + } + String[] children = null; + DataNode nodeCopy; + synchronized (node) { + StatPersisted statCopy = new StatPersisted(); + copyStatPersisted(node.stat, statCopy); + //we do not need to make a copy of node.data because the contents + //are never changed + nodeCopy = new DataNode(node.data, node.acl, statCopy); + Set childs = node.getChildren(); + children = childs.toArray(new String[childs.size()]); + } + serializeNodeData(oa, pathString, nodeCopy); + path.append('/'); + int off = path.length(); + for (String child : children) { + // since this is single buffer being resused + // we need + // to truncate the previous bytes of string. + path.delete(off, Integer.MAX_VALUE); + path.append(child); + serializeNode(oa, path); + } + } + + // visible for test public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException { oa.writeString(path, "path"); oa.writeRecord(node, "node"); @@ -1696,8 +1736,12 @@ public void serializeAcls(OutputArchive oa) throws IOException { aclCache.serialize(oa); } - public void serializeNodes(OutputArchive oa) throws IOException { - serializeNode(oa, new StringBuilder()); + public void serializeNodes(OutputArchive oa, boolean isLeaderBootupSnapshot) throws IOException { + if(isLeaderBootupSnapshot) { + serializeNode(oa); + } else { + serializeNode(oa, new StringBuilder()); + } // / marks end of stream // we need to check if clear had been called in between the snapshot. 
if (root != null) { @@ -1705,9 +1749,9 @@ public void serializeNodes(OutputArchive oa) throws IOException { } } - public void serialize(OutputArchive oa, String tag) throws IOException { + public void serialize(OutputArchive oa, String tag, boolean isLeaderBootupSnapshot) throws IOException { serializeAcls(oa); - serializeNodes(oa); + serializeNodes(oa, isLeaderBootupSnapshot); } public void deserialize(InputArchive ia, String tag) throws IOException { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 7c5e5ef6627..04fb97135e3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -190,7 +190,7 @@ public void run() { new ZooKeeperThread("Snapshot Thread") { public void run() { try { - zks.takeSnapshot(); + zks.takeSnapshot(false); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index 84beab90295..5377be97e2f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -645,7 +645,7 @@ public void deserializeSnapshot(InputArchive ia) throws IOException { * @throws InterruptedException */ public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException { - SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts()); + SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts(), false); } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index c0963695899..674eced3eaa 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -530,17 +530,17 @@ public void loadData() throws IOException, InterruptedException { } // Make a clean snapshot - takeSnapshot(); + takeSnapshot(true); } - public void takeSnapshot() { - takeSnapshot(false); + public void takeSnapshot(boolean isLeaderBootupSnapshot) { + takeSnapshot(false, isLeaderBootupSnapshot); } - public void takeSnapshot(boolean syncSnap) { + public void takeSnapshot(boolean syncSnap, boolean isLeaderBootupSnapshot) { long start = Time.currentElapsedTime(); try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap, isLeaderBootupSnapshot); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java index 8c5111cee8b..763fd4ed195 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java @@ -240,14 +240,15 @@ protected void serialize( DataTree dt, Map sessions, OutputArchive oa, - FileHeader header) throws IOException { + FileHeader header, + boolean isLeaderBootupSnapshot) throws IOException { // this is really a programmatic error and not something that can // happen at runtime if (header == null) { throw new IllegalStateException("Snapshot's not open for writing: uninitialized header"); } header.serialize(oa, "fileheader"); - 
SerializeUtils.serializeSnapshot(dt, oa, sessions); + SerializeUtils.serializeSnapshot(dt, oa, sessions, isLeaderBootupSnapshot); } /** @@ -261,12 +262,13 @@ public synchronized void serialize( DataTree dt, Map sessions, File snapShot, - boolean fsync) throws IOException { + boolean fsync, + boolean isLeaderBootupSnapshot) throws IOException { if (!close) { try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) { OutputArchive oa = BinaryOutputArchive.getArchive(snapOS); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); - serialize(dt, sessions, oa, header); + serialize(dt, sessions, oa, header, isLeaderBootupSnapshot); SnapStream.sealStream(snapOS, oa); // Digest feature was added after the CRC to make it backward diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index c66f91dad25..923c20f56e6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -297,7 +297,7 @@ public long restore(DataTree dt, Map sessions, PlayBackListener l if (trustEmptyDB) { /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ - save(dt, (ConcurrentHashMap) sessions, false); + save(dt, (ConcurrentHashMap) sessions, false, false); /* return a zxid of 0, since we know the database is empty */ return 0L; @@ -482,12 +482,12 @@ public TxnHeader getLastLoggedTxnHeader() { public void save( DataTree dataTree, ConcurrentHashMap sessionsWithTimeouts, - boolean syncSnap) throws IOException { + boolean syncSnap, boolean isLeaderBootupSnapshot) throws IOException { long lastZxid = dataTree.lastProcessedZxid; File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", 
Long.toHexString(lastZxid), snapshotFile); try { - snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); + snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap, isLeaderBootupSnapshot); } catch (IOException e) { if (snapshotFile.length() == 0) { /* This may be caused by a full disk. In such a case, the server diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java index f5660c7df44..d76521fde35 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java @@ -48,7 +48,7 @@ public interface SnapShot { * @param fsync sync the snapshot immediately after write * @throws IOException */ - void serialize(DataTree dt, Map sessions, File name, boolean fsync) throws IOException; + void serialize(DataTree dt, Map sessions, File name, boolean fsync, boolean isLeaderBootupSnapshot) throws IOException; /** * find the most recent snapshot file diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 44205e83e86..956cd48fe0d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -712,7 +712,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } } if (isPreZAB1_0) { - zk.takeSnapshot(syncSnapshot); + zk.takeSnapshot(syncSnapshot, false); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); @@ -732,7 +732,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } if (snapshotNeeded) { - zk.takeSnapshot(syncSnapshot); + zk.takeSnapshot(syncSnapshot, false); } self.setCurrentEpoch(newEpoch); diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java index 85f294c44a1..ef2c5f68d1e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java @@ -163,14 +163,14 @@ public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map sessions) throws IOException { + public static void serializeSnapshot(DataTree dt, OutputArchive oa, Map sessions, boolean isLeaderBootupSnapshot) throws IOException { HashMap sessSnap = new HashMap(sessions); oa.writeInt(sessSnap.size(), "count"); for (Entry entry : sessSnap.entrySet()) { oa.writeLong(entry.getKey().longValue(), "id"); oa.writeInt(entry.getValue().intValue(), "timeout"); } - dt.serialize(oa, "tree"); + dt.serialize(oa, "tree", isLeaderBootupSnapshot); } public static byte[] serializeRequest(Request request) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index 2aad3b79e8c..c29a3d48aad 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -247,7 +247,7 @@ public void testPathTrieClearOnDeserialize() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); baos.flush(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); @@ -305,7 +305,7 @@ public void run() { } }; - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); //Let's make sure that we hit the code that ran the real assertion above assertTrue("Didn't find the expected node", ranTestCase.get()); 
@@ -566,7 +566,7 @@ public void testDeserializeDoesntLockACLCacheWhileReading() throws Exception { DataOutputStream out = new DataOutputStream(baos); BinaryOutputArchive oa = new BinaryOutputArchive(out); - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); DataTree tree2 = new DataTree(); DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); @@ -656,7 +656,7 @@ public void run() { } }; - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); //Let's make sure that we hit the code that ran the real assertion above assertTrue("Didn't find the expected node", ranTestCase.get()); @@ -678,7 +678,7 @@ public void testReconfigACLClearOnDeserialize() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); baos.flush(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DeserializationPerfTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DeserializationPerfTest.java index 064b53e0898..ca5582cec29 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DeserializationPerfTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DeserializationPerfTest.java @@ -44,7 +44,7 @@ private static void deserializeTree(int depth, int width, int len) throws Interr ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); - tree.serialize(oa, "test"); + tree.serialize(oa, "test", false); baos.flush(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java index 804e23726ee..e1bc618117a 
100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PurgeTxnTest.java @@ -447,7 +447,7 @@ public void testPurgeDoesNotDeleteOverlappingLogFile() throws Exception { for (int i = 0; i < 100; i++, unique++) { zk.create("/snap-" + unique, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - zks.takeSnapshot(); + zks.takeSnapshot(false); } // Create some additional znodes without taking a snapshot afterwards. for (int i = 0; i < 100; i++, unique++) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java index 5c85c74bc33..fa8a18b66e4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SerializationPerfTest.java @@ -80,7 +80,7 @@ private static void serializeTree(int depth, int width, int len) throws Interrup System.gc(); LOG.info("Created the tree, now serializing"); long start = System.nanoTime(); - dataTree.serialize(oa, "test"); + dataTree.serialize(oa, "test" , false); long end = System.nanoTime(); long durationms = (end - start) / 1000000L; long pernodeus = ((end - start) / 1000L) / count; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java index d40debfa7ad..06360e8b5f8 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java @@ -82,7 +82,7 @@ public void cleanUpCustomizedEnv() { public void testSnapshotDigest() throws Exception { // take a empty snapshot without creating any txn and make sure // there is no digest mismatch issue - server.takeSnapshot(); + 
server.takeSnapshot(false); reloadSnapshotAndCheckDigest(); // trigger various write requests @@ -111,7 +111,7 @@ public void testSnapshotDigest() throws Exception { // Take a snapshot and test the logic when loading a non-fuzzy snapshot server = serverFactory.getZooKeeperServer(); - server.takeSnapshot(); + server.takeSnapshot(false); reloadSnapshotAndCheckDigest(); } @@ -133,7 +133,7 @@ public void testDifferentDigestVersion() throws Exception { zk.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // take a full snapshot - server.takeSnapshot(); + server.takeSnapshot(false); //increment the digest version int newVersion = currentVersion + 1; @@ -176,7 +176,7 @@ private void testCompatibleHelper(Boolean enabledBefore, Boolean enabledAfter) t zk.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // take a full snapshot - server.takeSnapshot(); + server.takeSnapshot(false); ZooKeeperServer.setDigestEnabled(enabledAfter); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/backup/RestorationToolTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/backup/RestorationToolTest.java index 0c916918938..3a30af28ea6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/backup/RestorationToolTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/backup/RestorationToolTest.java @@ -136,7 +136,7 @@ public void setup() throws Exception { backupManager.getLogBackup().run(1); } if (getRandomBoolean(0.2f)) { - zks.takeSnapshot(); + zks.takeSnapshot(false); } // Record a timestamp that's in the valid backup timestamp range, used to test restoration to timestamp if (i == txnCnt / 2) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/EmptySnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/EmptySnapshotTest.java index 7eb6f6003ff..ecec907d0da 100644 --- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/EmptySnapshotTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/EmptySnapshotTest.java @@ -64,7 +64,7 @@ public void testNoEmptySnapshot() throws Exception { DataTree tree = new DataTree(); tree.createNode("/empty-snapshot-test-1", "data".getBytes(), null, -1, -1, 1, 1); try { - snapLog.save(tree, new ConcurrentHashMap<>(), false); + snapLog.save(tree, new ConcurrentHashMap<>(), false, false); fail("Should have thrown an IOException"); } catch (IOException e) { // no op @@ -73,7 +73,7 @@ public void testNoEmptySnapshot() throws Exception { assertEquals(0, ((FileSnap) snapLog.snapLog).findNRecentSnapshots(10).size()); snapLog.snapLog = new FileSnap(snapLog.dataDir); - snapLog.save(tree, new ConcurrentHashMap<>(), false); + snapLog.save(tree, new ConcurrentHashMap<>(), false, false); assertEquals(1, ((FileSnap) snapLog.snapLog).findNRecentSnapshots(10).size()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java index 34d57f415dd..1390e97c5d8 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java @@ -339,7 +339,7 @@ public void testACLCreatedDuringFuzzySnapshotSync() throws IOException { leaderDataTree.processTxn(hdr1, txn1); // Finish the snapshot. - leaderDataTree.serializeNodes(oa); + leaderDataTree.serializeNodes(oa, false); os.close(); // Simulate restore on follower and replay. 
@@ -364,7 +364,7 @@ public void testEmptySnapshotSerialization() throws IOException { ConcurrentHashMap sessions = new ConcurrentHashMap<>(); ZooKeeperServer.setDigestEnabled(true); - snaplog.save(dataTree, sessions, true); + snaplog.save(dataTree, sessions, true, false); snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> { }); assertNull(dataTree.getDigestFromLoadedSnapshot()); @@ -390,7 +390,7 @@ void testSnapshotSerializationCompatibility(Boolean digestEnabled, Boolean snapp CreateTxn txn = new CreateTxn("/" + 1, "data".getBytes(), null, false, 1); Request request = new Request(1, 1, 1, txnHeader, txn, 1); dataTree.processTxn(request.getHdr(), request.getTxn()); - snaplog.save(dataTree, sessions, true); + snaplog.save(dataTree, sessions, true, false); int expectedNodeCount = dataTree.getNodeCount(); ZooKeeperServer.setDigestEnabled(!digestEnabled); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index b8630ae1319..4280c10c34c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -369,7 +369,7 @@ public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversa assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. 
- snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false); + snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false, false); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); @@ -623,7 +623,7 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) oa.writeString("BenWasHere", null); Thread.sleep(10); //Give it some time to process the snap //No Snapshot taken yet, the SNAP was applied in memory - verify(f.zk, never()).takeSnapshot(); + verify(f.zk, never()).takeSnapshot(false); qp.setType(Leader.NEWLEADER); qp.setZxid(ZxidUtils.makeZxid(1, 0)); @@ -801,7 +801,7 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts()); assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); //Snapshot was never taken during very simple sync - verify(f.zk, never()).takeSnapshot(); + verify(f.zk, never()).takeSnapshot(false); } finally { TestUtils.deleteFileRecursively(tmpDir); } @@ -1195,7 +1195,7 @@ public void testInitialAcceptedCurrent() throws Exception { FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir); File version2 = new File(tmpDir, "version-2"); version2.mkdir(); - logFactory.save(new DataTree(), new ConcurrentHashMap(), false); + logFactory.save(new DataTree(), new ConcurrentHashMap(), false, false); long zxid = ZxidUtils.makeZxid(3, 3); logFactory.append(new Request(1, 1, ZooDefs.OpCode.error, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error), new ErrorTxn(1), zxid)); logFactory.commit(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java index 828b804c17b..dac20b74232 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/InvalidSnapshotTest.java @@ -152,7 +152,7 @@ public 
void testSnapshotBenchmark() throws Exception { try { for (int i=0;i<10;i++) { long start = System.currentTimeMillis(); - zks.takeSnapshot(); + zks.takeSnapshot(true); long end = System.currentTimeMillis(); //System.out.printf("\n=======\nSnapshot file count: {}", snapDir.listFiles().length); LOG.error("Snapshot file count: {}", snapDir.listFiles().length); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java index 949cb0f961e..fd58364d8fe 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java @@ -185,7 +185,7 @@ public void testRestore() throws Exception { zks.getZKDatabase().setlastProcessedZxid(zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10); LOG.info("Set lastProcessedZxid to {}", zks.getZKDatabase().getDataTreeLastProcessedZxid()); // Force snapshot and restore - zks.takeSnapshot(); + zks.takeSnapshot(false); zks.shutdown(); stopServer(); @@ -245,7 +245,7 @@ public void testRestoreWithTransactionErrors() throws Exception { LOG.info("Set lastProcessedZxid to {}", zks.getZKDatabase().getDataTreeLastProcessedZxid()); // Force snapshot and restore - zks.takeSnapshot(); + zks.takeSnapshot(false); zks.shutdown(); stopServer(); @@ -292,7 +292,7 @@ public void testReloadSnapshotWithMissingParent() throws Exception { zks.getZKDatabase().setlastProcessedZxid(createZxId); LOG.info("Set lastProcessedZxid to {}", zks.getZKDatabase().getDataTreeLastProcessedZxid()); // Force snapshot and restore - zks.takeSnapshot(); + zks.takeSnapshot(false); zks.shutdown(); stopServer(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java index 089a7644aab..8fae4ffe1a7 100644 --- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java @@ -79,7 +79,7 @@ public void testTruncationStreamReset() throws Exception { ZKDatabase zkdb = new ZKDatabase(snaplog); // make sure to snapshot, so that we have something there when // truncateLog reloads the db - snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false); + snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false, true); for (int i = 1; i <= 100; i++) { append(zkdb, i);