Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/static/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3615,4 +3615,4 @@ components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
scheme: bearer
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ default Range nonNullRowIdRange() {

DataFileMeta assignFirstRowId(long firstRowId);

DataFileMeta newFirstRowId(@Nullable Long newFirstRowId);

default List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,31 @@ public PojoDataFileMeta assignFirstRowId(long firstRowId) {
writeCols);
}

@Override
public PojoDataFileMeta newFirstRowId(@Nullable Long newFirstRowId) {
return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols,
externalPath,
newFirstRowId,
writeCols);
}

@Override
public PojoDataFileMeta copy(List<String> newExtraFiles) {
return new PojoDataFileMeta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public void fastForward(String branchName) {
wrapped.fastForward(branchName);
}

@Override
public void mergeBranch(String sourceBranch, String targetBranch) {
privilegeChecker.assertCanInsert(identifier);
wrapped.mergeBranch(sourceBranch, targetBranch);
}

@Override
public ExpireSnapshots newExpireSnapshots() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.BranchMergeHandler;
import org.apache.paimon.utils.CatalogBranchManager;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
Expand Down Expand Up @@ -737,6 +738,11 @@ public void fastForward(String branchName) {
branchManager().fastForward(branchName);
}

@Override
public void mergeBranch(String sourceBranch, String targetBranch) {
branchManager().mergeBranch(sourceBranch, targetBranch);
}

@Override
public TagManager tagManager() {
return new TagManager(fileIO, path, currentBranch(), coreOptions());
Expand All @@ -749,7 +755,12 @@ public BranchManager branchManager() {
return new CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
}
return new FileSystemBranchManager(
fileIO, path, snapshotManager(), tagManager(), schemaManager());
fileIO,
path,
snapshotManager(),
tagManager(),
schemaManager(),
new BranchMergeHandler(this::switchToBranch));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ public void fastForward(String branchName) {
wrapped.fastForward(branchName);
}

@Override
public void mergeBranch(String sourceBranch, String targetBranch) {
wrapped.mergeBranch(sourceBranch, targetBranch);
}

@Override
public ExpireSnapshots newExpireSnapshots() {
return wrapped.newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ default void fastForward(String branchName) {
throw new UnsupportedOperationException();
}

@Override
default void mergeBranch(String sourceBranch, String targetBranch) {
throw new UnsupportedOperationException();
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ default void fastForward(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void mergeBranch(String sourceBranch, String targetBranch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mergeBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ default void deleteBranches(String branchNames) {
@Experimental
void fastForward(String branchName);

/** Merge source branch into target branch (append-only tables only). */
@Experimental
void mergeBranch(String sourceBranch, String targetBranch);

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public interface BranchManager {

void fastForward(String branchName);

void mergeBranch(String sourceBranch, String targetBranch);

void renameBranch(String fromBranch, String toBranch);

List<String> branches();
Expand Down Expand Up @@ -98,6 +100,21 @@ static void validateBranch(String branchName) {
branchName);
}

static void mergeValidate(String sourceBranch, String targetBranch) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sourceBranch),
"Source branch name '%s' is blank.",
sourceBranch);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(targetBranch),
"Target branch name '%s' is blank.",
targetBranch);
checkArgument(
!sourceBranch.equals(targetBranch),
"Cannot merge branch '%s' into itself.",
sourceBranch);
}

static void fastForwardValidate(String branchName, String currentBranch) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Branch merge handler backed by {@link FileStoreTable}. */
public class BranchMergeHandler {

private final Function<String, FileStoreTable> branchTableFactory;

public BranchMergeHandler(Function<String, FileStoreTable> branchTableFactory) {
this.branchTableFactory = branchTableFactory;
}

public Map<FileEntry.Identifier, ManifestEntry> readBranchFiles(String branch) {
FileStoreTable branchTable = branchTableFactory.apply(branch);
Snapshot snapshot = branchTable.snapshotManager().latestSnapshot();
checkArgument(
snapshot != null,
"Cannot read branch '%s', because it does not have any snapshot.",
branch);
ManifestList manifestList = branchTable.store().manifestListFactory().create();
ManifestFile manifestFile = branchTable.store().manifestFileFactory().create();
Map<FileEntry.Identifier, ManifestEntry> files = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, manifestList.readDataManifests(snapshot), files, null);
return files;
}

public void commit(String targetBranch, List<ManifestEntry> filesToMerge) {
FileStoreTable branchTable = branchTableFactory.apply(targetBranch);
boolean rowTrackingEnabled =
new CoreOptions(branchTable.schema().options()).rowTrackingEnabled();

Map<MergeKey, List<DataFileMeta>> grouped = new LinkedHashMap<>();
for (ManifestEntry entry : filesToMerge) {
DataFileMeta file = prepareFileForTargetCommit(entry.file(), rowTrackingEnabled);
grouped.computeIfAbsent(
new MergeKey(
entry.partition().copy(), entry.bucket(), entry.totalBuckets()),
k -> new ArrayList<>())
.add(file);
}

String commitUser = UUID.randomUUID().toString();
ManifestCommittable committable = new ManifestCommittable(0);
for (Map.Entry<MergeKey, List<DataFileMeta>> e : grouped.entrySet()) {
MergeKey key = e.getKey();
CommitMessageImpl message =
new CommitMessageImpl(
key.partition,
key.bucket,
key.totalBuckets,
new DataIncrement(
e.getValue(), Collections.emptyList(), Collections.emptyList()),
CompactIncrement.emptyIncrement());
committable.addFileCommittable(message);
}

try (FileStoreCommit commit = branchTable.store().newCommit(commitUser, branchTable)) {
commit.appendCommitCheckConflict(true).commit(committable, true);
}
}

private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean rowTrackingEnabled) {
if (rowTrackingEnabled && file.firstRowId() != null) {
// Source files already have row ids assigned in their branch. Clear them so the
// target branch commit path assigns fresh, non-overlapping row ids.
return file.newFirstRowId(null);
}
return file;
}

private static class MergeKey {
final BinaryRow partition;
final int bucket;
final int totalBuckets;

MergeKey(BinaryRow partition, int bucket, int totalBuckets) {
this.partition = partition;
this.bucket = bucket;
this.totalBuckets = totalBuckets;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof MergeKey)) {
return false;
}
MergeKey that = (MergeKey) o;
return bucket == that.bucket
&& totalBuckets == that.totalBuckets
&& Objects.equals(partition, that.partition);
}

@Override
public int hashCode() {
return Objects.hash(partition, bucket, totalBuckets);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public void fastForward(String branchName) {
});
}

@Override
public void mergeBranch(String sourceBranch, String targetBranch) {
throw new UnsupportedOperationException("Branch merge is not supported via catalog.");
}

@Override
public void renameBranch(String fromBranch, String toBranch) {
executePost(catalog -> catalog.renameBranch(identifier, fromBranch, toBranch));
Expand Down
Loading
Loading