From 950250c57d654368ed76a55d0a2424d71f2a9ea8 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 17 May 2026 01:01:20 +0800 Subject: [PATCH 1/5] [api] Add Table and Catalog API for branch merge Add mergeBranch(sourceBranch, targetBranch) to Table and Catalog interfaces, with default/delegate implementations. Add branch-merge.enabled @Immutable table option to CoreOptions. When enabled, the table is restricted to pure-append commits only, guaranteeing that branch merge is always safe. Add BranchMergeHandler interface and extend BranchManager with mergeBranch() and merge validation utilities. --- .../java/org/apache/paimon/CoreOptions.java | 15 ++++++++ .../paimon/catalog/AbstractCatalog.java | 6 ++++ .../org/apache/paimon/catalog/Catalog.java | 13 +++++++ .../paimon/catalog/DelegateCatalog.java | 6 ++++ .../privilege/PrivilegedFileStoreTable.java | 6 ++++ .../paimon/table/AbstractFileStoreTable.java | 13 ++++++- .../paimon/table/DelegatedFileStoreTable.java | 5 +++ .../org/apache/paimon/table/FormatTable.java | 5 +++ .../apache/paimon/table/ReadonlyTable.java | 8 +++++ .../java/org/apache/paimon/table/Table.java | 4 +++ .../apache/paimon/utils/BranchManager.java | 17 +++++++++ .../paimon/utils/BranchMergeHandler.java | 35 +++++++++++++++++++ 12 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5a1bb7fb067..8a7aa4387cbf 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2224,6 +2224,17 @@ public InlineElement getDescription() { .defaultValue(false) .withDescription("Whether enable data evolution for row tracking table."); + @Immutable + public static final ConfigOption BRANCH_MERGE_ENABLED = + key("branch-merge.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable branch merge for this table. " + + "When enabled, the table must keep a pure-append history: " + + "compaction and INSERT OVERWRITE are rejected, and deletion vectors " + + "are not supported. This provides the invariant required by branch merge."); + public static final ConfigOption SNAPSHOT_IGNORE_EMPTY_COMMIT = key("snapshot.ignore-empty-commit") .booleanType() @@ -3690,6 +3701,10 @@ public boolean dataEvolutionEnabled() { return options.get(DATA_EVOLUTION_ENABLED); } + public boolean branchMergeEnabled() { + return options.get(BRANCH_MERGE_ENABLED); + } + public boolean prepareCommitWaitCompaction() { if (!needLookup()) { return false; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a8a8b1b91b6..5c27dcb8cff7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -631,6 +631,12 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx throw new UnsupportedOperationException(); } + @Override + public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) + throws BranchNotExistException { + throw new UnsupportedOperationException(); + } + @Override public List listBranches(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 57fa040a2acd..2a44944615b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -920,6 +920,19 @@ void renameBranch(Identifier identifier, String fromBranch, String toBranch) */ void fastForward(Identifier identifier, String branch) throws BranchNotExistException; + /** + * Merge source branch into target branch. + * + * @param identifier path of the table, cannot be system or branch name. + * @param sourceBranch the source branch name + * @param targetBranch the target branch name + * @throws BranchNotExistException if the source or target branch doesn't exist + * @throws UnsupportedOperationException if the catalog does not {@link + * #supportsVersionManagement()} + */ + void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) + throws BranchNotExistException; + /** * List all branches of the table. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 0f18f7d04540..e24787f9c012 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -263,6 +263,12 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx wrapped.fastForward(identifier, branch); } + @Override + public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) + throws BranchNotExistException { + wrapped.mergeBranch(identifier, sourceBranch, targetBranch); + } + @Override public List listBranches(Identifier identifier) throws TableNotExistException { return wrapped.listBranches(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index a2c3e62c36e8..35dfd308df0a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,12 @@ public void fastForward(String branchName) { wrapped.fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + privilegeChecker.assertCanInsert(identifier); + wrapped.mergeBranch(sourceBranch, targetBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 522d25c7d379..60f48f4dcd02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -58,6 +58,7 @@ import org.apache.paimon.utils.CatalogBranchManager; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.DVMetaCache; +import org.apache.paimon.utils.DefaultBranchMergeHandler; import org.apache.paimon.utils.FileSystemBranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; @@ -737,6 +738,11 @@ public void fastForward(String branchName) { branchManager().fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + branchManager().mergeBranch(sourceBranch, targetBranch); + } + @Override public TagManager tagManager() { return new TagManager(fileIO, path, currentBranch(), coreOptions()); @@ -749,7 +755,12 @@ public BranchManager branchManager() { return new CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier()); } return new FileSystemBranchManager( - fileIO, path, snapshotManager(), tagManager(), schemaManager()); + fileIO, + path, + snapshotManager(), + tagManager(), + schemaManager(), + new DefaultBranchMergeHandler(this::switchToBranch)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 1f234d81ff6d..7ba4bc20a9d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -295,6 +295,11 @@ public void fastForward(String branchName) { wrapped.fastForward(branchName); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + wrapped.mergeBranch(sourceBranch, targetBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { return wrapped.newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 7f13faf75314..aee60f65ee3c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -427,6 +427,11 @@ default void fastForward(String branchName) { throw new UnsupportedOperationException(); } + @Override + default void mergeBranch(String sourceBranch, String targetBranch) { + throw new UnsupportedOperationException(); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index b685f86e3e78..8d7add5bc50f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -281,6 +281,14 @@ default void fastForward(String branchName) { this.getClass().getSimpleName())); } + @Override + default void mergeBranch(String sourceBranch, String targetBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mergeBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 98c00c010141..f58259b1fcb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -210,6 +210,10 @@ default void deleteBranches(String branchNames) { @Experimental void fastForward(String branchName); + /** Merge source branch into target branch (append-only tables only). */ + @Experimental + void mergeBranch(String sourceBranch, String targetBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index c830a799bff5..085f04747ca1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -59,6 +59,8 @@ public interface BranchManager { void fastForward(String branchName); + void mergeBranch(String sourceBranch, String targetBranch); + void renameBranch(String fromBranch, String toBranch); List branches(); @@ -98,6 +100,21 @@ static void validateBranch(String branchName) { branchName); } + static void mergeValidate(String sourceBranch, String targetBranch) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(sourceBranch), + "Source branch name '%s' is blank.", + sourceBranch); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(targetBranch), + "Target branch name '%s' is blank.", + targetBranch); + checkArgument( + !sourceBranch.equals(targetBranch), + "Cannot merge branch '%s' into itself.", + sourceBranch); + } + static void fastForwardValidate(String branchName, String currentBranch) { checkArgument( !branchName.equals(DEFAULT_MAIN_BRANCH), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java new file mode 100644 index 000000000000..bb009664da66 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestEntry; + +import java.util.List; +import java.util.Map; + +/** Handler for branch merge data operations (manifest reading, committing). */ +public interface BranchMergeHandler { + + /** Read all active data files from the given branch. */ + Map readBranchFiles(String branch); + + /** Commit the given files to the target branch. */ + void commit(String targetBranch, List filesToMerge); +} From f77c77e62715292fbc4f34e7d306eb7f3c80d774 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 17 May 2026 01:01:39 +0800 Subject: [PATCH 2/5] [core] Implement branch merge for append-only tables Implement FileSystemBranchManager.mergeBranch() with: - Symmetric branch-merge.enabled validation on both branches - Append-only table validation (no primary keys) - Row-tracking consistency check between branches - Schema compatibility validation (fields and schema history) - Active-file diff to find branch-only files - DefaultBranchMergeHandler for committing merged files Enforce pure-append invariant when branch-merge.enabled=true: - SchemaValidation rejects primary keys, deletion vectors, and data evolution at table creation time - FileStoreCommitImpl rejects compaction and INSERT OVERWRITE at commit time Add Catalog and REST API support (CatalogBranchManager, RESTCatalog, POST /branches/merge endpoint). Add comprehensive tests covering positive merge, row-tracking, snapshot expiration, schema conflicts, and negative cases. Add branch merge documentation and REST OpenAPI spec. --- docs/generated/core_configuration.html | 6 + docs/static/rest-catalog-open-api.yaml | 54 +- .../java/org/apache/paimon/rest/RESTApi.java | 20 + .../org/apache/paimon/rest/ResourcePaths.java | 12 + .../rest/requests/MergeBranchRequest.java | 58 ++ .../org/apache/paimon/io/DataFileMeta.java | 2 + .../apache/paimon/io/PojoDataFileMeta.java | 25 + .../paimon/operation/FileStoreCommitImpl.java | 19 + .../org/apache/paimon/rest/RESTCatalog.java | 12 + .../paimon/schema/SchemaValidation.java | 25 + .../paimon/utils/CatalogBranchManager.java | 9 + .../utils/DefaultBranchMergeHandler.java | 146 +++ .../paimon/utils/FileSystemBranchManager.java | 173 +++- .../apache/paimon/rest/RESTCatalogServer.java | 39 + .../apache/paimon/rest/RESTCatalogTest.java | 37 + .../table/AppendOnlySimpleTableTest.java | 873 ++++++++++++++++++ .../table/PrimaryKeySimpleTableTest.java | 18 + .../utils/FileSystemBranchManagerTest.java | 2 +- 18 files changed, 1527 insertions(+), 3 deletions(-) create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index c11941ea27db..cc9e47157d45 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -92,6 +92,12 @@ MemorySize Target size of a blob file. Default is value of TARGET_FILE_SIZE. + +
branch-merge.enabled
+ false + Boolean + Whether to enable branch merge for this table. When enabled, the table must keep a pure-append history: compaction and INSERT OVERWRITE are rejected, and deletion vectors are not supported. This provides the invariant required by branch merge. +
bucket
-1 diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index ba4c8bed4b75..1b6cbfc86c7a 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -1267,6 +1267,48 @@ paths: $ref: '#/components/responses/BranchNotExistErrorResponse' "500": $ref: '#/components/responses/ServerErrorResponse' + /v1/{prefix}/databases/{database}/tables/{table}/branches/merge: + post: + tags: + - branch + summary: Merge branch + description: >- + Merge data files from source branch into target branch. + The table must be created with 'branch-merge.enabled' = 'true'. This option + enforces a pure-append table history by rejecting compaction and INSERT OVERWRITE, + and it is incompatible with deletion vectors. Requires an append-only table with + consistent row-tracking settings and compatible schema history. + operationId: mergeBranch + parameters: + - name: prefix + in: path + required: true + schema: + type: string + - name: database + in: path + required: true + schema: + type: string + - name: table + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/MergeBranchRequest' + responses: + "200": + description: Success, no content + "401": + $ref: '#/components/responses/UnauthorizedErrorResponse' + "404": + $ref: '#/components/responses/BranchNotExistErrorResponse' + "500": + $ref: '#/components/responses/ServerErrorResponse' /v1/{prefix}/databases/{database}/tables/{table}/tags: get: tags: @@ -3304,6 +3346,16 @@ components: properties: branch: type: string + MergeBranchRequest: + type: object + required: + - sourceBranch + - targetBranch + properties: + sourceBranch: + type: string + targetBranch: + type: string ListBranchesResponse: type: object properties: @@ -3615,4 +3667,4 @@ components: securitySchemes: BearerAuth: type: http - scheme: bearer \ No newline at end of file + scheme: bearer diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 7433a30388f8..dbfb2f77c261 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -48,6 +48,7 @@ import org.apache.paimon.rest.requests.ForwardBranchRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; +import org.apache.paimon.rest.requests.MergeBranchRequest; import org.apache.paimon.rest.requests.RegisterTableRequest; import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; @@ -1025,6 +1026,25 @@ public void fastForward(Identifier identifier, String branch) { restAuthFunction); } + /** + * Merge branch for table. + * + * @param identifier database name and table name. + * @param sourceBranch source branch name + * @param targetBranch target branch name + * @throws NoSuchResourceException Exception thrown on HTTP 404 means the branch or table not + * exists + * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for + * this table + */ + public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) { + MergeBranchRequest request = new MergeBranchRequest(sourceBranch, targetBranch); + client.post( + resourcePaths.mergeBranch(identifier.getDatabaseName(), identifier.getObjectName()), + request, + restAuthFunction); + } + /** * List branches for table. * diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 66a1653232b8..2b00c404977c 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -286,6 +286,18 @@ public String renameBranch(String databaseName, String tableName, String branch) "rename"); } + public String mergeBranch(String databaseName, String tableName) { + return SLASH.join( + V1, + prefix, + DATABASES, + encodeString(databaseName), + TABLES, + encodeString(tableName), + BRANCHES, + "merge"); + } + public String tags(String databaseName, String objectName) { return SLASH.join( V1, diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java new file mode 100644 index 000000000000..f011086e3699 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for merging branch. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class MergeBranchRequest implements RESTRequest { + + private static final String FIELD_SOURCE_BRANCH = "sourceBranch"; + private static final String FIELD_TARGET_BRANCH = "targetBranch"; + + @JsonProperty(FIELD_SOURCE_BRANCH) + private final String sourceBranch; + + @JsonProperty(FIELD_TARGET_BRANCH) + private final String targetBranch; + + @JsonCreator + public MergeBranchRequest( + @JsonProperty(FIELD_SOURCE_BRANCH) String sourceBranch, + @JsonProperty(FIELD_TARGET_BRANCH) String targetBranch) { + this.sourceBranch = sourceBranch; + this.targetBranch = targetBranch; + } + + @JsonGetter(FIELD_SOURCE_BRANCH) + public String sourceBranch() { + return sourceBranch; + } + + @JsonGetter(FIELD_TARGET_BRANCH) + public String targetBranch() { + return targetBranch; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index a7a4de0b5cbe..c06a8f2422da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -329,6 +329,8 @@ default Range nonNullRowIdRange() { DataFileMeta assignFirstRowId(long firstRowId); + DataFileMeta clearFirstRowId(); + default List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java index 5003974e8ead..cbb701ec531b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java @@ -379,6 +379,31 @@ public PojoDataFileMeta assignFirstRowId(long firstRowId) { writeCols); } + @Override + public PojoDataFileMeta clearFirstRowId() { + return new PojoDataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols, + externalPath, + null, + writeCols); + } + @Override public PojoDataFileMeta copy(List newExtraFiles) { return new PojoDataFileMeta( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 0032d69adfab..bb55a0b2b072 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -301,6 +301,9 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { int attempts = 0; ManifestEntryChanges changes = collectChanges(committable.fileCommittables()); + if (options.branchMergeEnabled()) { + validatePureAppendCommit(changes); + } try { List appendSimpleEntries = SimpleFileEntry.from(changes.appendTableFiles); @@ -431,6 +434,13 @@ public int overwritePartition( properties); } + if (options.branchMergeEnabled()) { + throw new UnsupportedOperationException( + String.format( + "INSERT OVERWRITE is not allowed when %s is true.", + CoreOptions.BRANCH_MERGE_ENABLED.key())); + } + long started = System.nanoTime(); int generatedSnapshot = 0; int attempts = 0; @@ -689,6 +699,15 @@ private ManifestEntryChanges collectChanges(List commitMessages) return changes; } + private void validatePureAppendCommit(ManifestEntryChanges changes) { + checkArgument( + changes.compactTableFiles.isEmpty() + && changes.compactChangelog.isEmpty() + && changes.compactIndexFiles.isEmpty(), + "Compaction is not allowed when %s is true.", + CoreOptions.BRANCH_MERGE_ENABLED.key()); + } + private int tryCommit( CommitChangesProvider changesProvider, long identifier, diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 7e2f6cfd2743..e617f31dd287 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -783,6 +783,18 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx } } + @Override + public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) + throws BranchNotExistException { + try { + api.mergeBranch(identifier, sourceBranch, targetBranch); + } catch (NoSuchResourceException e) { + throw new BranchNotExistException(identifier, e.resourceName(), e); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + } + @Override public List listBranches(Identifier identifier) throws TableNotExistException { try { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 4ffc3ec0259e..713b945db323 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -307,6 +307,8 @@ public static void validateTableSchema(TableSchema schema) { validateRowTracking(schema, options); + validateBranchMerge(schema, options); + validateIncrementalClustering(schema, options); validateChainTable(schema, options); @@ -794,6 +796,29 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) } } + private static void validateBranchMerge(TableSchema schema, CoreOptions options) { + if (!options.branchMergeEnabled()) { + return; + } + + checkArgument( + schema.primaryKeys().isEmpty(), + "%s is not supported for tables with primary keys.", + CoreOptions.BRANCH_MERGE_ENABLED.key()); + + checkArgument( + !options.deletionVectorsEnabled(), + "%s is incompatible with %s.", + CoreOptions.BRANCH_MERGE_ENABLED.key(), + CoreOptions.DELETION_VECTORS_ENABLED.key()); + + checkArgument( + !options.dataEvolutionEnabled(), + "%s is incompatible with %s.", + CoreOptions.BRANCH_MERGE_ENABLED.key(), + CoreOptions.DATA_EVOLUTION_ENABLED.key()); + } + private static void validateBlobFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java index 4ba9c39475ae..b2906544ada8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java @@ -108,6 +108,15 @@ public void fastForward(String branchName) { }); } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + executePost( + catalog -> { + BranchManager.mergeValidate(sourceBranch, targetBranch); + catalog.mergeBranch(identifier, sourceBranch, targetBranch); + }); + } + @Override public void renameBranch(String fromBranch, String toBranch) { executePost(catalog -> catalog.renameBranch(identifier, fromBranch, toBranch)); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java new file mode 100644 index 000000000000..f7e3cfa76b3b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Default implementation of {@link BranchMergeHandler} backed by {@link FileStoreTable}. */ +public class DefaultBranchMergeHandler implements BranchMergeHandler { + + private final Function branchTableFactory; + + public DefaultBranchMergeHandler(Function branchTableFactory) { + this.branchTableFactory = branchTableFactory; + } + + @Override + public Map readBranchFiles(String branch) { + FileStoreTable branchTable = branchTableFactory.apply(branch); + Snapshot snapshot = branchTable.snapshotManager().latestSnapshot(); + checkArgument( + snapshot != null, + "Cannot read branch '%s', because it does not have any snapshot.", + branch); + ManifestList manifestList = branchTable.store().manifestListFactory().create(); + ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + Map files = new LinkedHashMap<>(); + FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); + return files; + } + + @Override + public void commit(String targetBranch, List filesToMerge) { + FileStoreTable branchTable = branchTableFactory.apply(targetBranch); + boolean rowTrackingEnabled = + new CoreOptions(branchTable.schema().options()).rowTrackingEnabled(); + + Map> grouped = new LinkedHashMap<>(); + for (ManifestEntry entry : filesToMerge) { + DataFileMeta file = prepareFileForTargetCommit(entry.file(), rowTrackingEnabled); + grouped.computeIfAbsent( + new MergeKey( + entry.partition().copy(), entry.bucket(), entry.totalBuckets()), + k -> new ArrayList<>()) + .add(file); + } + + String commitUser = UUID.randomUUID().toString(); + ManifestCommittable committable = new ManifestCommittable(0); + for (Map.Entry> e : grouped.entrySet()) { + MergeKey key = e.getKey(); + CommitMessageImpl message = + new CommitMessageImpl( + key.partition, + key.bucket, + key.totalBuckets, + new DataIncrement( + e.getValue(), Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement()); + committable.addFileCommittable(message); + } + + try (FileStoreCommit commit = branchTable.store().newCommit(commitUser, branchTable)) { + commit.appendCommitCheckConflict(true).commit(committable, true); + } + } + + private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTrackingEnabled) { + if (rowTrackingEnabled && file.firstRowId() != null) { + // Source files already have row ids assigned in their branch. Clear them so the + // target branch commit path assigns fresh, non-overlapping row ids. + return file.clearFirstRowId(); + } + return file; + } + + private static class MergeKey { + final BinaryRow partition; + final int bucket; + final int totalBuckets; + + MergeKey(BinaryRow partition, int bucket, int totalBuckets) { + this.partition = partition; + this.bucket = bucket; + this.totalBuckets = totalBuckets; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MergeKey)) { + return false; + } + MergeKey that = (MergeKey) o; + return bucket == that.bucket + && totalBuckets == that.totalBuckets + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, totalBuckets); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java index 9c09637d1c37..ed4c7444f0e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java @@ -18,9 +18,13 @@ package org.apache.paimon.utils; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.tag.Tag; @@ -29,8 +33,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -47,18 +54,21 @@ public class FileSystemBranchManager implements BranchManager { private final SnapshotManager snapshotManager; private final TagManager tagManager; private final SchemaManager schemaManager; + private final BranchMergeHandler mergeHandler; public FileSystemBranchManager( FileIO fileIO, Path path, SnapshotManager snapshotManager, TagManager tagManager, - SchemaManager schemaManager) { + SchemaManager schemaManager, + BranchMergeHandler mergeHandler) { this.fileIO = fileIO; this.tablePath = path; this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.schemaManager = schemaManager; + this.mergeHandler = mergeHandler; } /** Return the root Directory of branch. */ @@ -210,6 +220,167 @@ public void fastForward(String branchName) { } } + @Override + public void mergeBranch(String sourceBranch, String targetBranch) { + BranchManager.mergeValidate(sourceBranch, targetBranch); + validateMergeBranches(sourceBranch, targetBranch); + validateBranchMergeEnabled(sourceBranch, targetBranch); + validateAppendOnly(sourceBranch, targetBranch); + validateNoDataEvolution(sourceBranch, targetBranch); + validateRowTrackingConsistent(sourceBranch, targetBranch); + validateLatestSchema(sourceBranch, targetBranch); + + List filesToMerge = computeMergeDiff(sourceBranch, targetBranch); + if (filesToMerge.isEmpty()) { + return; + } + + validateMergeFileSchemas(sourceBranch, targetBranch, filesToMerge); + mergeHandler.commit(targetBranch, filesToMerge); + } + + private void validateMergeBranches(String sourceBranch, String targetBranch) { + if (!BranchManager.isMainBranch(sourceBranch)) { + checkArgument(branchExists(sourceBranch), "Branch '%s' doesn't exist.", sourceBranch); + } + if (!BranchManager.isMainBranch(targetBranch)) { + checkArgument(branchExists(targetBranch), "Branch '%s' doesn't exist.", targetBranch); + } + + SnapshotManager sourceSm = snapshotManager.copyWithBranch(sourceBranch); + checkArgument( + sourceSm.latestSnapshotId() != null, + "Cannot merge branch '%s', because it does not have snapshot.", + sourceBranch); + + SnapshotManager targetSm = snapshotManager.copyWithBranch(targetBranch); + checkArgument( + targetSm.latestSnapshotId() != null, + "Cannot merge into branch '%s', because it does not have snapshot.", + targetBranch); + } + + private void validateLatestSchema(String sourceBranch, String targetBranch) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + TableSchema sourceSchema = sourceSchemaMgr.latest().get(); + TableSchema targetSchema = targetSchemaMgr.latest().get(); + checkArgument( + sourceSchema.fields().equals(targetSchema.fields()), + "Cannot merge branch '%s' into '%s', schema mismatch.", + sourceBranch, + targetBranch); + } + + private void validateMergeFileSchemas( + String sourceBranch, String targetBranch, List filesToMerge) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + + for (Long schemaId : + filesToMerge.stream() + .map(entry -> entry.file().schemaId()) + .collect(Collectors.toSet())) { + checkArgument( + targetSchemaMgr.schemaExists(schemaId), + "Cannot merge branch '%s' into '%s', because target branch does not contain " + + "schema id %s required by source files.", + sourceBranch, + targetBranch, + schemaId); + + TableSchema sourceSchema = sourceSchemaMgr.schema(schemaId); + TableSchema targetSchema = targetSchemaMgr.schema(schemaId); + checkArgument( + sourceSchema.equals(targetSchema), + "Cannot merge branch '%s' into '%s', schema history mismatch for schema id %s.", + sourceBranch, + targetBranch, + schemaId); + } + } + + private void validateAppendOnly(String sourceBranch, String targetBranch) { + SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, sourceBranch); + TableSchema sourceSchema = sourceSchemaMgr.latest().get(); + checkArgument( + sourceSchema.primaryKeys().isEmpty(), + "Branch merge is only supported for append-only tables, " + + "but branch '%s' has primary keys.", + sourceBranch); + + SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, targetBranch); + TableSchema targetSchema = targetSchemaMgr.latest().get(); + checkArgument( + targetSchema.primaryKeys().isEmpty(), + "Branch merge is only supported for append-only tables, " + + "but branch '%s' has primary keys.", + targetBranch); + } + + private void validateBranchMergeEnabled(String sourceBranch, String targetBranch) { + for (String branch : new String[] {sourceBranch, targetBranch}) { + SchemaManager sm = new SchemaManager(fileIO, tablePath, branch); + TableSchema schema = sm.latest().get(); + CoreOptions opts = new CoreOptions(schema.options()); + checkArgument( + opts.branchMergeEnabled(), + "Branch merge requires '%s' to be true (branch '%s').", + CoreOptions.BRANCH_MERGE_ENABLED.key(), + branch); + } + } + + private void validateNoDataEvolution(String sourceBranch, String targetBranch) { + for (String branch : new String[] {sourceBranch, targetBranch}) { + SchemaManager sm = new SchemaManager(fileIO, tablePath, branch); + TableSchema schema = sm.latest().get(); + CoreOptions opts = new CoreOptions(schema.options()); + checkArgument( + !opts.dataEvolutionEnabled(), + "Branch merge is not supported for data-evolution tables (branch '%s').", + branch); + } + } + + private void validateRowTrackingConsistent(String sourceBranch, String targetBranch) { + boolean sourceEnabled = isRowTrackingEnabled(sourceBranch); + boolean targetEnabled = isRowTrackingEnabled(targetBranch); + checkArgument( + sourceEnabled == targetEnabled, + "Cannot merge branch '%s' into '%s': row-tracking settings must match " + + "(source=%s, target=%s).", + sourceBranch, + targetBranch, + sourceEnabled, + targetEnabled); + } + + private boolean isRowTrackingEnabled(String branch) { + SchemaManager schemaManager = new SchemaManager(fileIO, tablePath, branch); + TableSchema schema = schemaManager.latest().get(); + return new CoreOptions(schema.options()).rowTrackingEnabled(); + } + + private List computeMergeDiff(String sourceBranch, String targetBranch) { + Set targetFileIds = + mergeHandler.readBranchFiles(targetBranch).keySet(); + + Map sourceFiles = + mergeHandler.readBranchFiles(sourceBranch); + + List filesToMerge = new ArrayList<>(); + for (Map.Entry entry : sourceFiles.entrySet()) { + if (!targetFileIds.contains(entry.getKey())) { + ManifestEntry manifestEntry = entry.getValue(); + if (manifestEntry.kind() == FileKind.ADD) { + filesToMerge.add(manifestEntry); + } + } + } + return filesToMerge; + } + /** Check if a branch exists. */ public boolean branchExists(String branchName) { Path branchPath = branchPath(branchName); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 74e4f4e465a9..76e51fea0afd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -61,6 +61,7 @@ import org.apache.paimon.rest.requests.CreateViewRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; +import org.apache.paimon.rest.requests.MergeBranchRequest; import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ReplaceTableRequest; @@ -1924,6 +1925,44 @@ private MockResponse branchApiHandle( identifier.getFullName(), tableLatestSnapshotStore.get(branchIdentifier.getFullName())); } + } else if (resources.length == 5 && "merge".equals(resources[4])) { + // Merge branch: /branches/merge + MergeBranchRequest mergeRequest = + RESTApi.fromJson(data, MergeBranchRequest.class); + try { + table.mergeBranch( + mergeRequest.sourceBranch(), mergeRequest.targetBranch()); + } catch (Exception e) { + String msg = e.getMessage(); + if (msg != null && msg.contains("doesn't exist")) { + String branchName = + msg.contains(mergeRequest.sourceBranch()) + ? mergeRequest.sourceBranch() + : mergeRequest.targetBranch(); + response = + new ErrorResponse( + ErrorResponse.RESOURCE_TYPE_BRANCH, + branchName, + msg, + 404); + return mockResponse(response, 404); + } + throw e; + } + String targetBranchName = mergeRequest.targetBranch(); + Identifier targetIdentifier = + BranchManager.isMainBranch(targetBranchName) + ? identifier + : new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + targetBranchName); + FileStoreTable targetTable = + (FileStoreTable) catalog.getTable(targetIdentifier); + Snapshot targetSnapshot = targetTable.snapshotManager().latestSnapshot(); + tableLatestSnapshotStore.put( + targetIdentifier.getFullName(), + new TableSnapshot(targetSnapshot, 0, 0, 0, 0)); } else { CreateBranchRequest requestBody = RESTApi.fromJson(data, CreateBranchRequest.class); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 8cefa2ec7b42..f9bf8a0e7bf8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -374,6 +374,9 @@ void testApiWhenTableNoPermission() throws Exception { assertThrows( Catalog.TableNoPermissionException.class, () -> restCatalog.fastForward(identifier, "test_branch")); + assertThrows( + Catalog.TableNoPermissionException.class, + () -> restCatalog.mergeBranch(identifier, "test_branch", "main")); assertThrows(ForbiddenException.class, () -> restCatalog.api().loadTableToken(identifier)); assertThrows( Catalog.TableNoPermissionException.class, @@ -2134,6 +2137,37 @@ public void testBranchBatchRecordsWrite() throws Exception { .containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]", "+I[2]", "+I[1]", "+I[9]"); } + @Test + public void testMergeBranch() throws Exception { + Identifier tableIdentifier = Identifier.create("merge_db", "merge_table"); + Map options = Maps.newHashMap(); + options.put("branch-merge.enabled", "true"); + createTable(tableIdentifier, options, Collections.emptyList()); + FileStoreTable mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); + batchWrite(mainTable, Lists.newArrayList(1, 2, 3)); + + mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); + mainTable.createTag("tag1", 1); + restCatalog.createBranch(tableIdentifier, "branch1", "tag1"); + + Identifier branchIdentifier = new Identifier("merge_db", "merge_table", "branch1"); + FileStoreTable branchTable = (FileStoreTable) catalog.getTable(branchIdentifier); + batchWrite(branchTable, Lists.newArrayList(7, 8, 9)); + + // Write more data to main (this data should not be in branch) + mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); + batchWrite(mainTable, Lists.newArrayList(4, 5, 6)); + + restCatalog.mergeBranch(tableIdentifier, "branch1", "main"); + + mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); + List result = batchRead(mainTable); + assertThat(result) + .containsExactlyInAnyOrder( + "+I[1]", "+I[2]", "+I[3]", "+I[4]", "+I[5]", "+I[6]", "+I[7]", "+I[8]", + "+I[9]"); + } + @Test void testBranches() throws Exception { String databaseName = "testBranchTable"; @@ -2187,6 +2221,9 @@ void testBranches() throws Exception { assertThrows( Catalog.BranchNotExistException.class, () -> restCatalog.fastForward(identifier, "no_exist_branch")); + assertThrows( + Catalog.BranchNotExistException.class, + () -> restCatalog.mergeBranch(identifier, "no_exist_branch", "main")); assertThat(restCatalog.listBranches(identifier)).isEmpty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 30ccc652a0c9..3919eecf5def 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.bucket.DefaultBucketFunction; import org.apache.paimon.data.BinaryRow; @@ -37,6 +38,8 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -73,6 +76,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DefaultBranchMergeHandler; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat; @@ -86,8 +90,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -117,6 +123,7 @@ import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1699,4 +1706,870 @@ protected FileStoreTable createUnawareBucketFileStoreTable( "")); return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + protected FileStoreTable createBranchMergeTable() throws Exception { + return createFileStoreTable(conf -> conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true)); + } + + protected FileStoreTable createBranchMergeTable(Consumer extraConfigure) + throws Exception { + return createFileStoreTable( + conf -> { + conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true); + extraConfigure.accept(conf); + }); + } + + @Test + public void testMergeBranch() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write data to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write more data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branch into main + table.mergeBranch(BRANCH_NAME, "main"); + + // Verify main has data from both sides + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchMultipleTimes() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // First write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // First merge + table.mergeBranch(BRANCH_NAME, "main"); + + // Second write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + + // Second merge + table.mergeBranch(BRANCH_NAME, "main"); + + // Verify no duplicates: main has all 3 rows exactly once + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchFailsOnStaleDuplicateCommit() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + DefaultBranchMergeHandler handler = new DefaultBranchMergeHandler(table::switchToBranch); + Map sourceFiles = + handler.readBranchFiles(BRANCH_NAME); + Map targetFiles = + handler.readBranchFiles("main"); + List filesToMerge = + sourceFiles.entrySet().stream() + .filter(entry -> !targetFiles.containsKey(entry.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + + handler.commit("main", filesToMerge); + assertThatThrownBy(() -> handler.commit("main", filesToMerge)) + .satisfies(anyCauseMatches(RuntimeException.class, "Trying to add file")); + } + + @Test + public void testMergeBranchBidirectional() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write shared data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branch -> main + table.mergeBranch(BRANCH_NAME, "main"); + + // Merge main -> branch + table.mergeBranch("main", BRANCH_NAME); + + // Verify both have the same data without duplicates + List mainData = + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING); + List branchData = + getResult( + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING); + + assertThat(mainData) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(branchData).containsExactlyInAnyOrderElementsOf(mainData); + } + + @Test + public void testMergeBranchEmptyDiff() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag (has same data as main) + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + // Merge should be a no-op (no exception, no new snapshot) + long snapshotBefore = table.snapshotManager().latestSnapshotId(); + table.mergeBranch(BRANCH_NAME, "main"); + long snapshotAfter = table.snapshotManager().latestSnapshotId(); + assertThat(snapshotAfter).isEqualTo(snapshotBefore); + } + + @Test + public void testMergeBranchSchemaConflict() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + // Modify schema on main (add a column) + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + schemaManager.commitChanges(SchemaChange.addColumn("new_col", DataTypes.INT())); + + // Merge should fail due to schema mismatch + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Cannot merge branch 'branch1' into 'main', schema mismatch.")); + } + + @Test + public void testMergeBranchSchemaHistoryConflict() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + SchemaManager branchSchemaManager = + new SchemaManager(table.fileIO(), table.location(), BRANCH_NAME); + branchSchemaManager.commitChanges(SchemaChange.addColumn("source_col", DataTypes.INT())); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().newWrite().withWriteType(ROW_TYPE); + BatchTableCommit commit = tableBranch.newBatchWriteBuilder().newCommit()) { + write.write(rowData(1, 10, 100L)); + commit.commit(write.prepareCommit()); + } + branchSchemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("source_col"))); + + SchemaManager mainSchemaManager = new SchemaManager(table.fileIO(), table.location()); + mainSchemaManager.commitChanges(SchemaChange.addColumn("target_col", DataTypes.INT())); + mainSchemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("target_col"))); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "schema history mismatch for schema id 1")); + } + + @Test + public void testMergeBranchRowTrackingTable() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingMultipleTimes() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // First write to branch + merge + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + table.mergeBranch(BRANCH_NAME, "main"); + + // Second write to branch + merge + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingAfterTargetWrites() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write 2 rows to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 101L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write 3 rows to main independently (advances main nextRowId) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + write.write(rowData(2, 21, 201L)); + write.write(rowData(2, 22, 202L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .hasSize(6); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(6L); + } + + @Test + public void testMergeBranchRowTrackingBetweenNonMainBranches() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch("branchA", "tag1"); + table.createBranch("branchB", "tag1"); + FileStoreTable tableA = table.switchToBranch("branchA"); + FileStoreTable tableB = table.switchToBranch("branchB"); + + try (StreamTableWrite write = tableA.newWrite(commitUser); + StreamTableCommit commit = tableA.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = tableB.newWrite(commitUser); + StreamTableCommit commit = tableB.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + table.mergeBranch("branchA", "branchB"); + + tableB = table.switchToBranch("branchB"); + assertThat( + getResult( + tableB.newRead(), + toSplits(tableB.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + assertRowIdRangesNonOverlapping(tableB); + Snapshot snapshot = tableB.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(3L); + } + + @Test + public void testMergeBranchRowTrackingMismatch() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Directly write a new schema to the branch with row-tracking disabled + SchemaManager branchSchemaManager = + new SchemaManager(table.fileIO(), table.location(), BRANCH_NAME); + TableSchema branchSchema = branchSchemaManager.latest().get(); + Map newOptions = new HashMap<>(branchSchema.options()); + newOptions.remove("row-tracking.enabled"); + TableSchema mismatchedSchema = + new TableSchema( + branchSchema.version(), + branchSchema.id() + 1, + branchSchema.fields(), + branchSchema.highestFieldId(), + branchSchema.partitionKeys(), + branchSchema.primaryKeys(), + newOptions, + branchSchema.comment(), + branchSchema.timeMillis()); + branchSchemaManager.commit(mismatchedSchema); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "row-tracking settings must match")); + } + + @Test + public void testMergeBranchRowTrackingStaleMerge() throws Exception { + FileStoreTable table = + createBranchMergeTable( + options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true)); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to main multiple times to advance nextRowId + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(3, 30, 300L)); + write.write(rowData(3, 31, 301L)); + commit.commit(3, write.prepareCommit(false, 3)); + } + + // Merge: branch file should get firstRowId after all main files + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .hasSize(5); + + assertRowIdRangesNonOverlapping(table); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + assertThat(snapshot.nextRowId()).isEqualTo(5L); + + // Write more to main, then merge again (branch has no new data, should be no-op) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(4, 40, 400L)); + commit.commit(4, write.prepareCommit(false, 4)); + } + + long snapshotIdBefore = table.snapshotManager().latestSnapshotId(); + table.mergeBranch(BRANCH_NAME, "main"); + long snapshotIdAfter = table.snapshotManager().latestSnapshotId(); + + // Second merge should be no-op (branch file already in target) + assertThat(snapshotIdAfter).isEqualTo(snapshotIdBefore); + + assertRowIdRangesNonOverlapping(table); + Snapshot finalSnapshot = table.snapshotManager().latestSnapshot(); + assertThat(finalSnapshot.nextRowId()).isEqualTo(6L); + } + + private void assertRowIdRangesNonOverlapping(FileStoreTable table) { + ManifestList manifestList = table.store().manifestListFactory().create(); + ManifestFile manifestFile = table.store().manifestFileFactory().create(); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + Map files = new LinkedHashMap<>(); + FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); + + List ranges = new ArrayList<>(); + for (ManifestEntry entry : files.values()) { + if (entry.file().firstRowId() != null) { + long start = entry.file().firstRowId(); + long end = start + entry.file().rowCount() - 1; + ranges.add(new long[] {start, end}); + } + } + ranges.sort(Comparator.comparingLong(r -> r[0])); + for (int i = 1; i < ranges.size(); i++) { + assertTrue( + ranges.get(i)[0] > ranges.get(i - 1)[1], + String.format( + "Row-id ranges overlap: [%d, %d] and [%d, %d]", + ranges.get(i - 1)[0], + ranges.get(i - 1)[1], + ranges.get(i)[0], + ranges.get(i)[1])); + } + } + + @Test + public void testMergeBranchSameBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, BRANCH_NAME)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Cannot merge branch 'branch1' into itself.")); + } + + @Test + public void testMergeBranchSamePartition() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + // Write data to main (partition pt=0) + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create branch + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write to branch with same partition pt=0 + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Merge branch into main + table.mergeBranch(BRANCH_NAME, "main"); + + // Both files coexist in the same partition + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "0|10|100|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchNonExistentBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + assertThatThrownBy(() -> table.mergeBranch("nonexistent", "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch 'nonexistent' doesn't exist.")); + } + + @Test + public void testMergeBranchMultiBucket() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(CoreOptions.BUCKET, 2); + conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + write.write(rowData(0, 1, 1L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 11, 110L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "0|1|1|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "1|11|110|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchNonExistentTargetBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "nonexistent")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch 'nonexistent' doesn't exist.")); + } + + @Test + public void testMergeBranchBetweenNonMainBranches() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + // Create two branches from tag so they share the same base data + table.createTag("tag1", 1); + table.createBranch("branchA", "tag1"); + table.createBranch("branchB", "tag1"); + FileStoreTable tableA = table.switchToBranch("branchA"); + FileStoreTable tableB = table.switchToBranch("branchB"); + + // Write to branchA + try (StreamTableWrite write = tableA.newWrite(commitUser); + StreamTableCommit commit = tableA.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Write to branchB + try (StreamTableWrite write = tableB.newWrite(commitUser); + StreamTableCommit commit = tableB.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + // Merge branchA into branchB + table.mergeBranch("branchA", "branchB"); + + // Reload branchB table to see changes + tableB = table.switchToBranch("branchB"); + assertThat( + getResult( + tableB.newRead(), + toSplits(tableB.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testMergeBranchWithoutBranchMergeEnabled() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = createBranchTable(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies(anyCauseMatches(IllegalArgumentException.class, "branch-merge.enabled")); + } + + @Test + public void testMergeBranchAfterSnapshotExpiration() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + for (int i = 2; i < 5; i++) { + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + table.newExpireSnapshots().config(table.coreOptions().expireConfig()).expire(); + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "3|30|300|binary|varbinary|mapKey:mapVal|multiset", + "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); + } + + @Test + public void testBranchMergeEnabledRejectsOverwrite() throws Exception { + FileStoreTable table = createBranchMergeTable(); + + List commitMessages; + try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) { + write.write(rowData(1, 10, 100L)); + commitMessages = write.prepareCommit(); + } + + List finalMessages = commitMessages; + try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) { + assertThatThrownBy(() -> commit.commit(finalMessages)) + .hasMessageContaining("branch-merge.enabled"); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 9401cce83275..97db601e73d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -144,6 +144,7 @@ import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING; import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -2672,4 +2673,21 @@ protected FileStoreTable createFileStoreTable(Consumer configure, RowTy "")); return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + @Test + public void testMergeBranchPrimaryKeyTable() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies(anyCauseMatches(IllegalArgumentException.class, "branch-merge.enabled")); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java index c74b53d3f381..109303a9288d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java @@ -69,7 +69,7 @@ void before() throws Exception { tagManager = new TagManager(fileIO, tablePath); branchManager = new FileSystemBranchManager( - fileIO, tablePath, snapshotManager, tagManager, schemaManager); + fileIO, tablePath, snapshotManager, tagManager, schemaManager, null); } @Test From e924f1f36cc523594f0d54279f75c739c207f521 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Tue, 19 May 2026 19:23:02 +0800 Subject: [PATCH 3/5] [core] Rename clearFirstRowId to newFirstRowId(@Nullable Long) --- .../src/main/java/org/apache/paimon/io/DataFileMeta.java | 2 +- .../src/main/java/org/apache/paimon/io/PojoDataFileMeta.java | 4 ++-- .../org/apache/paimon/utils/DefaultBranchMergeHandler.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index c06a8f2422da..23b34947bcc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -329,7 +329,7 @@ default Range nonNullRowIdRange() { DataFileMeta assignFirstRowId(long firstRowId); - DataFileMeta clearFirstRowId(); + DataFileMeta newFirstRowId(@Nullable Long newFirstRowId); default List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java index cbb701ec531b..9e845b26fe14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java @@ -380,7 +380,7 @@ public PojoDataFileMeta assignFirstRowId(long firstRowId) { } @Override - public PojoDataFileMeta clearFirstRowId() { + public PojoDataFileMeta newFirstRowId(@Nullable Long newFirstRowId) { return new PojoDataFileMeta( fileName, fileSize, @@ -400,7 +400,7 @@ public PojoDataFileMeta clearFirstRowId() { fileSource, valueStatsCols, externalPath, - null, + newFirstRowId, writeCols); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java index f7e3cfa76b3b..698126b48f2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java @@ -108,7 +108,7 @@ private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTr if (rowTrackingEnabled && file.firstRowId() != null) { // Source files already have row ids assigned in their branch. Clear them so the // target branch commit path assigns fresh, non-overlapping row ids. - return file.clearFirstRowId(); + return file.newFirstRowId(null); } return file; } From f41c7af91a2237279882c0e1eeb0fe7f377e3d6e Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Wed, 20 May 2026 14:51:50 +0800 Subject: [PATCH 4/5] [core] Validate branch merge with full-history check and remove REST support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove branch-merge.enabled option (CoreOptions, SchemaValidation, FileStoreCommitImpl) — merge is always allowed as an explicit operation - Replace fork-point based validation with conservative full-history check: both branches must retain complete append-only snapshot history from snapshot 1; reject if any snapshot expired or has non-append commit kind - Remove REST API support for branch merge (high overhead file operation): delete MergeBranchRequest, RESTApi.mergeBranch, ResourcePaths.mergeBranch, REST server handler, and OpenAPI endpoint; RESTCatalog.mergeBranch now throws UnsupportedOperationException - Update tests to cover: success with complete history, failure on expired snapshots (source/target), failure on OVERWRITE/COMPACT (source/target), plain branch success with full history --- docs/generated/core_configuration.html | 6 - docs/static/rest-catalog-open-api.yaml | 52 --- .../java/org/apache/paimon/CoreOptions.java | 15 - .../java/org/apache/paimon/rest/RESTApi.java | 20 -- .../org/apache/paimon/rest/ResourcePaths.java | 12 - .../rest/requests/MergeBranchRequest.java | 58 ---- .../paimon/operation/FileStoreCommitImpl.java | 19 -- .../org/apache/paimon/rest/RESTCatalog.java | 8 +- .../paimon/schema/SchemaValidation.java | 25 -- .../paimon/utils/FileSystemBranchManager.java | 52 ++- .../apache/paimon/rest/RESTCatalogServer.java | 39 --- .../apache/paimon/rest/RESTCatalogTest.java | 32 +- .../table/AppendOnlySimpleTableTest.java | 309 +++++++++++++++--- .../table/PrimaryKeySimpleTableTest.java | 2 +- 14 files changed, 319 insertions(+), 330 deletions(-) delete mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index cc9e47157d45..c11941ea27db 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -92,12 +92,6 @@ MemorySize Target size of a blob file. Default is value of TARGET_FILE_SIZE. - -
branch-merge.enabled
- false - Boolean - Whether to enable branch merge for this table. When enabled, the table must keep a pure-append history: compaction and INSERT OVERWRITE are rejected, and deletion vectors are not supported. This provides the invariant required by branch merge. -
bucket
-1 diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 1b6cbfc86c7a..e310f398f821 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -1267,48 +1267,6 @@ paths: $ref: '#/components/responses/BranchNotExistErrorResponse' "500": $ref: '#/components/responses/ServerErrorResponse' - /v1/{prefix}/databases/{database}/tables/{table}/branches/merge: - post: - tags: - - branch - summary: Merge branch - description: >- - Merge data files from source branch into target branch. - The table must be created with 'branch-merge.enabled' = 'true'. This option - enforces a pure-append table history by rejecting compaction and INSERT OVERWRITE, - and it is incompatible with deletion vectors. Requires an append-only table with - consistent row-tracking settings and compatible schema history. - operationId: mergeBranch - parameters: - - name: prefix - in: path - required: true - schema: - type: string - - name: database - in: path - required: true - schema: - type: string - - name: table - in: path - required: true - schema: - type: string - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/MergeBranchRequest' - responses: - "200": - description: Success, no content - "401": - $ref: '#/components/responses/UnauthorizedErrorResponse' - "404": - $ref: '#/components/responses/BranchNotExistErrorResponse' - "500": - $ref: '#/components/responses/ServerErrorResponse' /v1/{prefix}/databases/{database}/tables/{table}/tags: get: tags: @@ -3346,16 +3304,6 @@ components: properties: branch: type: string - MergeBranchRequest: - type: object - required: - - sourceBranch - - targetBranch - properties: - sourceBranch: - type: string - targetBranch: - type: string ListBranchesResponse: type: object properties: diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 8a7aa4387cbf..d5a1bb7fb067 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2224,17 +2224,6 @@ public InlineElement getDescription() { .defaultValue(false) .withDescription("Whether enable data evolution for row tracking table."); - @Immutable - public static final ConfigOption BRANCH_MERGE_ENABLED = - key("branch-merge.enabled") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether to enable branch merge for this table. " - + "When enabled, the table must keep a pure-append history: " - + "compaction and INSERT OVERWRITE are rejected, and deletion vectors " - + "are not supported. This provides the invariant required by branch merge."); - public static final ConfigOption SNAPSHOT_IGNORE_EMPTY_COMMIT = key("snapshot.ignore-empty-commit") .booleanType() @@ -3701,10 +3690,6 @@ public boolean dataEvolutionEnabled() { return options.get(DATA_EVOLUTION_ENABLED); } - public boolean branchMergeEnabled() { - return options.get(BRANCH_MERGE_ENABLED); - } - public boolean prepareCommitWaitCompaction() { if (!needLookup()) { return false; diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index dbfb2f77c261..7433a30388f8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -48,7 +48,6 @@ import org.apache.paimon.rest.requests.ForwardBranchRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; -import org.apache.paimon.rest.requests.MergeBranchRequest; import org.apache.paimon.rest.requests.RegisterTableRequest; import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; @@ -1026,25 +1025,6 @@ public void fastForward(Identifier identifier, String branch) { restAuthFunction); } - /** - * Merge branch for table. - * - * @param identifier database name and table name. - * @param sourceBranch source branch name - * @param targetBranch target branch name - * @throws NoSuchResourceException Exception thrown on HTTP 404 means the branch or table not - * exists - * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for - * this table - */ - public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) { - MergeBranchRequest request = new MergeBranchRequest(sourceBranch, targetBranch); - client.post( - resourcePaths.mergeBranch(identifier.getDatabaseName(), identifier.getObjectName()), - request, - restAuthFunction); - } - /** * List branches for table. * diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 2b00c404977c..66a1653232b8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -286,18 +286,6 @@ public String renameBranch(String databaseName, String tableName, String branch) "rename"); } - public String mergeBranch(String databaseName, String tableName) { - return SLASH.join( - V1, - prefix, - DATABASES, - encodeString(databaseName), - TABLES, - encodeString(tableName), - BRANCHES, - "merge"); - } - public String tags(String databaseName, String objectName) { return SLASH.join( V1, diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java deleted file mode 100644 index f011086e3699..000000000000 --- a/paimon-api/src/main/java/org/apache/paimon/rest/requests/MergeBranchRequest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.rest.requests; - -import org.apache.paimon.rest.RESTRequest; - -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -/** Request for merging branch. */ -@JsonIgnoreProperties(ignoreUnknown = true) -public class MergeBranchRequest implements RESTRequest { - - private static final String FIELD_SOURCE_BRANCH = "sourceBranch"; - private static final String FIELD_TARGET_BRANCH = "targetBranch"; - - @JsonProperty(FIELD_SOURCE_BRANCH) - private final String sourceBranch; - - @JsonProperty(FIELD_TARGET_BRANCH) - private final String targetBranch; - - @JsonCreator - public MergeBranchRequest( - @JsonProperty(FIELD_SOURCE_BRANCH) String sourceBranch, - @JsonProperty(FIELD_TARGET_BRANCH) String targetBranch) { - this.sourceBranch = sourceBranch; - this.targetBranch = targetBranch; - } - - @JsonGetter(FIELD_SOURCE_BRANCH) - public String sourceBranch() { - return sourceBranch; - } - - @JsonGetter(FIELD_TARGET_BRANCH) - public String targetBranch() { - return targetBranch; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index bb55a0b2b072..0032d69adfab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -301,9 +301,6 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { int attempts = 0; ManifestEntryChanges changes = collectChanges(committable.fileCommittables()); - if (options.branchMergeEnabled()) { - validatePureAppendCommit(changes); - } try { List appendSimpleEntries = SimpleFileEntry.from(changes.appendTableFiles); @@ -434,13 +431,6 @@ public int overwritePartition( properties); } - if (options.branchMergeEnabled()) { - throw new UnsupportedOperationException( - String.format( - "INSERT OVERWRITE is not allowed when %s is true.", - CoreOptions.BRANCH_MERGE_ENABLED.key())); - } - long started = System.nanoTime(); int generatedSnapshot = 0; int attempts = 0; @@ -699,15 +689,6 @@ private ManifestEntryChanges collectChanges(List commitMessages) return changes; } - private void validatePureAppendCommit(ManifestEntryChanges changes) { - checkArgument( - changes.compactTableFiles.isEmpty() - && changes.compactChangelog.isEmpty() - && changes.compactIndexFiles.isEmpty(), - "Compaction is not allowed when %s is true.", - CoreOptions.BRANCH_MERGE_ENABLED.key()); - } - private int tryCommit( CommitChangesProvider changesProvider, long identifier, diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index e617f31dd287..b47a8ae0b109 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -786,13 +786,7 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx @Override public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) throws BranchNotExistException { - try { - api.mergeBranch(identifier, sourceBranch, targetBranch); - } catch (NoSuchResourceException e) { - throw new BranchNotExistException(identifier, e.resourceName(), e); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } + throw new UnsupportedOperationException("Branch merge is not supported via REST catalog."); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 713b945db323..4ffc3ec0259e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -307,8 +307,6 @@ public static void validateTableSchema(TableSchema schema) { validateRowTracking(schema, options); - validateBranchMerge(schema, options); - validateIncrementalClustering(schema, options); validateChainTable(schema, options); @@ -796,29 +794,6 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) } } - private static void validateBranchMerge(TableSchema schema, CoreOptions options) { - if (!options.branchMergeEnabled()) { - return; - } - - checkArgument( - schema.primaryKeys().isEmpty(), - "%s is not supported for tables with primary keys.", - CoreOptions.BRANCH_MERGE_ENABLED.key()); - - checkArgument( - !options.deletionVectorsEnabled(), - "%s is incompatible with %s.", - CoreOptions.BRANCH_MERGE_ENABLED.key(), - CoreOptions.DELETION_VECTORS_ENABLED.key()); - - checkArgument( - !options.dataEvolutionEnabled(), - "%s is incompatible with %s.", - CoreOptions.BRANCH_MERGE_ENABLED.key(), - CoreOptions.DATA_EVOLUTION_ENABLED.key()); - } - private static void validateBlobFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java index ed4c7444f0e7..aee178785028 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java @@ -224,7 +224,7 @@ public void fastForward(String branchName) { public void mergeBranch(String sourceBranch, String targetBranch) { BranchManager.mergeValidate(sourceBranch, targetBranch); validateMergeBranches(sourceBranch, targetBranch); - validateBranchMergeEnabled(sourceBranch, targetBranch); + validateAppendOnlyHistory(sourceBranch, targetBranch); validateAppendOnly(sourceBranch, targetBranch); validateNoDataEvolution(sourceBranch, targetBranch); validateRowTrackingConsistent(sourceBranch, targetBranch); @@ -318,16 +318,46 @@ private void validateAppendOnly(String sourceBranch, String targetBranch) { targetBranch); } - private void validateBranchMergeEnabled(String sourceBranch, String targetBranch) { - for (String branch : new String[] {sourceBranch, targetBranch}) { - SchemaManager sm = new SchemaManager(fileIO, tablePath, branch); - TableSchema schema = sm.latest().get(); - CoreOptions opts = new CoreOptions(schema.options()); - checkArgument( - opts.branchMergeEnabled(), - "Branch merge requires '%s' to be true (branch '%s').", - CoreOptions.BRANCH_MERGE_ENABLED.key(), - branch); + // Branch merge is implemented as a conservative file-level merge. Without persisted branch + // lineage metadata, we cannot reliably infer a fork point after snapshots expire. To preserve + // correctness, both branches must retain complete append-only history from the first snapshot. + // This restriction can be relaxed in the future if branch fork metadata is introduced. + private void validateAppendOnlyHistory(String sourceBranch, String targetBranch) { + validateCompleteAppendOnly(snapshotManager.copyWithBranch(sourceBranch), sourceBranch); + validateCompleteAppendOnly(snapshotManager.copyWithBranch(targetBranch), targetBranch); + } + + private void validateCompleteAppendOnly(SnapshotManager sm, String branch) { + Long earliest = sm.earliestSnapshotId(); + Long latest = sm.latestSnapshotId(); + if (earliest == null || latest == null) { + return; + } + if (earliest != Snapshot.FIRST_SNAPSHOT_ID) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: branch '%s' does not start at snapshot %d " + + "(earliest is %d). " + + "Branch merge requires complete append-only snapshot history.", + branch, Snapshot.FIRST_SNAPSHOT_ID, earliest)); + } + for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latest; id++) { + if (!sm.snapshotExists(id)) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: snapshot %d is missing on branch '%s'. " + + "Branch merge requires complete append-only snapshot history.", + id, branch)); + } + Snapshot snapshot = sm.snapshot(id); + Snapshot.CommitKind kind = snapshot.commitKind(); + if (kind != Snapshot.CommitKind.APPEND && kind != Snapshot.CommitKind.ANALYZE) { + throw new IllegalArgumentException( + String.format( + "Cannot merge: snapshot %d on branch '%s' has commit kind '%s'. " + + "Branch merge requires complete append-only snapshot history.", + id, branch, kind)); + } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 76e51fea0afd..74e4f4e465a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -61,7 +61,6 @@ import org.apache.paimon.rest.requests.CreateViewRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; -import org.apache.paimon.rest.requests.MergeBranchRequest; import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ReplaceTableRequest; @@ -1925,44 +1924,6 @@ private MockResponse branchApiHandle( identifier.getFullName(), tableLatestSnapshotStore.get(branchIdentifier.getFullName())); } - } else if (resources.length == 5 && "merge".equals(resources[4])) { - // Merge branch: /branches/merge - MergeBranchRequest mergeRequest = - RESTApi.fromJson(data, MergeBranchRequest.class); - try { - table.mergeBranch( - mergeRequest.sourceBranch(), mergeRequest.targetBranch()); - } catch (Exception e) { - String msg = e.getMessage(); - if (msg != null && msg.contains("doesn't exist")) { - String branchName = - msg.contains(mergeRequest.sourceBranch()) - ? mergeRequest.sourceBranch() - : mergeRequest.targetBranch(); - response = - new ErrorResponse( - ErrorResponse.RESOURCE_TYPE_BRANCH, - branchName, - msg, - 404); - return mockResponse(response, 404); - } - throw e; - } - String targetBranchName = mergeRequest.targetBranch(); - Identifier targetIdentifier = - BranchManager.isMainBranch(targetBranchName) - ? identifier - : new Identifier( - identifier.getDatabaseName(), - identifier.getTableName(), - targetBranchName); - FileStoreTable targetTable = - (FileStoreTable) catalog.getTable(targetIdentifier); - Snapshot targetSnapshot = targetTable.snapshotManager().latestSnapshot(); - tableLatestSnapshotStore.put( - targetIdentifier.getFullName(), - new TableSnapshot(targetSnapshot, 0, 0, 0, 0)); } else { CreateBranchRequest requestBody = RESTApi.fromJson(data, CreateBranchRequest.class); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index f9bf8a0e7bf8..1da72a437fab 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -375,7 +375,7 @@ void testApiWhenTableNoPermission() throws Exception { Catalog.TableNoPermissionException.class, () -> restCatalog.fastForward(identifier, "test_branch")); assertThrows( - Catalog.TableNoPermissionException.class, + UnsupportedOperationException.class, () -> restCatalog.mergeBranch(identifier, "test_branch", "main")); assertThrows(ForbiddenException.class, () -> restCatalog.api().loadTableToken(identifier)); assertThrows( @@ -2138,34 +2138,14 @@ public void testBranchBatchRecordsWrite() throws Exception { } @Test - public void testMergeBranch() throws Exception { + public void testMergeBranchUnsupported() throws Exception { Identifier tableIdentifier = Identifier.create("merge_db", "merge_table"); Map options = Maps.newHashMap(); - options.put("branch-merge.enabled", "true"); createTable(tableIdentifier, options, Collections.emptyList()); - FileStoreTable mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); - batchWrite(mainTable, Lists.newArrayList(1, 2, 3)); - - mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); - mainTable.createTag("tag1", 1); - restCatalog.createBranch(tableIdentifier, "branch1", "tag1"); - - Identifier branchIdentifier = new Identifier("merge_db", "merge_table", "branch1"); - FileStoreTable branchTable = (FileStoreTable) catalog.getTable(branchIdentifier); - batchWrite(branchTable, Lists.newArrayList(7, 8, 9)); - - // Write more data to main (this data should not be in branch) - mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); - batchWrite(mainTable, Lists.newArrayList(4, 5, 6)); - restCatalog.mergeBranch(tableIdentifier, "branch1", "main"); - - mainTable = (FileStoreTable) catalog.getTable(tableIdentifier); - List result = batchRead(mainTable); - assertThat(result) - .containsExactlyInAnyOrder( - "+I[1]", "+I[2]", "+I[3]", "+I[4]", "+I[5]", "+I[6]", "+I[7]", "+I[8]", - "+I[9]"); + assertThrows( + UnsupportedOperationException.class, + () -> restCatalog.mergeBranch(tableIdentifier, "branch1", "main")); } @Test @@ -2222,7 +2202,7 @@ void testBranches() throws Exception { Catalog.BranchNotExistException.class, () -> restCatalog.fastForward(identifier, "no_exist_branch")); assertThrows( - Catalog.BranchNotExistException.class, + UnsupportedOperationException.class, () -> restCatalog.mergeBranch(identifier, "no_exist_branch", "main")); assertThat(restCatalog.listBranches(identifier)).isEmpty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 3919eecf5def..2544e85e6343 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -1708,16 +1708,12 @@ protected FileStoreTable createUnawareBucketFileStoreTable( } protected FileStoreTable createBranchMergeTable() throws Exception { - return createFileStoreTable(conf -> conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true)); + return createFileStoreTable(); } protected FileStoreTable createBranchMergeTable(Consumer extraConfigure) throws Exception { - return createFileStoreTable( - conf -> { - conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true); - extraConfigure.accept(conf); - }); + return createFileStoreTable(extraConfigure); } @Test @@ -1731,8 +1727,9 @@ public void testMergeBranch() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - // Create branch - table.createBranch(BRANCH_NAME); + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // Write data to branch @@ -1775,8 +1772,9 @@ public void testMergeBranchMultipleTimes() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - // Create branch - table.createBranch(BRANCH_NAME); + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // First write to branch @@ -1856,8 +1854,9 @@ public void testMergeBranchBidirectional() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - // Create branch - table.createBranch(BRANCH_NAME); + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // Write to branch @@ -1999,7 +1998,8 @@ public void testMergeBranchRowTrackingTable() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); try (StreamTableWrite write = tableBranch.newWrite(commitUser); @@ -2043,7 +2043,8 @@ public void testMergeBranchRowTrackingMultipleTimes() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // First write to branch + merge @@ -2089,7 +2090,8 @@ public void testMergeBranchRowTrackingAfterTargetWrites() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // Write 2 rows to branch @@ -2183,7 +2185,8 @@ public void testMergeBranchRowTrackingMismatch() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); try (StreamTableWrite write = tableBranch.newWrite(commitUser); @@ -2230,7 +2233,8 @@ public void testMergeBranchRowTrackingStaleMerge() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // Write to branch @@ -2336,8 +2340,9 @@ public void testMergeBranchSamePartition() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - // Create branch - table.createBranch(BRANCH_NAME); + // Create branch from tag + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); // Write to branch with same partition pt=0 @@ -2384,7 +2389,6 @@ public void testMergeBranchMultiBucket() throws Exception { createFileStoreTable( conf -> { conf.set(CoreOptions.BUCKET, 2); - conf.set(CoreOptions.BRANCH_MERGE_ENABLED, true); }); try (StreamTableWrite write = table.newWrite(commitUser); @@ -2394,7 +2398,8 @@ public void testMergeBranchMultiBucket() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); try (StreamTableWrite write = tableBranch.newWrite(commitUser); @@ -2485,8 +2490,13 @@ public void testMergeBranchBetweenNonMainBranches() throws Exception { } @Test - public void testMergeBranchWithoutBranchMergeEnabled() throws Exception { - FileStoreTable table = createFileStoreTable(); + public void testMergeBranchAfterSnapshotExpiration() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); try (StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser)) { @@ -2494,8 +2504,9 @@ public void testMergeBranchWithoutBranchMergeEnabled() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); - FileStoreTable tableBranch = createBranchTable(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); try (StreamTableWrite write = tableBranch.newWrite(commitUser); StreamTableCommit commit = tableBranch.newCommit(commitUser)) { @@ -2503,12 +2514,142 @@ public void testMergeBranchWithoutBranchMergeEnabled() throws Exception { commit.commit(1, write.prepareCommit(false, 2)); } + for (int i = 2; i < 5; i++) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire(); + + // After expiration, baseline snapshot is gone — merge should fail assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) - .satisfies(anyCauseMatches(IllegalArgumentException.class, "branch-merge.enabled")); + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); } @Test - public void testMergeBranchAfterSnapshotExpiration() throws Exception { + public void testMergeBranchRejectsNonAppendHistory() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + // Write data to branch + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Perform INSERT OVERWRITE on branch — creates an OVERWRITE snapshot + List commitMessages; + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = + tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsTargetOverwrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // INSERT OVERWRITE on target (main) + List commitMessages; + try (BatchTableWrite write = table.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsSourceOverwrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(0, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // INSERT OVERWRITE on source (branch) + List commitMessages; + try (BatchTableWrite write = + tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) { + write.write(rowData(0, 20, 200L)); + commitMessages = write.prepareCommit(); + } + try (BatchTableCommit commit = + tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) { + commit.commit(commitMessages); + } + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsSourceExpiredSnapshots() throws Exception { FileStoreTable table = createBranchMergeTable( conf -> { @@ -2522,7 +2663,44 @@ public void testMergeBranchAfterSnapshotExpiration() throws Exception { commit.commit(0, write.prepareCommit(false, 1)); } - table.createBranch(BRANCH_NAME); + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + for (int i = 1; i < 5; i++) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(i, i * 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(false, i + 1)); + } + } + + tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire(); + + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchRejectsTargetExpiredSnapshots() throws Exception { + FileStoreTable table = + createBranchMergeTable( + conf -> { + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1); + }); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); try (StreamTableWrite write = tableBranch.newWrite(commitUser); @@ -2531,6 +2709,7 @@ public void testMergeBranchAfterSnapshotExpiration() throws Exception { commit.commit(1, write.prepareCommit(false, 2)); } + // Write multiple commits on main to allow expiration for (int i = 2; i < 5; i++) { try (StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser)) { @@ -2541,6 +2720,39 @@ public void testMergeBranchAfterSnapshotExpiration() throws Exception { table.newExpireSnapshots().config(table.coreOptions().expireConfig()).expire(); + assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch merge requires complete append-only snapshot history")); + } + + @Test + public void testMergeBranchFromTagSucceeds() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + table.mergeBranch(BRANCH_NAME, "main"); assertThat( @@ -2551,25 +2763,44 @@ public void testMergeBranchAfterSnapshotExpiration() throws Exception { .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", "1|10|100|binary|varbinary|mapKey:mapVal|multiset", - "2|20|200|binary|varbinary|mapKey:mapVal|multiset", - "3|30|300|binary|varbinary|mapKey:mapVal|multiset", - "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); } @Test - public void testBranchMergeEnabledRejectsOverwrite() throws Exception { + public void testMergePlainBranchSucceedsWithCompleteHistory() throws Exception { FileStoreTable table = createBranchMergeTable(); - List commitMessages; - try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) { + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createBranch(BRANCH_NAME); + FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME); + + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { write.write(rowData(1, 10, 100L)); - commitMessages = write.prepareCommit(); + commit.commit(1, write.prepareCommit(false, 2)); } - List finalMessages = commitMessages; - try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) { - assertThatThrownBy(() -> commit.commit(finalMessages)) - .hasMessageContaining("branch-merge.enabled"); + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(2, write.prepareCommit(false, 2)); } + + table.mergeBranch(BRANCH_NAME, "main"); + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 97db601e73d9..80cfb975bd9e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -2688,6 +2688,6 @@ public void testMergeBranchPrimaryKeyTable() throws Exception { table.createBranch(BRANCH_NAME, "tag1"); assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main")) - .satisfies(anyCauseMatches(IllegalArgumentException.class, "branch-merge.enabled")); + .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables")); } } From d61e8477da05f1e046f4dab863121fac84df6dd7 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Wed, 20 May 2026 18:49:27 +0800 Subject: [PATCH 5/5] [core] Remove branch merge from catalog API --- .../paimon/catalog/AbstractCatalog.java | 6 - .../org/apache/paimon/catalog/Catalog.java | 13 -- .../paimon/catalog/DelegateCatalog.java | 6 - .../org/apache/paimon/rest/RESTCatalog.java | 6 - .../paimon/table/AbstractFileStoreTable.java | 4 +- .../paimon/utils/BranchMergeHandler.java | 121 ++++++++++++++- .../paimon/utils/CatalogBranchManager.java | 6 +- .../utils/DefaultBranchMergeHandler.java | 146 ------------------ .../apache/paimon/rest/RESTCatalogTest.java | 17 -- .../table/AppendOnlySimpleTableTest.java | 4 +- 10 files changed, 120 insertions(+), 209 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 5c27dcb8cff7..4a8a8b1b91b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -631,12 +631,6 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx throw new UnsupportedOperationException(); } - @Override - public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) - throws BranchNotExistException { - throw new UnsupportedOperationException(); - } - @Override public List listBranches(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 2a44944615b4..57fa040a2acd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -920,19 +920,6 @@ void renameBranch(Identifier identifier, String fromBranch, String toBranch) */ void fastForward(Identifier identifier, String branch) throws BranchNotExistException; - /** - * Merge source branch into target branch. - * - * @param identifier path of the table, cannot be system or branch name. - * @param sourceBranch the source branch name - * @param targetBranch the target branch name - * @throws BranchNotExistException if the source or target branch doesn't exist - * @throws UnsupportedOperationException if the catalog does not {@link - * #supportsVersionManagement()} - */ - void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) - throws BranchNotExistException; - /** * List all branches of the table. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index e24787f9c012..0f18f7d04540 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -263,12 +263,6 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx wrapped.fastForward(identifier, branch); } - @Override - public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) - throws BranchNotExistException { - wrapped.mergeBranch(identifier, sourceBranch, targetBranch); - } - @Override public List listBranches(Identifier identifier) throws TableNotExistException { return wrapped.listBranches(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index b47a8ae0b109..7e2f6cfd2743 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -783,12 +783,6 @@ public void fastForward(Identifier identifier, String branch) throws BranchNotEx } } - @Override - public void mergeBranch(Identifier identifier, String sourceBranch, String targetBranch) - throws BranchNotExistException { - throw new UnsupportedOperationException("Branch merge is not supported via REST catalog."); - } - @Override public List listBranches(Identifier identifier) throws TableNotExistException { try { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 60f48f4dcd02..df4224915927 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -55,10 +55,10 @@ import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.BranchMergeHandler; import org.apache.paimon.utils.CatalogBranchManager; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.DVMetaCache; -import org.apache.paimon.utils.DefaultBranchMergeHandler; import org.apache.paimon.utils.FileSystemBranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; @@ -760,7 +760,7 @@ public BranchManager branchManager() { snapshotManager(), tagManager(), schemaManager(), - new DefaultBranchMergeHandler(this::switchToBranch)); + new BranchMergeHandler(this::switchToBranch)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java index bb009664da66..0327fd0bcfcb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java @@ -18,18 +18,127 @@ package org.apache.paimon.utils; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; -/** Handler for branch merge data operations (manifest reading, committing). */ -public interface BranchMergeHandler { +import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Read all active data files from the given branch. */ - Map readBranchFiles(String branch); +/** Branch merge handler backed by {@link FileStoreTable}. */ +public class BranchMergeHandler { - /** Commit the given files to the target branch. */ - void commit(String targetBranch, List filesToMerge); + private final Function branchTableFactory; + + public BranchMergeHandler(Function branchTableFactory) { + this.branchTableFactory = branchTableFactory; + } + + public Map readBranchFiles(String branch) { + FileStoreTable branchTable = branchTableFactory.apply(branch); + Snapshot snapshot = branchTable.snapshotManager().latestSnapshot(); + checkArgument( + snapshot != null, + "Cannot read branch '%s', because it does not have any snapshot.", + branch); + ManifestList manifestList = branchTable.store().manifestListFactory().create(); + ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + Map files = new LinkedHashMap<>(); + FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); + return files; + } + + public void commit(String targetBranch, List filesToMerge) { + FileStoreTable branchTable = branchTableFactory.apply(targetBranch); + boolean rowTrackingEnabled = + new CoreOptions(branchTable.schema().options()).rowTrackingEnabled(); + + Map> grouped = new LinkedHashMap<>(); + for (ManifestEntry entry : filesToMerge) { + DataFileMeta file = prepareFileForTargetCommit(entry.file(), rowTrackingEnabled); + grouped.computeIfAbsent( + new MergeKey( + entry.partition().copy(), entry.bucket(), entry.totalBuckets()), + k -> new ArrayList<>()) + .add(file); + } + + String commitUser = UUID.randomUUID().toString(); + ManifestCommittable committable = new ManifestCommittable(0); + for (Map.Entry> e : grouped.entrySet()) { + MergeKey key = e.getKey(); + CommitMessageImpl message = + new CommitMessageImpl( + key.partition, + key.bucket, + key.totalBuckets, + new DataIncrement( + e.getValue(), Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement()); + committable.addFileCommittable(message); + } + + try (FileStoreCommit commit = branchTable.store().newCommit(commitUser, branchTable)) { + commit.appendCommitCheckConflict(true).commit(committable, true); + } + } + + private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTrackingEnabled) { + if (rowTrackingEnabled && file.firstRowId() != null) { + // Source files already have row ids assigned in their branch. Clear them so the + // target branch commit path assigns fresh, non-overlapping row ids. + return file.newFirstRowId(null); + } + return file; + } + + private static class MergeKey { + final BinaryRow partition; + final int bucket; + final int totalBuckets; + + MergeKey(BinaryRow partition, int bucket, int totalBuckets) { + this.partition = partition; + this.bucket = bucket; + this.totalBuckets = totalBuckets; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MergeKey)) { + return false; + } + MergeKey that = (MergeKey) o; + return bucket == that.bucket + && totalBuckets == that.totalBuckets + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, totalBuckets); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java index b2906544ada8..9f97cf3a097d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java @@ -110,11 +110,7 @@ public void fastForward(String branchName) { @Override public void mergeBranch(String sourceBranch, String targetBranch) { - executePost( - catalog -> { - BranchManager.mergeValidate(sourceBranch, targetBranch); - catalog.mergeBranch(identifier, sourceBranch, targetBranch); - }); + throw new UnsupportedOperationException("Branch merge is not supported via catalog."); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java deleted file mode 100644 index 698126b48f2d..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/utils/DefaultBranchMergeHandler.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.manifest.FileEntry; -import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.manifest.ManifestFile; -import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.CommitMessageImpl; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.function.Function; - -import static org.apache.paimon.utils.Preconditions.checkArgument; - -/** Default implementation of {@link BranchMergeHandler} backed by {@link FileStoreTable}. */ -public class DefaultBranchMergeHandler implements BranchMergeHandler { - - private final Function branchTableFactory; - - public DefaultBranchMergeHandler(Function branchTableFactory) { - this.branchTableFactory = branchTableFactory; - } - - @Override - public Map readBranchFiles(String branch) { - FileStoreTable branchTable = branchTableFactory.apply(branch); - Snapshot snapshot = branchTable.snapshotManager().latestSnapshot(); - checkArgument( - snapshot != null, - "Cannot read branch '%s', because it does not have any snapshot.", - branch); - ManifestList manifestList = branchTable.store().manifestListFactory().create(); - ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); - Map files = new LinkedHashMap<>(); - FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null); - return files; - } - - @Override - public void commit(String targetBranch, List filesToMerge) { - FileStoreTable branchTable = branchTableFactory.apply(targetBranch); - boolean rowTrackingEnabled = - new CoreOptions(branchTable.schema().options()).rowTrackingEnabled(); - - Map> grouped = new LinkedHashMap<>(); - for (ManifestEntry entry : filesToMerge) { - DataFileMeta file = prepareFileForTargetCommit(entry.file(), rowTrackingEnabled); - grouped.computeIfAbsent( - new MergeKey( - entry.partition().copy(), entry.bucket(), entry.totalBuckets()), - k -> new ArrayList<>()) - .add(file); - } - - String commitUser = UUID.randomUUID().toString(); - ManifestCommittable committable = new ManifestCommittable(0); - for (Map.Entry> e : grouped.entrySet()) { - MergeKey key = e.getKey(); - CommitMessageImpl message = - new CommitMessageImpl( - key.partition, - key.bucket, - key.totalBuckets, - new DataIncrement( - e.getValue(), Collections.emptyList(), Collections.emptyList()), - CompactIncrement.emptyIncrement()); - committable.addFileCommittable(message); - } - - try (FileStoreCommit commit = branchTable.store().newCommit(commitUser, branchTable)) { - commit.appendCommitCheckConflict(true).commit(committable, true); - } - } - - private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTrackingEnabled) { - if (rowTrackingEnabled && file.firstRowId() != null) { - // Source files already have row ids assigned in their branch. Clear them so the - // target branch commit path assigns fresh, non-overlapping row ids. - return file.newFirstRowId(null); - } - return file; - } - - private static class MergeKey { - final BinaryRow partition; - final int bucket; - final int totalBuckets; - - MergeKey(BinaryRow partition, int bucket, int totalBuckets) { - this.partition = partition; - this.bucket = bucket; - this.totalBuckets = totalBuckets; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof MergeKey)) { - return false; - } - MergeKey that = (MergeKey) o; - return bucket == that.bucket - && totalBuckets == that.totalBuckets - && Objects.equals(partition, that.partition); - } - - @Override - public int hashCode() { - return Objects.hash(partition, bucket, totalBuckets); - } - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 1da72a437fab..8cefa2ec7b42 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -374,9 +374,6 @@ void testApiWhenTableNoPermission() throws Exception { assertThrows( Catalog.TableNoPermissionException.class, () -> restCatalog.fastForward(identifier, "test_branch")); - assertThrows( - UnsupportedOperationException.class, - () -> restCatalog.mergeBranch(identifier, "test_branch", "main")); assertThrows(ForbiddenException.class, () -> restCatalog.api().loadTableToken(identifier)); assertThrows( Catalog.TableNoPermissionException.class, @@ -2137,17 +2134,6 @@ public void testBranchBatchRecordsWrite() throws Exception { .containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]", "+I[2]", "+I[1]", "+I[9]"); } - @Test - public void testMergeBranchUnsupported() throws Exception { - Identifier tableIdentifier = Identifier.create("merge_db", "merge_table"); - Map options = Maps.newHashMap(); - createTable(tableIdentifier, options, Collections.emptyList()); - - assertThrows( - UnsupportedOperationException.class, - () -> restCatalog.mergeBranch(tableIdentifier, "branch1", "main")); - } - @Test void testBranches() throws Exception { String databaseName = "testBranchTable"; @@ -2201,9 +2187,6 @@ void testBranches() throws Exception { assertThrows( Catalog.BranchNotExistException.class, () -> restCatalog.fastForward(identifier, "no_exist_branch")); - assertThrows( - UnsupportedOperationException.class, - () -> restCatalog.mergeBranch(identifier, "no_exist_branch", "main")); assertThat(restCatalog.listBranches(identifier)).isEmpty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 2544e85e6343..c91dbb4c8e4f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -76,7 +76,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.DefaultBranchMergeHandler; +import org.apache.paimon.utils.BranchMergeHandler; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat; @@ -1827,7 +1827,7 @@ public void testMergeBranchFailsOnStaleDuplicateCommit() throws Exception { commit.commit(1, write.prepareCommit(false, 2)); } - DefaultBranchMergeHandler handler = new DefaultBranchMergeHandler(table::switchToBranch); + BranchMergeHandler handler = new BranchMergeHandler(table::switchToBranch); Map sourceFiles = handler.readBranchFiles(BRANCH_NAME); Map targetFiles =