diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index ba4c8bed4b75..e310f398f821 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -3615,4 +3615,4 @@ components: securitySchemes: BearerAuth: type: http - scheme: bearer \ No newline at end of file + scheme: bearer diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index a7a4de0b5cbe..23b34947bcc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -329,6 +329,8 @@ default Range nonNullRowIdRange() { DataFileMeta assignFirstRowId(long firstRowId); + DataFileMeta newFirstRowId(@Nullable Long newFirstRowId); + default List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java index 5003974e8ead..9e845b26fe14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java @@ -379,6 +379,31 @@ public PojoDataFileMeta assignFirstRowId(long firstRowId) { writeCols); } + @Override + public PojoDataFileMeta newFirstRowId(@Nullable Long newFirstRowId) { + return new PojoDataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols, + externalPath, + newFirstRowId, + writeCols); + } + @Override public PojoDataFileMeta copy(List newExtraFiles) { return new PojoDataFileMeta( diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index a2c3e62c36e8..35dfd308df0a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,12 @@ public void fastForward(String branchName) { wrapped.fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + privilegeChecker.assertCanInsert(identifier); + wrapped.mergeBranch(sourceBranch, targetBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 522d25c7d379..df4224915927 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -55,6 +55,7 @@ import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.BranchMergeHandler; import org.apache.paimon.utils.CatalogBranchManager; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.DVMetaCache; @@ -737,6 +738,11 @@ public void fastForward(String branchName) { branchManager().fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + branchManager().mergeBranch(sourceBranch, targetBranch); + } + @Override public TagManager tagManager() { return new TagManager(fileIO, path, currentBranch(), coreOptions()); @@ -749,7 +755,12 @@ public BranchManager branchManager() { return new CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier()); } return new FileSystemBranchManager( - fileIO, path, snapshotManager(), tagManager(), schemaManager()); + fileIO, + path, + snapshotManager(), + tagManager(), + schemaManager(), + new BranchMergeHandler(this::switchToBranch)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 1f234d81ff6d..7ba4bc20a9d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -295,6 +295,11 @@ public void fastForward(String branchName) { wrapped.fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + wrapped.mergeBranch(sourceBranch, targetBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { return wrapped.newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 7f13faf75314..aee60f65ee3c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -427,6 +427,11 @@ default void fastForward(String branchName) { throw new UnsupportedOperationException(); } + @Override + default void mergeBranch(String sourceBranch, String targetBranch) { + throw new UnsupportedOperationException(); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index b685f86e3e78..8d7add5bc50f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -281,6 +281,14 @@ default void fastForward(String branchName) { this.getClass().getSimpleName())); } + @Override + default void mergeBranch(String sourceBranch, String targetBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mergeBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 98c00c010141..f58259b1fcb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -210,6 +210,10 @@ default void deleteBranches(String branchNames) { @Experimental void fastForward(String branchName); + /** Merge source branch into target branch (append-only tables only). */ + @Experimental + void mergeBranch(String sourceBranch, String targetBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index c830a799bff5..085f04747ca1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -59,6 +59,8 @@ public interface BranchManager { void fastForward(String branchName); + void mergeBranch(String sourceBranch, String targetBranch); + void renameBranch(String fromBranch, String toBranch); List branches(); @@ -98,6 +100,21 @@ static void validateBranch(String branchName) { branchName); } + static void mergeValidate(String sourceBranch, String targetBranch) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(sourceBranch), + "Source branch name '%s' is blank.", + sourceBranch); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(targetBranch), + "Target branch name '%s' is blank.", + targetBranch); + checkArgument( + !sourceBranch.equals(targetBranch), + "Cannot merge branch '%s' into itself.", + sourceBranch); + } + static void fastForwardValidate(String branchName, String currentBranch) { checkArgument( !branchName.equals(DEFAULT_MAIN_BRANCH), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java new file mode 100644 index 000000000000..0327fd0bcfcb --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Branch merge handler backed by {@link FileStoreTable}. */ +public class BranchMergeHandler { + + private final Function branchTableFactory; + + public BranchMergeHandler(Function branchTableFactory) { + this.branchTableFactory = branchTableFactory; + } + + public Map readBranchFiles(String branch) { + FileStoreTable branchTable = branchTableFactory.apply(branch); + Snapshot snapshot = branchTable.snapshotManager().latestSnapshot(); + checkArgument( + snapshot != null, + "Cannot read branch '%s', because it does not have any snapshot.", + branch); + ManifestList manifestList = branchTable.store().manifestListFactory().create(); + ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + Map files = new LinkedHashMap<>(); + FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); + return files; + } + + public void commit(String targetBranch, List filesToMerge) { + FileStoreTable branchTable = branchTableFactory.apply(targetBranch); + boolean rowTrackingEnabled = + new CoreOptions(branchTable.schema().options()).rowTrackingEnabled(); + + Map> grouped = new LinkedHashMap<>(); + for (ManifestEntry entry : filesToMerge) { + DataFileMeta file = prepareFileForTargetCommit(entry.file(), rowTrackingEnabled); + grouped.computeIfAbsent( + new MergeKey( + entry.partition().copy(), entry.bucket(), entry.totalBuckets()), + k -> new ArrayList<>()) + .add(file); + } + + String commitUser = UUID.randomUUID().toString(); + ManifestCommittable committable = new ManifestCommittable(0); + for (Map.Entry> e : grouped.entrySet()) { + MergeKey key = e.getKey(); + CommitMessageImpl message = + new CommitMessageImpl( + key.partition, + key.bucket, + key.totalBuckets, + new DataIncrement( + e.getValue(), Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement()); + committable.addFileCommittable(message); + } + + try (FileStoreCommit commit = branchTable.store().newCommit(commitUser, branchTable)) { + commit.appendCommitCheckConflict(true).commit(committable, true); + } + } + + private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTrackingEnabled) { + if (rowTrackingEnabled && file.firstRowId() != null) { + // Source files already have row ids assigned in their branch. Clear them so the + // target branch commit path assigns fresh, non-overlapping row ids. + return file.newFirstRowId(null); + } + return file; + } + + private static class MergeKey { + final BinaryRow partition; + final int bucket; + final int totalBuckets; + + MergeKey(BinaryRow partition, int bucket, int totalBuckets) { + this.partition = partition; + this.bucket = bucket; + this.totalBuckets = totalBuckets; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MergeKey)) { + return false; + } + MergeKey that = (MergeKey) o; + return bucket == that.bucket + && totalBuckets == that.totalBuckets + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, totalBuckets); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java index 4ba9c39475ae..9f97cf3a097d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java @@ -108,6 +108,11 @@ public void fastForward(String branchName) { }); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + throw new UnsupportedOperationException("Branch merge is not supported via catalog."); + } + @Override public void renameBranch(String fromBranch, String toBranch) { executePost(catalog -> catalog.renameBranch(identifier, fromBranch, toBranch)); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java index 9c09637d1c37..aee178785028 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java @@ -18,9 +18,13 @@ package org.apache.paimon.utils; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.tag.Tag; @@ -29,8 +33,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -47,18 +54,21 @@ public class FileSystemBranchManager implements BranchManager { private final SnapshotManager snapshotManager; private final TagManager tagManager; private final SchemaManager schemaManager; + private final BranchMergeHandler mergeHandler; public FileSystemBranchManager( FileIO fileIO, Path path, SnapshotManager snapshotManager, TagManager tagManager, - SchemaManager schemaManager) { + SchemaManager schemaManager, + BranchMergeHandler mergeHandler) { this.fileIO = fileIO; this.tablePath = path; this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.schemaManager = schemaManager; + this.mergeHandler = mergeHandler; } /** Return the root Directory of branch. */ @@ -210,6 +220,197 @@ public void fastForward(String branchName) { } } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + BranchManager.mergeValidate(sourceBranch, targetBranch); + validateMergeBranches(sourceBranch, targetBranch); + validateAppendOnlyHistory(sourceBranch, targetBranch); + validateAppendOnly(sourceBranch, targetBranch); + validateNoDataEvolution(sourceBranch, targetBranch); + validateRowTrackingConsistent(sourceBranch, targetBranch); + validateLatestSchema(sourceBranch, targetBranch); + + List filesToMerge = computeMergeDiff(sourceBranch, targetBranch); + if (filesToMerge.isEmpty()) { + return; + } + + validateMergeFileSchemas(sourceBranch, targetBranch, filesToMerge); + mergeHandler.commit(targetBranch, filesToMerge); + } + + private void validateMergeBranches(String sourceBranch, String targetBranch) { + if (!BranchManager.isMainBranch(sourceBranch)) { + checkArgument(branchExists(sourceBranch), "Branch '%s' doesn't exist.", sourceBranch); + } + if (!BranchManager.isMainBranch(targetBranch)) { + checkArgument(branchExists(targetBranch), "Branch '%s' doesn't exist.", targetBranch); + } + + SnapshotManager sourceSm = snapshotManager.copyWithBranch(sourceBranch); + checkArgument( + sourceSm.latestSnapshotId() != null, + "Cannot merge branch '%s', because it does not have snapshot.", + sourceBranch); + + SnapshotManager targetSm = snapshotManager.copyWithBranch(targetBranch); + checkArgument( + targetSm.latestSnapshotId() != null, + "Cannot merge into branch '%s', because it does not have snapshot.", + targetBranch); + } + + private void validateLatestSchema(String sourceBranch, String targetBranch) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + TableSchema sourceSchema = sourceSchemaMgr.latest().get(); + TableSchema targetSchema = targetSchemaMgr.latest().get(); + checkArgument( + sourceSchema.fields().equals(targetSchema.fields()), + "Cannot merge branch '%s' into '%s', schema mismatch.", + sourceBranch, + targetBranch); + } + + private void validateMergeFileSchemas( + String sourceBranch, String targetBranch, List filesToMerge) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + + for (Long schemaId : + filesToMerge.stream() + .map(entry -> entry.file().schemaId()) + .collect(Collectors.toSet())) { + checkArgument( + targetSchemaMgr.schemaExists(schemaId), + "Cannot merge branch '%s' into '%s', because target branch does not contain " + + "schema id %s required by source files.", + sourceBranch, + targetBranch, + schemaId); + + TableSchema sourceSchema = sourceSchemaMgr.schema(schemaId); + TableSchema targetSchema = targetSchemaMgr.schema(schemaId); + checkArgument( + sourceSchema.equals(targetSchema), + "Cannot merge branch '%s' into '%s', schema history mismatch for schema id %s.", + sourceBranch, + targetBranch, + schemaId); + } + } + + private void validateAppendOnly(String sourceBranch, String targetBranch) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + TableSchema sourceSchema = sourceSchemaMgr.latest().get(); + checkArgument( + sourceSchema.primaryKeys().isEmpty(), + "Branch merge is only supported for append-only tables, " + + "but branch '%s' has primary keys.", + sourceBranch); + + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + TableSchema targetSchema = targetSchemaMgr.latest().get(); + checkArgument( + targetSchema.primaryKeys().isEmpty(), + "Branch merge is only supported for append-only tables, " + + "but branch '%s' has primary keys.", + targetBranch); + } + + // Branch merge is implemented as a conservative file-level merge. Without persisted branch + // lineage metadata, we cannot reliably infer a fork point after snapshots expire. To preserve + // correctness, both branches must retain complete append-only history from the first snapshot. + // This restriction can be relaxed in the future if branch fork metadata is introduced. + private void validateAppendOnlyHistory(String sourceBranch, String targetBranch) { + validateCompleteAppendOnly(snapshotManager.copyWithBranch(sourceBranch), sourceBranch); + validateCompleteAppendOnly(snapshotManager.copyWithBranch(targetBranch), targetBranch); + } + + private void validateCompleteAppendOnly(SnapshotManager sm, String branch) { + Long earliest = sm.earliestSnapshotId(); + Long latest = sm.latestSnapshotId(); + if (earliest == null || latest == null) { + return; + } + if (earliest != Snapshot.FIRST_SNAPSHOT_ID) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: branch '%s' does not start at snapshot %d " + + "(earliest is %d). " + + "Branch merge requires complete append-only snapshot history.", + branch, Snapshot.FIRST_SNAPSHOT_ID, earliest)); + } + for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latest; id++) { + if (!sm.snapshotExists(id)) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: snapshot %d is missing on branch '%s'. " + + "Branch merge requires complete append-only snapshot history.", + id, branch)); + } + Snapshot snapshot = sm.snapshot(id); + Snapshot.CommitKind kind = snapshot.commitKind(); + if (kind != Snapshot.CommitKind.APPEND && kind != Snapshot.CommitKind.ANALYZE) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: snapshot %d on branch '%s' has commit kind '%s'. " + + "Branch merge requires complete append-only snapshot history.", + id, branch, kind)); + } + } + } + + private void validateNoDataEvolution(String sourceBranch, String targetBranch) { + for (String branch : new String[] {sourceBranch, targetBranch}) { + SchemaManager sm = new SchemaManager(fileIO, tablePath, branch); + TableSchema schema = sm.latest().get(); + CoreOptions opts = new CoreOptions(schema.options()); + checkArgument( + !opts.dataEvolutionEnabled(), + "Branch merge is not supported for data-evolution tables (branch '%s').", + branch); + } + } + + private void validateRowTrackingConsistent(String sourceBranch, String targetBranch) { + boolean sourceEnabled = isRowTrackingEnabled(sourceBranch); + boolean targetEnabled = isRowTrackingEnabled(targetBranch); + checkArgument( + sourceEnabled == targetEnabled, + "Cannot merge branch '%s' into '%s': row-tracking settings must match " + + "(source=%s, target=%s).", + sourceBranch, + targetBranch, + sourceEnabled, + targetEnabled); + } + + private boolean isRowTrackingEnabled(String branch) { + SchemaManager schemaManager = new SchemaManager(fileIO, tablePath, branch); + TableSchema schema = schemaManager.latest().get(); + return new CoreOptions(schema.options()).rowTrackingEnabled(); + } + + private List computeMergeDiff(String sourceBranch, String targetBranch) { + Set targetFileIds = + mergeHandler.readBranchFiles(targetBranch).keySet(); + + Map sourceFiles = + mergeHandler.readBranchFiles(sourceBranch); + + List filesToMerge = new ArrayList<>(); + for (Map.Entry entry : sourceFiles.entrySet()) { + if (!targetFileIds.contains(entry.getKey())) { + ManifestEntry manifestEntry = entry.getValue(); + if (manifestEntry.kind() == FileKind.ADD) { + filesToMerge.add(manifestEntry); + } + } + } + return filesToMerge; + } + /** Check if a branch exists. */ public boolean branchExists(String branchName) { Path branchPath = branchPath(branchName); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 30ccc652a0c9..c91dbb4c8e4f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.bucket.DefaultBucketFunction; import org.apache.paimon.data.BinaryRow; @@ -37,6 +38,8 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -73,6 +76,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchMergeHandler; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat; @@ -86,8 +90,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -117,6 +123,7 @@ import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1699,4 +1706,1101 @@ protected FileStoreTable createUnawareBucketFileStoreTable( "")); return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + protected FileStoreTable createBranchMergeTable() throws Exception { + return createFileStoreTable(); + } + + protected FileStoreTable createBranchMergeTable(Consumer extraConfigure) + throws Exception { + return createFileStoreTable(extraConfigure); + } + + @Test + public void testMergeBranch() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write data to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write more data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branch into main + table.mergeBranch(BRANCH_NAME, "main"); + + // Verify main has data from both sides + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchMultipleTimes() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // First write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // First merge + table.mergeBranch(BRANCH_NAME, "main"); + + // Second write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + + // Second merge + table.mergeBranch(BRANCH_NAME, "main"); + + // Verify no duplicates: main has all 3 rows exactly once + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchFailsOnStaleDuplicateCommit() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + BranchMergeHandler handler = new BranchMergeHandler(table::switchToBranch); + Map sourceFiles = + handler.readBranchFiles(BRANCH_NAME); + Map targetFiles = + handler.readBranchFiles("main"); + List filesToMerge = + sourceFiles.entrySet().stream() + .filter(entry -> !targetFiles.containsKey(entry.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + + handler.commit("main", filesToMerge); + assertThatThrownBy(() -> handler.commit("main", filesToMerge)) + .satisfies(anyCauseMatches(RuntimeException.class, "Trying to add file")); + } + + @Test + public void testMergeBranchBidirectional() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write shared data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branch -> main + table.mergeBranch(BRANCH_NAME, "main"); + + // Merge main -> branch + table.mergeBranch("main", BRANCH_NAME); + + // Verify both have the same data without duplicates + List mainData = + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING); + List branchData = + getResult( + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING); + + assertThat(mainData) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(branchData).containsExactlyInAnyOrderElementsOf(mainData); + } + + @Test + public void testMergeBranchEmptyDiff() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag (has same data as main) + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + // Merge should be a no-op (no exception, no new snapshot) + long snapshotBefore = table.snapshotManager().latestSnapshotId(); + table.mergeBranch(BRANCH_NAME, "main"); + long snapshotAfter = table.snapshotManager().latestSnapshotId(); + assertThat(snapshotAfter).isEqualTo(snapshotBefore); + } + + @Test + public void testMergeBranchSchemaConflict() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + // Modify schema on main (add a column) + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + schemaManager.commitChanges(SchemaChange.addColumn("new_col", DataTypes.INT())); + + // Merge should fail due to schema mismatch + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Cannot merge branch 'branch1' into 'main', schema mismatch.")); + } + + @Test + public void testMergeBranchSchemaHistoryConflict() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + SchemaManager branchSchemaManager = + new SchemaManager(table.fileIO(), table.location(), BRANCH_NAME); + branchSchemaManager.commitChanges(SchemaChange.addColumn("source_col", DataTypes.INT())); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().newWrite().withWriteType(ROW_TYPE); + BatchTableCommit commit = tableBranch.newBatchWriteBuilder().newCommit()) { + write.write(rowData(1, 10, 100L)); + commit.commit(write.prepareCommit()); + } + branchSchemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("source_col"))); + + SchemaManager mainSchemaManager = new SchemaManager(table.fileIO(), table.location()); + mainSchemaManager.commitChanges(SchemaChange.addColumn("target_col", DataTypes.INT())); + mainSchemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("target_col"))); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "schema history mismatch for schema id 1")); + } + + @Test + public void testMergeBranchRowTrackingTable() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingMultipleTimes() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // First write to branch + merge + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + table.mergeBranch(BRANCH_NAME, "main"); + + // Second write to branch + merge + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingAfterTargetWrites() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write 2 rows to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 101L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write 3 rows to main independently (advances main nextRowId) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + write.write(rowData(2, 21, 201L)); + write.write(rowData(2, 22, 202L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .hasSize(6); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(6L); + } + + @Test + public void testMergeBranchRowTrackingBetweenNonMainBranches() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch("branchA", "tag1"); + table.createBranch("branchB", "tag1"); + FileStoreTable tableA = table.switchToBranch("branchA"); + FileStoreTable tableB = table.switchToBranch("branchB"); + + try (StreamTableWrite write = tableA.newWrite(commitUser); + StreamTableCommit commit = tableA.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = tableB.newWrite(commitUser); + StreamTableCommit commit = tableB.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch("branchA", "branchB"); + + tableB = table.switchToBranch("branchB"); + assertThat( + getResult( + tableB.newRead(), + toSplits(tableB.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(tableB); + Snapshot snapshot = tableB.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingMismatch() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Directly write a new schema to the branch with row-tracking disabled + SchemaManager branchSchemaManager = + new SchemaManager(table.fileIO(), table.location(), BRANCH_NAME); + TableSchema branchSchema = branchSchemaManager.latest().get(); + Map newOptions = new HashMap<>(branchSchema.options()); + newOptions.remove("row-tracking.enabled"); + TableSchema mismatchedSchema = + new TableSchema( + branchSchema.version(), + branchSchema.id() + 1, + branchSchema.fields(), + branchSchema.highestFieldId(), + branchSchema.partitionKeys(), + branchSchema.primaryKeys(), + newOptions, + branchSchema.comment(), + branchSchema.timeMillis()); + branchSchemaManager.commit(mismatchedSchema); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "row-tracking settings must match")); + } + + @Test + public void testMergeBranchRowTrackingStaleMerge() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to main multiple times to advance nextRowId + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(3, 30, 300L)); + write.write(rowData(3, 31, 301L)); + commit.commit(3, write.prepareCommit(false, 3)); + } + + // Merge: branch file should get firstRowId after all main files + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .hasSize(5); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(5L); + + // Write more to main, then merge again (branch has no new data, should be no-op) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(4, 40, 400L)); + commit.commit(4, write.prepareCommit(false, 4)); + } + + long snapshotIdBefore = table.snapshotManager().latestSnapshotId(); + table.mergeBranch(BRANCH_NAME, "main"); + long snapshotIdAfter = table.snapshotManager().latestSnapshotId(); + + // Second merge should be no-op (branch file already in target) + assertThat(snapshotIdAfter).isEqualTo(snapshotIdBefore); + + assertRowIdRangesNonOverlapping(table); + Snapshot finalSnapshot = table.snapshotManager().latestSnapshot(); + assertThat(finalSnapshot.nextRowId()).isEqualTo(6L); + } + + private void assertRowIdRangesNonOverlapping(FileStoreTable table) { + ManifestList manifestList = table.store().manifestListFactory().create(); + ManifestFile manifestFile = table.store().manifestFileFactory().create(); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + Map files = new LinkedHashMap<>(); + FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); + + List ranges = new ArrayList<>(); + for (ManifestEntry entry : files.values()) { + if (entry.file().firstRowId() != null) { + long start = entry.file().firstRowId(); + long end = start + entry.file().rowCount() - 1; + ranges.add(new long[] {start, end}); + } + } + ranges.sort(Comparator.comparingLong(r -> r[0])); + for (int i = 1; i < ranges.size(); i++) { + assertTrue( + ranges.get(i)[0] > ranges.get(i - 1)[1], + String.format( + "Row-id ranges overlap: [%d, %d] and [%d, %d]", + ranges.get(i - 1)[0], + ranges.get(i - 1)[1], + ranges.get(i)[0], + ranges.get(i)[1])); + } + } + + @Test + public void testMergeBranchSameBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, BRANCH_NAME)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Cannot merge branch 'branch1' into itself.")); + } + + @Test + public void testMergeBranchSamePartition() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main (partition pt=0) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch with same partition pt=0 + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Merge branch into main + table.mergeBranch(BRANCH_NAME, "main"); + + // Both files coexist in the same partition + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "0|10|100|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchNonExistentBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + assertThatThrownBy(() -> table.mergeBranch("nonexistent", "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch 'nonexistent' doesn't exist.")); + } + + @Test + public void testMergeBranchMultiBucket() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.BUCKET, 2); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + write.write(rowData(0, 1, 1L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 110L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "0|1|1|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "1|11|110|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchNonExistentTargetBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "nonexistent")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch 'nonexistent' doesn't exist.")); + } + + @Test + public void testMergeBranchBetweenNonMainBranches() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create two branches from tag so they share the same base data + table.createTag("tag1", 1); + table.createBranch("branchA", "tag1"); + table.createBranch("branchB", "tag1"); + FileStoreTable tableA = table.switchToBranch("branchA"); + FileStoreTable tableB = table.switchToBranch("branchB"); + + // Write to branchA + try (StreamTableWrite write = tableA.newWrite(commitUser); + StreamTableCommit commit = tableA.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to branchB + try (StreamTableWrite write = tableB.newWrite(commitUser); + StreamTableCommit commit = tableB.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branchA into branchB + table.mergeBranch("branchA", "branchB"); + + // Reload branchB table to see changes + tableB = table.switchToBranch("branchB"); + assertThat( + getResult( + tableB.newRead(), + toSplits(tableB.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchAfterSnapshotExpiration() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + for (int i = 2; i < 5; i++) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire(); + + // After expiration, baseline snapshot is gone — merge should fail + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsNonAppendHistory() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write data to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Perform INSERT OVERWRITE on branch — creates an OVERWRITE snapshot + List commitMessages; + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = + tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsTargetOverwrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // INSERT OVERWRITE on target (main) + List commitMessages; + try (BatchTableWrite write = table.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsSourceOverwrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // INSERT OVERWRITE on source (branch) + List commitMessages; + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = + tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsSourceExpiredSnapshots() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + for (int i = 1; i < 5; i++) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire(); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsTargetExpiredSnapshots() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write multiple commits on main to allow expiration + for (int i = 2; i < 5; i++) { + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + table.newExpireSnapshots().config(table.coreOptions().expireConfig()).expire(); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchFromTagSucceeds() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergePlainBranchSucceedsWithCompleteHistory() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 9401cce83275..80cfb975bd9e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -144,6 +144,7 @@ import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING; import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -2672,4 +2673,21 @@ protected FileStoreTable createFileStoreTable(Consumer configure, RowTy "")); return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + @Test + public void testMergeBranchPrimaryKeyTable() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables")); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java index c74b53d3f381..109303a9288d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java @@ -69,7 +69,7 @@ void before() throws Exception { tagManager = new TagManager(fileIO, tablePath); branchManager = new FileSystemBranchManager( - fileIO, tablePath, snapshotManager, tagManager, schemaManager); + fileIO, tablePath, snapshotManager, tagManager, schemaManager, null); } @Test