Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,20 @@ public int expire() {
// Only clean the snapshot in changelog dir
maxExclusive = Math.min(maxExclusive, latestChangelogId);

if (maxExclusive <= earliestChangelogId) {
// This happens when retainMin >= total changelog count
// (e.g. latestSnapshotId - retainMin + 1 <= earliestChangelogId),
return 0;
}

for (long id = min; id <= maxExclusive; id++) {
if (changelogManager.longLivedChangelogExists(id)
&& olderThanMills <= changelogManager.longLivedChangelog(id).timeMillis()) {
return expireUntil(earliestChangelogId, id);
try {
if (olderThanMills <= changelogManager.tryGetChangelog(id).timeMillis()) {
return expireUntil(earliestChangelogId, id);
}
} catch (FileNotFoundException e) {
// ignore
// snapshot may have been deleted by another process
}
}
return expireUntil(earliestChangelogId, maxExclusive);
Expand All @@ -148,22 +158,35 @@ public int expireUntil(long earliestId, long endExclusiveId) {

List<Snapshot> skippingSnapshots =
findSkippingTags(taggedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(changelogManager.changelog(endExclusiveId));
try {
skippingSnapshots.add(changelogManager.tryGetChangelog(endExclusiveId));
} catch (FileNotFoundException e) {
LOG.error(
"The endExclusive changelog #{} not found, skip expiration. Maybe you should use expire_changelogs to delete the separated changelogs.",
endExclusiveId,
e);
return 0;
}
skippingSnapshots.add(snapshotManager.earliestSnapshot());
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete changelog files from changelog #" + id);
}
Changelog changelog = changelogManager.longLivedChangelog(id);
Changelog changelog;
try {
changelog = changelogManager.tryGetChangelog(id);
} catch (FileNotFoundException e) {
LOG.info("Changelog #{} not found, skip it.", id, e);
continue;
}
Predicate<ExpireFileEntry> skipper;
try {
skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
"Skip cleaning data files of changelog '%s' due to failed to build skipping set.",
id),
"Skip cleaning data files of changelog #{} due to failed to build skipping set.",
id,
e);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -143,4 +145,52 @@ public void testChangelogExpireWithUncleanedManifestLists() throws Exception {
assertThat(fileIO.exists(pathFactory.toManifestFilePath(manifest.fileName()))).isTrue();
}
}

@Test
public void testExpireWithMiddleChangelogNotFound() throws Exception {
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = writeBuilder.newWrite();
StreamTableCommit commit = writeBuilder.newCommit();
for (int i = 1; i <= 10; i++) {
write(write, createRow(1, 0, i, i * 10));
commit.commit(i, write.prepareCommit(true, i));
}
write.close();
commit.close();

SnapshotManager snapshotManager = table.snapshotManager();
long latestSnapshotId = snapshotManager.latestSnapshotId();

// changelogRetainMax > snapshotRetainMax to ensure changelogDecoupled=true
ExpireConfig expireConfig =
ExpireConfig.builder()
.changelogRetainMax((int) latestSnapshotId)
.changelogRetainMin(1)
.changelogTimeRetain(Duration.ofMillis(0))
.snapshotRetainMax(1)
.snapshotRetainMin(1)
.build();
ExpireSnapshotsImpl expireSnapshots =
(ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig);
expireSnapshots.expire();

ChangelogManager changelogManager = table.changelogManager();
FileIO fileIO = table.fileIO();
long latestChangelogId = changelogManager.latestLongLivedChangelogId();
long earliestChangelogId = changelogManager.earliestLongLivedChangelogId();

// Delete a middle changelog to simulate concurrent deletion
long middleId = (earliestChangelogId + latestChangelogId) / 2;
assertThat(fileIO.exists(changelogManager.longLivedChangelogPath(middleId))).isTrue();
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(middleId));

ExpireChangelogImpl expire =
(ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig);

// should not throw even though a middle changelog is missing
assertThatCode(expire::expire).doesNotThrowAnyException();

// earliest should be advanced past the deleted range
assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(latestChangelogId);
}
}