From 8697e6977ea20e24a95d2fad3175440a07d8cce9 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Fri, 29 May 2026 21:30:23 -0700 Subject: [PATCH 1/2] HDDS-15428. Remove purged snapshot YAMLs in one run Generated-by: Codex (GPT-5.4) --- .../snapshot/OmSnapshotLocalDataManager.java | 153 ++++++++- .../TestOmSnapshotLocalDataManager.java | 313 ++++++++++++++++-- 2 files changed, 430 insertions(+), 36 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java index a2203f675d80..1d0165aad2b3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java @@ -35,10 +35,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -383,7 +385,7 @@ private void init(OzoneConfiguration configuration, SnapshotChainManager chainMa this.scheduler.scheduleWithFixedDelay( () -> { try { - checkOrphanSnapshotVersions(omMetadataManager, chainManager); + checkQueuedOrphanSnapshotVersions(omMetadataManager, chainManager); } catch (Exception e) { LOG.error("Exception while checking orphan snapshot versions", e); } @@ -392,25 +394,152 @@ private void init(OzoneConfiguration configuration, SnapshotChainManager chainMa } - private void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager) + /** + * Drains the snapshots currently queued in {@link 
#snapshotToBeCheckedForOrphans}. + * Each queued snapshot becomes a seed for a same-run ancestor cascade. + */ + private void checkQueuedOrphanSnapshotVersions(OMMetadataManager metadataManager, + SnapshotChainManager chainManager) throws IOException { - for (Map.Entry entry : snapshotToBeCheckedForOrphans.entrySet()) { - UUID snapshotId = entry.getKey(); - int countBeforeCheck = entry.getValue(); - checkOrphanSnapshotVersions(metadataManager, chainManager, snapshotId); - decrementOrphanCheckCount(snapshotId, countBeforeCheck); + Set processedSnapshotIds = new HashSet<>(); + List queuedSnapshotIds = getQueuedSnapshotIdsByCreationTimeDesc(metadataManager, chainManager); + if (queuedSnapshotIds.isEmpty()) { + return; + } + LOG.info("Draining orphan snapshot cleanup queue with {} seed snapshots", + queuedSnapshotIds.size()); + for (UUID snapshotId : queuedSnapshotIds) { + // A descendant processed earlier in this batch may already have checked this + // snapshot as an ancestor. If it re-queued itself for retry, leave that retry + // for the next scheduler iteration instead of checking it twice in one pass. + if (processedSnapshotIds.contains(snapshotId)) { + LOG.debug("Skipping queued snapshot {} because it was already processed earlier in this drain", + snapshotId); + continue; + } + cascadeOrphanSnapshotChecksFrom(metadataManager, chainManager, snapshotId, processedSnapshotIds); } + LOG.debug("Finished orphan snapshot cleanup drain; {} snapshots remain queued for future runs", + snapshotToBeCheckedForOrphans.size()); } - @VisibleForTesting - void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager, - UUID snapshotId) throws IOException { + /** + * Sorts the currently queued snapshot ids from newest to oldest using snapshot creation + * time when that metadata is available. This is only a seed-order optimization for the + * outer batch drain; the inner cascade still handles ancestors discovered during the run. 
+ */ + private List getQueuedSnapshotIdsByCreationTimeDesc(OMMetadataManager metadataManager, + SnapshotChainManager chainManager) throws IOException { + List snapshotIds = new ArrayList<>(snapshotToBeCheckedForOrphans.keySet()); + if (chainManager == null) { + LOG.debug("Snapshot chain manager unavailable; using queued orphan snapshot iteration order for {} snapshots", + snapshotIds.size()); + return snapshotIds; + } + Table snapshotInfoTable = metadataManager.getSnapshotInfoTable(); + if (snapshotInfoTable == null) { + LOG.debug("Snapshot info table unavailable; using queued orphan snapshot iteration order for {} snapshots", + snapshotIds.size()); + return snapshotIds; + } + + Map creationTimeBySnapshotId = new HashMap<>(); + for (UUID snapshotId : snapshotIds) { + String tableKey = chainManager.getTableKey(snapshotId); + if (tableKey == null) { + LOG.debug("Snapshot {} is queued for orphan cleanup but has no snapshot info table key; " + + "leaving it in fallback iteration order", snapshotId); + continue; + } + SnapshotInfo snapshotInfo = snapshotInfoTable.get(tableKey); + if (snapshotInfo != null) { + creationTimeBySnapshotId.put(snapshotId, snapshotInfo.getCreationTime()); + } else { + LOG.debug("Snapshot {} is queued for orphan cleanup but snapshot info is missing for table key {};" + + " leaving it in fallback iteration order", snapshotId, tableKey); + } + } + + // Prefer newer queued snapshots as seeds so descendants already waiting in the batch + // are more likely to run before their ancestors. 
+ snapshotIds.sort(Comparator.comparingLong( + snapshotId -> creationTimeBySnapshotId.getOrDefault(snapshotId, Long.MIN_VALUE)).reversed()); + LOG.debug("Ordered {} queued orphan snapshot seeds by creation time using metadata for {} snapshots", + snapshotIds.size(), creationTimeBySnapshotId.size()); + return snapshotIds; + } + + /** + * Runs orphan cleanup starting from {@code snapshotId} and cascades toward older snapshots + * in the same call whenever removing data from the current snapshot may have unblocked its + * {@code previousSnapshotId}. + * + *

<p>Callers that want an isolated one-off check should pass a fresh empty + * {@code processedSnapshotIds} set. {@link #checkQueuedOrphanSnapshotVersions( + * OMMetadataManager, SnapshotChainManager)} passes a shared set for the whole scheduler drain + * so a snapshot re-queued for retry is deferred to the next run instead of being checked twice + * in the same batch. + * + * <p>

For example, if the chain is {@code S3 -> S2 -> S1} and checking {@code S3} removes + * orphan versions that make {@code S2} eligible, a single call seeded with {@code S3} + * will check {@code S3}, then {@code S2}, then {@code S1} in that order. + */ + void cascadeOrphanSnapshotChecksFrom(OMMetadataManager metadataManager, + SnapshotChainManager chainManager, UUID snapshotId, Set processedSnapshotIds) + throws IOException { + Deque snapshotsToCheck = new ArrayDeque<>(); + Set queuedSnapshotIds = new HashSet<>(); + queueSnapshotForOrphanCheck(snapshotsToCheck, queuedSnapshotIds, snapshotId); + + while (!snapshotsToCheck.isEmpty()) { + UUID currentSnapshotId = snapshotsToCheck.removeFirst(); + queuedSnapshotIds.remove(currentSnapshotId); + processedSnapshotIds.add(currentSnapshotId); + // Some callers invoke this method directly without first adding the snapshot to the + // orphan-check map, so only consume a queued count when one was present for this pass. + Integer countBeforeCheck = snapshotToBeCheckedForOrphans.get(currentSnapshotId); + if (countBeforeCheck == null) { + LOG.debug("Processing snapshot {} outside the shared orphan queue drain", currentSnapshotId); + } + UUID previousSnapshotIdToCheck = + checkSnapshotForOrphanVersionsOnce(metadataManager, chainManager, currentSnapshotId); + if (countBeforeCheck != null) { + decrementOrphanCheckCount(currentSnapshotId, countBeforeCheck); + } + // Only walk to the ancestor when this pass actually removed data from the current + // snapshot; otherwise we leave the ancestor to a later run or a direct trigger. + if (previousSnapshotIdToCheck != null) { + LOG.debug("Queueing previous snapshot {} after orphan cleanup removed data from snapshot {}", + previousSnapshotIdToCheck, currentSnapshotId); + queueSnapshotForOrphanCheck(snapshotsToCheck, queuedSnapshotIds, previousSnapshotIdToCheck); + } + } + } + + /** + * Adds {@code snapshotId} to the local queue exactly once for the current drain pass. 
+ */ + private void queueSnapshotForOrphanCheck(Deque snapshotsToCheck, Set queuedSnapshotIds, + UUID snapshotId) { + if (snapshotId != null && queuedSnapshotIds.add(snapshotId)) { + snapshotsToCheck.addLast(snapshotId); + } + } + + /** + * Runs orphan cleanup for a single snapshot and returns the previous snapshot id when + * this pass removed at least one version, signaling that the ancestor may now be removable. + */ + private UUID checkSnapshotForOrphanVersionsOnce(OMMetadataManager metadataManager, + SnapshotChainManager chainManager, UUID snapshotId) throws IOException { LOG.info("Checking orphan snapshot versions for snapshot {}", snapshotId); try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = new WritableOmSnapshotLocalDataProvider( snapshotId)) { OmSnapshotLocalData snapshotLocalData = snapshotLocalDataProvider.getSnapshotLocalData(); + UUID previousSnapshotId = snapshotLocalData.getPreviousSnapshotId(); boolean isSnapshotPurged = OmSnapshotManager.isSnapshotPurged(chainManager, metadataManager, snapshotId, snapshotLocalData.getTransactionInfo()); + boolean removedVersion = false; for (Map.Entry integerLocalDataVersionNodeEntry : getVersionNodeMap() .get(snapshotId).getSnapshotVersions().entrySet()) { LocalDataVersionNode versionEntry = integerLocalDataVersionNodeEntry.getValue(); @@ -427,6 +556,7 @@ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChai "snapshotPurged: {}, inDegree : {}", snapshotId, versionEntry.getVersion(), snapshotLocalData.getVersion(), isSnapshotPurged, localDataGraph.inDegree(versionEntry)); snapshotLocalDataProvider.removeVersion(versionEntry.getVersion()); + removedVersion = true; } } finally { internalLock.readLock().unlock(); @@ -435,9 +565,12 @@ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChai // If Snapshot is purged but not flushed completely to disk then this needs to wait for the next iteration // which can be done by incrementing the orphan check 
count for the snapshotId. if (!snapshotLocalData.getVersionSstFileInfos().isEmpty() && snapshotLocalData.getTransactionInfo() != null) { + LOG.debug("Snapshot {} still has {} local versions after orphan cleanup; queueing it for a future retry", + snapshotId, snapshotLocalData.getVersionSstFileInfos().size()); incrementOrphanCheckCount(snapshotId); } snapshotLocalDataProvider.commit(); + return removedVersion ? previousSnapshotId : null; } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java index c73304fa1229..b02e0821ab9a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java @@ -56,6 +56,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -94,6 +95,7 @@ import org.apache.hadoop.ozone.upgrade.LayoutFeature; import org.apache.hadoop.ozone.util.YamlSerializer; import org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.apache.ratis.util.function.CheckedFunction; import org.assertj.core.util.Lists; import org.junit.jupiter.api.AfterAll; @@ -496,6 +498,11 @@ private void validateVersions(OmSnapshotLocalDataManager snapshotLocalDataManage } } + /** + * Verifies that removing versions from a descendant immediately exposes orphan + * versions in older snapshots, and that direct cascade checks can clean them in + * the same call whether the middle snapshot is purged or still active. 
+ */ @ParameterizedTest @ValueSource(booleans = {true, false}) public void testOrphanVersionDeletionWithVersionDeletion(boolean purgeSnapshot) throws IOException { @@ -511,13 +518,20 @@ public void testOrphanVersionDeletionWithVersionDeletion(boolean purgeSnapshot) assertEquals(new HashSet<>(snapshotIds), localDataManager.getSnapshotToBeCheckedForOrphans().keySet()); localDataManager.getSnapshotToBeCheckedForOrphans().clear(); purgedSnapshotIdMap.put(secondSnapId, purgeSnapshot); - localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, thirdSnapId); + invokeCascadeOrphanCheck(null, thirdSnapId); try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(thirdSnapId)) { OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData(); assertEquals(Sets.newHashSet(0, 13), snapshotLocalData.getVersionSstFileInfos().keySet()); } - assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId)); - localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, secondSnapId); + if (purgeSnapshot) { + validateVersions(localDataManager, secondSnapId, 11, Sets.newHashSet(0, 10)); + } else { + validateVersions(localDataManager, secondSnapId, 11, Sets.newHashSet(0, 10, 11)); + } + validateVersions(localDataManager, firstSnapId, 3, Sets.newHashSet(0, 3)); + assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId)); + assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(firstSnapId)); + invokeCascadeOrphanCheck(null, secondSnapId); try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(secondSnapId)) { OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData(); if (purgeSnapshot) { @@ -528,6 +542,11 @@ public void testOrphanVersionDeletionWithVersionDeletion(boolean purgeSnapshot) } } + /** + * Verifies that changing a snapshot's previous-snapshot link queues the detached + * middle snapshot for orphan 
cleanup and that a direct cascade removes its YAML + * only when the snapshot is also purged. + */ @ParameterizedTest @ValueSource(booleans = {true, false}) public void testOrphanVersionDeletionWithChainUpdate(boolean purgeSnapshot) throws IOException { @@ -552,7 +571,7 @@ public void testOrphanVersionDeletionWithChainUpdate(boolean purgeSnapshot) thro } assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId)); - localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, secondSnapId); + invokeCascadeOrphanCheck(null, secondSnapId); if (purgeSnapshot) { assertThrows(NoSuchFileException.class, () -> localDataManager.getOmSnapshotLocalData(secondSnapId)); @@ -1062,7 +1081,7 @@ public void testCheckOrphanSnapshotVersionsWithStaleSnapshotChain() throws IOExc SnapshotChainManager staleChain = mock(SnapshotChainManager.class); when(staleChain.getTableKey(snapshotId)).thenReturn(null); - localDataManager.checkOrphanSnapshotVersions(omMetadataManager, staleChain, snapshotId); + invokeCascadeOrphanCheck(staleChain, snapshotId); // Before the fix: isSnapshotPurged returned true for any null tableKey, so the snapshot // was removed from versionNodeMap. getMeta() then returned null, causing NullPointerException @@ -1108,9 +1127,15 @@ private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, UUID previousSnapsh private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, UUID previousSnapshotId, SnapshotInfo.SnapshotStatus snapshotStatus) { + return createMockSnapshotInfo(snapshotId, previousSnapshotId, snapshotStatus, 0L); + } + + private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, UUID previousSnapshotId, + SnapshotInfo.SnapshotStatus snapshotStatus, long creationTime) { SnapshotInfo.Builder builder = SnapshotInfo.newBuilder() .setSnapshotId(snapshotId) - .setName("snapshot-" + snapshotId); + .setName("snapshot-" + snapshotId) + .setCreationTime(creationTime); builder.setSnapshotStatus(snapshotStatus == null ? 
SNAPSHOT_ACTIVE : snapshotStatus); if (previousSnapshotId != null) { builder.setPathPreviousSnapshotId(previousSnapshotId); @@ -1162,36 +1187,272 @@ private void writeLocalDataToFile(OmSnapshotLocalData localData, Path filePath) } /** - * Tests the fix for the NoSuchFileException : when a purged snapshot (last in chain) - * has all its versions removed and YAML deleted by orphan check, it must NOT be - * re-added to snapshotToBeCheckedForOrphans. Otherwise the next orphan check run - * would try to load the deleted YAML and throw NoSuchFileException. - * + * Verifies that a purged leaf whose YAML was fully deleted is not re-queued for a + * later orphan-cleanup pass, which would otherwise try to reload a missing file. */ @Test public void testPurgedSnapshotNotReAddedAfterYamlDeleted() throws Exception { localDataManager = getNewOmSnapshotLocalDataManager(); List snapshotIds = createSnapshotLocalData(localDataManager, 2); UUID secondSnapId = snapshotIds.get(1); - // Simulate purge: set transactionInfo on S2's YAML (purge does this before orphan check runs) - try (WritableOmSnapshotLocalDataProvider writableProvider = - localDataManager.getWritableOmSnapshotLocalData(secondSnapId)) { - writableProvider.setTransactionInfo(TransactionInfo.valueOf(1, 1)); - writableProvider.commit(); - } - // S2 is last in chain - mark as purged so all versions get removed - purgedSnapshotIdMap.put(secondSnapId, true); - // Simulate purge adding S2 to orphan check list + markSnapshotsPurged(ImmutableList.of(secondSnapId)); localDataManager.getSnapshotToBeCheckedForOrphans().clear(); localDataManager.getSnapshotToBeCheckedForOrphans().put(secondSnapId, 1); - // Run full orphan check + invokeQueuedOrphanCheck(); + assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId), + "Purged snapshot should not be re-added after YAML deleted"); + } + + /** + * Verifies the direct cascade path: once the newest purged snapshot is removed, + * cleanup walks to older 
snapshots in the same call and deletes all eligible YAMLs. + */ + @Test + public void testPurgedSnapshotCascadeCleanupRemovesAncestorsInSameRun() throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 3); + UUID firstSnapId = snapshotIds.get(0); + UUID secondSnapId = snapshotIds.get(1); + UUID thirdSnapId = snapshotIds.get(2); + + markSnapshotsPurged(snapshotIds); + localDataManager.getSnapshotToBeCheckedForOrphans().clear(); + + // Simulate the ancestor being checked earlier in the same run while its descendant still exists. + invokeCascadeOrphanCheck(null, secondSnapId); + assertTrue(localDataManager.getVersionNodeMapUnmodifiable().containsKey(secondSnapId)); + assertTrue(localDataManager.getVersionNodeMapUnmodifiable().containsKey(firstSnapId)); + + // Once the leaf is removed, cleanup should cascade immediately to its ancestors. + invokeCascadeOrphanCheck(null, thirdSnapId); + + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(thirdSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(secondSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(firstSnapId)); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(thirdSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(secondSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(firstSnapId)).exists()); + assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId)); + assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(firstSnapId)); + } + + /** + * Verifies the queued drain path: even when only the leaf is seeded in the shared + * queue, the drain should cascade through ancestors and delete all eligible YAMLs + * in the same scheduler pass. 
+ */ + @Test + public void testBatchOrphanCheckCascadesFromLeafToAncestorsInSameRun() throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 3); + UUID firstSnapId = snapshotIds.get(0); + UUID secondSnapId = snapshotIds.get(1); + UUID thirdSnapId = snapshotIds.get(2); + + markSnapshotsPurged(snapshotIds); + localDataManager.getSnapshotToBeCheckedForOrphans().clear(); + localDataManager.getSnapshotToBeCheckedForOrphans().put(thirdSnapId, 1); + + invokeQueuedOrphanCheck(); + + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(thirdSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(secondSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(firstSnapId)); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(thirdSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(secondSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(firstSnapId)).exists()); + assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().isEmpty()); + } + + /** + * Verifies that stale ancestor seeds left in the original batch snapshot are ignored + * after an earlier descendant cascade has already removed them. 
+ */ + @Test + public void testBatchOrphanCheckSkipsSnapshotsRemovedByEarlierCascade() throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 3); + UUID firstSnapId = snapshotIds.get(0); + UUID secondSnapId = snapshotIds.get(1); + UUID thirdSnapId = snapshotIds.get(2); + + markSnapshotsPurged(snapshotIds); + setSnapshotsToBeCheckedForOrphansInOrder(thirdSnapId, secondSnapId, firstSnapId); + + invokeQueuedOrphanCheck(); + + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(thirdSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(secondSnapId)); + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(firstSnapId)); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(thirdSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(secondSnapId)).exists()); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(firstSnapId)).exists()); + assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().isEmpty()); + } + + /** + * Verifies that a purged snapshot which still has referenced local versions is checked + * only once in the current drain and remains queued for the next scheduler run. 
+ */ + @Test + public void testQueuedOrphanCheckDefersRetriedSnapshotToNextRun() throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 3); + UUID firstSnapId = snapshotIds.get(0); + UUID secondSnapId = snapshotIds.get(1); + UUID thirdSnapId = snapshotIds.get(2); + + addVersionsToLocalData(localDataManager, firstSnapId, ImmutableMap.of(1, 1, 2, 2, 3, 3)); + addVersionsToLocalData(localDataManager, secondSnapId, ImmutableMap.of(4, 2, 8, 1, 10, 3, 11, 3)); + addVersionsToLocalData(localDataManager, thirdSnapId, ImmutableMap.of(5, 8, 13, 10)); + markSnapshotsPurged(ImmutableList.of(secondSnapId)); + setSnapshotsToBeCheckedForOrphansInOrder(thirdSnapId, secondSnapId); + + LogCapturer logCapturer = LogCapturer.captureLogs(OmSnapshotLocalDataManager.class); + try { + invokeQueuedOrphanCheck(); + } finally { + logCapturer.stopCapturing(); + } + + String checkLog = "Checking orphan snapshot versions for snapshot " + secondSnapId; + assertEquals(1, countOccurrences(logCapturer.getOutput(), checkLog)); + assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId)); + validateVersions(localDataManager, secondSnapId, 11, Sets.newHashSet(0, 10)); + validateVersions(localDataManager, firstSnapId, 3, Sets.newHashSet(0, 3)); + } + + /** + * Verifies that the initial batch seeds are processed from newest to oldest when + * creation-time metadata is available, avoiding an unnecessary early check of an + * older ancestor that is already queued ahead of its descendant. 
+ */ + @Test + public void testQueuedOrphanCheckSortsInitialSeedsByCreationTimeDescending() throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 3); + UUID firstSnapId = snapshotIds.get(0); + UUID secondSnapId = snapshotIds.get(1); + UUID thirdSnapId = snapshotIds.get(2); + + markSnapshotsPurged(snapshotIds); + setSnapshotsToBeCheckedForOrphansInOrder(secondSnapId, thirdSnapId); + + SnapshotChainManager chainManager = mockSnapshotInfoChain(snapshotIds); + + LogCapturer logCapturer = LogCapturer.captureLogs(OmSnapshotLocalDataManager.class); + try { + invokeQueuedOrphanCheck(chainManager); + } finally { + logCapturer.stopCapturing(); + } + + String logs = logCapturer.getOutput(); + String secondCheckLog = "Checking orphan snapshot versions for snapshot " + secondSnapId; + String thirdCheckLog = "Checking orphan snapshot versions for snapshot " + thirdSnapId; + assertEquals(1, countOccurrences(logs, secondCheckLog)); + assertTrue(logs.indexOf(thirdCheckLog) < logs.indexOf(secondCheckLog)); + } + + /** + * Regression test for the original six-snapshot incident shape. All queued purged + * snapshots should have their YAMLs deleted in a single drain, and each snapshot + * should be checked only once when the newest-first seed ordering is applied. 
+ */ + @Test + public void testQueuedOrphanCheckRemovesSixPurgedSnapshotsInOneRunWithoutDuplicateChecks() + throws Exception { + localDataManager = getNewOmSnapshotLocalDataManager(); + List snapshotIds = createSnapshotLocalData(localDataManager, 6); + + markSnapshotsPurged(snapshotIds); + setSnapshotsToBeCheckedForOrphansInOrder(snapshotIds.toArray(new UUID[0])); + SnapshotChainManager chainManager = mockSnapshotInfoChain(snapshotIds); + + LogCapturer logCapturer = LogCapturer.captureLogs(OmSnapshotLocalDataManager.class); + try { + invokeQueuedOrphanCheck(chainManager); + } finally { + logCapturer.stopCapturing(); + } + + assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().isEmpty()); + for (UUID snapshotId : snapshotIds) { + assertFalse(localDataManager.getVersionNodeMapUnmodifiable().containsKey(snapshotId)); + assertFalse(new File(localDataManager.getSnapshotLocalPropertyYamlPath(snapshotId)).exists()); + String checkLog = "Checking orphan snapshot versions for snapshot " + snapshotId; + assertEquals(1, countOccurrences(logCapturer.getOutput(), checkLog)); + } + } + + private void markSnapshotsPurged(List snapshotIds) throws IOException { + for (UUID snapshotId : snapshotIds) { + try (WritableOmSnapshotLocalDataProvider writableProvider = + localDataManager.getWritableOmSnapshotLocalData(snapshotId)) { + writableProvider.setTransactionInfo(TransactionInfo.valueOf(1, 1)); + writableProvider.commit(); + } + purgedSnapshotIdMap.put(snapshotId, true); + } + } + + private void setSnapshotsToBeCheckedForOrphansInOrder(UUID... 
snapshotIds) throws Exception { + LinkedHashMap snapshotsToCheck = new LinkedHashMap<>(); + for (UUID snapshotId : snapshotIds) { + snapshotsToCheck.put(snapshotId, 1); + } + java.lang.reflect.Field field = OmSnapshotLocalDataManager.class.getDeclaredField( + "snapshotToBeCheckedForOrphans"); + field.setAccessible(true); + field.set(localDataManager, snapshotsToCheck); + } + + private SnapshotChainManager mockSnapshotInfoChain(List snapshotIds) throws Exception { + Table table = new StringInMemoryTestTable<>(); + when(omMetadataManager.getSnapshotInfoTable()).thenReturn(table); + SnapshotChainManager chainManager = mock(SnapshotChainManager.class); + UUID previousSnapshotId = null; + long creationTime = 1L; + for (UUID snapshotId : snapshotIds) { + putSnapshotInfo(table, chainManager, snapshotId, previousSnapshotId, creationTime++); + previousSnapshotId = snapshotId; + } + return chainManager; + } + + private void putSnapshotInfo(Table table, SnapshotChainManager chainManager, + UUID snapshotId, UUID previousSnapshotId, long creationTime) throws Exception { + String tableKey = "table-key-" + snapshotId; + table.put(tableKey, createMockSnapshotInfo(snapshotId, previousSnapshotId, + SNAPSHOT_ACTIVE, creationTime)); + when(chainManager.getTableKey(snapshotId)).thenReturn(tableKey); + } + + private void invokeQueuedOrphanCheck(SnapshotChainManager chainManager) throws Exception { java.lang.reflect.Method method = OmSnapshotLocalDataManager.class.getDeclaredMethod( - "checkOrphanSnapshotVersions", OMMetadataManager.class, + "checkQueuedOrphanSnapshotVersions", OMMetadataManager.class, org.apache.hadoop.ozone.om.SnapshotChainManager.class); method.setAccessible(true); - method.invoke(localDataManager, omMetadataManager, null); - // S2 should NOT be in the map - assertFalse(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId), - "Purged snapshot should not be re-added after YAML deleted"); + method.invoke(localDataManager, omMetadataManager, 
chainManager); + } + + private void invokeQueuedOrphanCheck() throws Exception { + invokeQueuedOrphanCheck(null); + } + + private void invokeCascadeOrphanCheck(SnapshotChainManager chainManager, + UUID snapshotId) throws IOException { + localDataManager.cascadeOrphanSnapshotChecksFrom(omMetadataManager, chainManager, + snapshotId, new HashSet<>()); + } + + private int countOccurrences(String content, String needle) { + int count = 0; + int startIndex = 0; + while ((startIndex = content.indexOf(needle, startIndex)) >= 0) { + count++; + startIndex += needle.length(); + } + return count; } } From 251c2f7d4faa18d5b238ba176d68617791b61cda Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Fri, 29 May 2026 21:44:24 -0700 Subject: [PATCH 2/2] Simplify --- .../snapshot/OmSnapshotLocalDataManager.java | 39 +++++-------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java index 1d0165aad2b3..673d048f4b05 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java @@ -35,12 +35,10 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -409,14 +407,6 @@ private void checkQueuedOrphanSnapshotVersions(OMMetadataManager metadataManager LOG.info("Draining orphan snapshot cleanup queue with {} seed snapshots", queuedSnapshotIds.size()); for (UUID 
snapshotId : queuedSnapshotIds) { - // A descendant processed earlier in this batch may already have checked this - // snapshot as an ancestor. If it re-queued itself for retry, leave that retry - // for the next scheduler iteration instead of checking it twice in one pass. - if (processedSnapshotIds.contains(snapshotId)) { - LOG.debug("Skipping queued snapshot {} because it was already processed earlier in this drain", - snapshotId); - continue; - } cascadeOrphanSnapshotChecksFrom(metadataManager, chainManager, snapshotId, processedSnapshotIds); } LOG.debug("Finished orphan snapshot cleanup drain; {} snapshots remain queued for future runs", @@ -487,14 +477,15 @@ private List getQueuedSnapshotIdsByCreationTimeDesc(OMMetadataManager meta void cascadeOrphanSnapshotChecksFrom(OMMetadataManager metadataManager, SnapshotChainManager chainManager, UUID snapshotId, Set processedSnapshotIds) throws IOException { - Deque snapshotsToCheck = new ArrayDeque<>(); - Set queuedSnapshotIds = new HashSet<>(); - queueSnapshotForOrphanCheck(snapshotsToCheck, queuedSnapshotIds, snapshotId); - - while (!snapshotsToCheck.isEmpty()) { - UUID currentSnapshotId = snapshotsToCheck.removeFirst(); - queuedSnapshotIds.remove(currentSnapshotId); - processedSnapshotIds.add(currentSnapshotId); + for (UUID currentSnapshotId = snapshotId; currentSnapshotId != null;) { + // A descendant processed earlier in the same drain may already have checked + // this snapshot as an ancestor. If it re-queued itself for retry, leave that + // retry for the next scheduler iteration instead of checking it twice now. + if (!processedSnapshotIds.add(currentSnapshotId)) { + LOG.debug("Skipping snapshot {} because it was already processed earlier in this orphan cleanup pass", + currentSnapshotId); + return; + } // Some callers invoke this method directly without first adding the snapshot to the // orphan-check map, so only consume a queued count when one was present for this pass. 
Integer countBeforeCheck = snapshotToBeCheckedForOrphans.get(currentSnapshotId); @@ -511,18 +502,8 @@ void cascadeOrphanSnapshotChecksFrom(OMMetadataManager metadataManager, if (previousSnapshotIdToCheck != null) { LOG.debug("Queueing previous snapshot {} after orphan cleanup removed data from snapshot {}", previousSnapshotIdToCheck, currentSnapshotId); - queueSnapshotForOrphanCheck(snapshotsToCheck, queuedSnapshotIds, previousSnapshotIdToCheck); } - } - } - - /** - * Adds {@code snapshotId} to the local queue exactly once for the current drain pass. - */ - private void queueSnapshotForOrphanCheck(Deque snapshotsToCheck, Set queuedSnapshotIds, - UUID snapshotId) { - if (snapshotId != null && queuedSnapshotIds.add(snapshotId)) { - snapshotsToCheck.addLast(snapshotId); + currentSnapshotId = previousSnapshotIdToCheck; } }