From c6aa44069102fe9f9793f888f2e6eea2ac4bc457 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 20 May 2026 21:23:52 +0800 Subject: [PATCH 1/5] [core] Changelog expiration skip expired changes to avoid FileNotFoundException --- .../paimon/table/ExpireChangelogImpl.java | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index bed5749895e5..922396f1f704 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -131,9 +131,13 @@ public int expire() { maxExclusive = Math.min(maxExclusive, latestChangelogId); for (long id = min; id <= maxExclusive; id++) { - if (changelogManager.longLivedChangelogExists(id) - && olderThanMills <= changelogManager.longLivedChangelog(id).timeMillis()) { - return expireUntil(earliestChangelogId, id); + try { + if (olderThanMills <= changelogManager.tryGetChangelog(id).timeMillis()) { + return expireUntil(earliestChangelogId, id); + } + } catch (FileNotFoundException e) { + // ignore + // snapshot may have been deleted by another process } } return expireUntil(earliestChangelogId, maxExclusive); @@ -148,23 +152,42 @@ public int expireUntil(long earliestId, long endExclusiveId) { List skippingSnapshots = findSkippingTags(taggedSnapshots, earliestId, endExclusiveId); - skippingSnapshots.add(changelogManager.changelog(endExclusiveId)); + try { + skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId)); + } catch (FileNotFoundException e) { + LOG.info("Changelog #{} not found, skip expiring data files.", endExclusiveId, e); + for (long id = earliestId; id < endExclusiveId; id++) { + changelogManager + .fileIO() + .deleteQuietly(changelogManager.longLivedChangelogPath(id)); + } + writeEarliestHintFile(endExclusiveId); + return (int) (endExclusiveId - earliestId); + } skippingSnapshots.add(snapshotManager.earliestSnapshot()); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { LOG.debug("Ready to delete changelog files from changelog #" + id); } - Changelog changelog = changelogManager.longLivedChangelog(id); + Changelog changelog; + try { + changelog = changelogManager.tryGetChangelog(id); + } catch (FileNotFoundException e) { + LOG.info("Changelog #{} not found, skip it.", id, e); + continue; + } Predicate skipper; try { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { LOG.info( - String.format( - "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", - id), + "Skip cleaning data files of changelog #{} due to failed to build skipping set.", + id, e); + changelogManager + .fileIO() + .deleteQuietly(changelogManager.longLivedChangelogPath(id)); continue; } From 59c91187a0154c430391de652cf364f099061491 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 21 May 2026 14:09:47 +0800 Subject: [PATCH 2/5] add tests --- .../paimon/table/ChangelogExpireTest.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java index 1821d927c49e..6e591d6e723e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -143,4 +145,108 @@ public void testChangelogExpireWithUncleanedManifestLists() throws Exception { assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue(); } } + + @Test + public void testExpireWithEndChangelogNotFound() throws Exception { + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); + StreamTableWrite write = writeBuilder.newWrite(); + StreamTableCommit commit = writeBuilder.newCommit(); + for (int i = 1; i <= 10; i++) { + write(write, createRow(1, 0, i, i * 10)); + commit.commit(i, write.prepareCommit(true, i)); + } + write.close(); + commit.close(); + + SnapshotManager snapshotManager = table.snapshotManager(); + long latestSnapshotId = snapshotManager.latestSnapshotId(); + int changelogRetainMin = 3; + + // changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true + ExpireConfig expireConfig = + ExpireConfig.builder() + .changelogRetainMax((int) latestSnapshotId) + .changelogRetainMin(changelogRetainMin) + .changelogTimeRetain(Duration.ofMillis(0)) + .snapshotRetainMax(1) + .snapshotRetainMin(1) + .build(); + ExpireSnapshotsImpl expireSnapshots = + (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); + expireSnapshots.expire(); + + ChangelogManager changelogManager = table.changelogManager(); + FileIO fileIO = table.fileIO(); + + // expire() will compute maxExclusive = latestSnapshotId - retainMin + 1 + // and call expireUntil(earliest, maxExclusive) + // Delete that changelog to simulate concurrent deletion + long endExclusiveId = latestSnapshotId - changelogRetainMin + 1; + assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(endExclusiveId))).isTrue(); + fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(endExclusiveId)); + + ExpireChangelogImpl expire = + (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig); + + // should not throw + assertThatCode(expire::expire).doesNotThrowAnyException(); + + // changelog files in [earliest, endExclusiveId) should be deleted + long earliestChangelogId = 1; + for (long id = earliestChangelogId; id < endExclusiveId; id++) { + assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(id))).isFalse(); + } + + // changelogs after endExclusiveId still exist + long latestChangelogId = changelogManager.latestLongLivedChangelogId(); + assertThat(latestChangelogId).isGreaterThan(endExclusiveId); + } + + @Test + public void testExpireWithMiddleChangelogNotFound() throws Exception { + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); + StreamTableWrite write = writeBuilder.newWrite(); + StreamTableCommit commit = writeBuilder.newCommit(); + for (int i = 1; i <= 10; i++) { + write(write, createRow(1, 0, i, i * 10)); + commit.commit(i, write.prepareCommit(true, i)); + } + write.close(); + commit.close(); + + SnapshotManager snapshotManager = table.snapshotManager(); + long latestSnapshotId = snapshotManager.latestSnapshotId(); + + // changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true + ExpireConfig expireConfig = + ExpireConfig.builder() + .changelogRetainMax((int) latestSnapshotId) + .changelogRetainMin(1) + .changelogTimeRetain(Duration.ofMillis(0)) + .snapshotRetainMax(1) + .snapshotRetainMin(1) + .build(); + ExpireSnapshotsImpl expireSnapshots = + (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); + expireSnapshots.expire(); + + ChangelogManager changelogManager = table.changelogManager(); + FileIO fileIO = table.fileIO(); + long latestChangelogId = changelogManager.latestLongLivedChangelogId(); + long earliestChangelogId = changelogManager.earliestLongLivedChangelogId(); + + // Delete a middle changelog to simulate concurrent deletion + long middleId = (earliestChangelogId + latestChangelogId) / 2; + assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(middleId))).isTrue(); + fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(middleId)); + + ExpireChangelogImpl expire = + (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig); + + // should not throw even though a middle changelog is missing + assertThatCode(expire::expire).doesNotThrowAnyException(); + + // earliest should be advanced past the deleted range + assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(latestChangelogId); + } } From efb5326eed20599f8dab27858d9d734b517c0cbe Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 21 May 2026 20:06:26 +0800 Subject: [PATCH 3/5] fix --- .../main/java/org/apache/paimon/table/ExpireChangelogImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 922396f1f704..52485a11dde6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -161,7 +161,6 @@ public int expireUntil(long earliestId, long endExclusiveId) { .fileIO() .deleteQuietly(changelogManager.longLivedChangelogPath(id)); } - writeEarliestHintFile(endExclusiveId); return (int) (endExclusiveId - earliestId); } skippingSnapshots.add(snapshotManager.earliestSnapshot()); From 04abcd8cf4fffc46833708af5b446823d657fe20 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 22 May 2026 16:26:04 +0800 Subject: [PATCH 4/5] fix --- .../paimon/table/ExpireChangelogImpl.java | 21 +++---- .../paimon/table/ChangelogExpireTest.java | 56 ------------------- 2 files changed, 11 insertions(+), 66 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 52485a11dde6..6bc754c0511d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -130,6 +130,12 @@ public int expire() { // Only clean the snapshot in changelog dir maxExclusive = Math.min(maxExclusive, latestChangelogId); + if (maxExclusive <= earliestChangelogId) { + // This happens when retainMin >= total changelog count + // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId), + return 0; + } + for (long id = min; id <= maxExclusive; id++) { try { if (olderThanMills <= changelogManager.tryGetChangelog(id).timeMillis()) { @@ -155,13 +161,11 @@ public int expireUntil(long earliestId, long endExclusiveId) { try { skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId)); } catch (FileNotFoundException e) { - LOG.info("Changelog #{} not found, skip expiring data files.", endExclusiveId, e); - for (long id = earliestId; id < endExclusiveId; id++) { - changelogManager - .fileIO() - .deleteQuietly(changelogManager.longLivedChangelogPath(id)); - } - return (int) (endExclusiveId - earliestId); + LOG.error( + "The endExclusive changelog #{} not found, skip expiration. Maybe you should use expire_changelogs to delete the separated changelogs.", + endExclusiveId, + e); + return 0; } skippingSnapshots.add(snapshotManager.earliestSnapshot()); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); @@ -184,9 +188,6 @@ public int expireUntil(long earliestId, long endExclusiveId) { "Skip cleaning data files of changelog #{} due to failed to build skipping set.", id, e); - changelogManager - .fileIO() - .deleteQuietly(changelogManager.longLivedChangelogPath(id)); continue; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java index 6e591d6e723e..281dd0f1a6f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -146,62 +146,6 @@ public void testChangelogExpireWithUncleanedManifestLists() throws Exception { } } - @Test - public void testExpireWithEndChangelogNotFound() throws Exception { - StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); - StreamTableWrite write = writeBuilder.newWrite(); - StreamTableCommit commit = writeBuilder.newCommit(); - for (int i = 1; i <= 10; i++) { - write(write, createRow(1, 0, i, i * 10)); - commit.commit(i, write.prepareCommit(true, i)); - } - write.close(); - commit.close(); - - SnapshotManager snapshotManager = table.snapshotManager(); - long latestSnapshotId = snapshotManager.latestSnapshotId(); - int changelogRetainMin = 3; - - // changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true - ExpireConfig expireConfig = - ExpireConfig.builder() - .changelogRetainMax((int) latestSnapshotId) - .changelogRetainMin(changelogRetainMin) - .changelogTimeRetain(Duration.ofMillis(0)) - .snapshotRetainMax(1) - .snapshotRetainMin(1) - .build(); - ExpireSnapshotsImpl expireSnapshots = - (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); - expireSnapshots.expire(); - - ChangelogManager changelogManager = table.changelogManager(); - FileIO fileIO = table.fileIO(); - - // expire() will compute maxExclusive = latestSnapshotId - retainMin + 1 - // and call expireUntil(earliest, maxExclusive) - // Delete that changelog to simulate concurrent deletion - long endExclusiveId = latestSnapshotId - changelogRetainMin + 1; - assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(endExclusiveId))).isTrue(); - fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(endExclusiveId)); - - ExpireChangelogImpl expire = - (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig); - - // should not throw - assertThatCode(expire::expire).doesNotThrowAnyException(); - - // changelog files in [earliest, endExclusiveId) should be deleted - long earliestChangelogId = 1; - for (long id = earliestChangelogId; id < endExclusiveId; id++) { - assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(id))).isFalse(); - } - - // changelogs after endExclusiveId still exist - long latestChangelogId = changelogManager.latestLongLivedChangelogId(); - assertThat(latestChangelogId).isGreaterThan(endExclusiveId); - } - @Test public void testExpireWithMiddleChangelogNotFound() throws Exception { StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); From f41b182389b14a955d5b37e0f11fd11b554bc414 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 22 May 2026 17:45:13 +0800 Subject: [PATCH 5/5] fix comments --- .../apache/paimon/table/ExpireChangelogImpl.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 6bc754c0511d..73f9c88b5f29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -132,7 +132,7 @@ public int expire() { if (maxExclusive <= earliestChangelogId) { // This happens when retainMin >= total changelog count - // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId), + // (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId) return 0; } @@ -151,7 +151,7 @@ public int expire() { public int expireUntil(long earliestId, long endExclusiveId) { if (LOG.isDebugEnabled()) { - LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); + LOG.debug("Changelog expire range is [{}, {})", earliestId, endExclusiveId); } List taggedSnapshots = tagManager.taggedSnapshots(); @@ -171,7 +171,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from changelog #" + id); + LOG.debug("Ready to delete changelog files from changelog #{}", id); } Changelog changelog; try { @@ -242,13 +242,13 @@ public void expireAll() { Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestChangelogId; id <= latestChangelogId; id++) { - LOG.info("Ready to delete changelog files from changelog #" + id); + LOG.info("Ready to delete changelog files from changelog #{}", id); Changelog changelog; try { changelog = changelogManager.tryGetChangelog(id); } catch (FileNotFoundException e) { - LOG.info("fail to get changelog #" + id); + LOG.info("fail to get changelog #{}", id); continue; } Predicate skipper; @@ -256,9 +256,8 @@ public void expireAll() { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { LOG.info( - String.format( - "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", - id), + "Skip cleaning data files of changelog '{}' due to failed to build skipping set.", + id, e); continue; }