diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index bed5749895e5..73f9c88b5f29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -130,10 +130,20 @@ public int expire() { // Only clean the snapshot in changelog dir maxExclusive = Math.min(maxExclusive, latestChangelogId); + if (maxExclusive <= earliestChangelogId) { + // This happens when retainMin >= total changelog count + // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId) + return 0; + } + for (long id = min; id <= maxExclusive; id++) { - if (changelogManager.longLivedChangelogExists(id) - && olderThanMills <= changelogManager.longLivedChangelog(id).timeMillis()) { - return expireUntil(earliestChangelogId, id); + try { + if (olderThanMills <= changelogManager.tryGetChangelog(id).timeMillis()) { + return expireUntil(earliestChangelogId, id); + } + } catch (FileNotFoundException e) { + // ignore + // snapshot may have been deleted by another process } } return expireUntil(earliestChangelogId, maxExclusive); @@ -141,29 +151,42 @@ public int expire() { public int expireUntil(long earliestId, long endExclusiveId) { if (LOG.isDebugEnabled()) { - LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); + LOG.debug("Changelog expire range is [{}, {})", earliestId, endExclusiveId); } List taggedSnapshots = tagManager.taggedSnapshots(); List skippingSnapshots = findSkippingTags(taggedSnapshots, earliestId, endExclusiveId); - skippingSnapshots.add(changelogManager.changelog(endExclusiveId)); + try { + skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId)); + } catch (FileNotFoundException e) { + LOG.error( + "The endExclusive changelog #{} not found, skip expiration. Maybe you should use expire_changelogs to delete the separated changelogs.", + endExclusiveId, + e); + return 0; + } skippingSnapshots.add(snapshotManager.earliestSnapshot()); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from changelog #" + id); + LOG.debug("Ready to delete changelog files from changelog #{}", id); + } + Changelog changelog; + try { + changelog = changelogManager.tryGetChangelog(id); + } catch (FileNotFoundException e) { + LOG.info("Changelog #{} not found, skip it.", id, e); + continue; } - Changelog changelog = changelogManager.longLivedChangelog(id); Predicate skipper; try { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { LOG.info( - String.format( - "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", - id), + "Skip cleaning data files of changelog #{} due to failed to build skipping set.", + id, e); continue; } @@ -219,13 +242,13 @@ public void expireAll() { Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestChangelogId; id <= latestChangelogId; id++) { - LOG.info("Ready to delete changelog files from changelog #" + id); + LOG.info("Ready to delete changelog files from changelog #{}", id); Changelog changelog; try { changelog = changelogManager.tryGetChangelog(id); } catch (FileNotFoundException e) { - LOG.info("fail to get changelog #" + id); + LOG.info("fail to get changelog #{}", id); continue; } Predicate skipper; @@ -233,9 +256,8 @@ public void expireAll() { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { LOG.info( - String.format( - "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", - id), + "Skip cleaning data files of changelog '{}' due to failed to build skipping set.", + id, e); continue; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java index 1821d927c49e..281dd0f1a6f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -143,4 +145,52 @@ public void testChangelogExpireWithUncleanedManifestLists() throws Exception { assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue(); } } + + @Test + public void testExpireWithMiddleChangelogNotFound() throws Exception { + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); + StreamTableWrite write = writeBuilder.newWrite(); + StreamTableCommit commit = writeBuilder.newCommit(); + for (int i = 1; i <= 10; i++) { + write(write, createRow(1, 0, i, i * 10)); + commit.commit(i, write.prepareCommit(true, i)); + } + write.close(); + commit.close(); + + SnapshotManager snapshotManager = table.snapshotManager(); + long latestSnapshotId = snapshotManager.latestSnapshotId(); + + // changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true + ExpireConfig expireConfig = + ExpireConfig.builder() + .changelogRetainMax((int) latestSnapshotId) + .changelogRetainMin(1) + .changelogTimeRetain(Duration.ofMillis(0)) + .snapshotRetainMax(1) + .snapshotRetainMin(1) + .build(); + ExpireSnapshotsImpl expireSnapshots = + (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); + expireSnapshots.expire(); + + ChangelogManager changelogManager = table.changelogManager(); + FileIO fileIO = table.fileIO(); + long latestChangelogId = changelogManager.latestLongLivedChangelogId(); + long earliestChangelogId = changelogManager.earliestLongLivedChangelogId(); + + // Delete a middle changelog to simulate concurrent deletion + long middleId = (earliestChangelogId + latestChangelogId) / 2; + assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(middleId))).isTrue(); + fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(middleId)); + + ExpireChangelogImpl expire = + (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig); + + // should not throw even though a middle changelog is missing + assertThatCode(expire::expire).doesNotThrowAnyException(); + + // earliest should be advanced past the deleted range + assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(latestChangelogId); + } }