From dce55ce8ed40b48de5ff8ec038c4cdde2fe40954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 16:42:24 +0800 Subject: [PATCH 01/11] [core] DataEvolution table supports concurrent updates to different columns. --- .../org/apache/paimon/AbstractFileStore.java | 1 + .../paimon/manifest/ExpireFileEntry.java | 12 +- .../org/apache/paimon/manifest/FileEntry.java | 5 + .../apache/paimon/manifest/ManifestEntry.java | 11 + .../paimon/manifest/SimpleFileEntry.java | 39 ++- .../manifest/SimpleFileEntryWithDV.java | 4 +- .../operation/commit/ConflictDetection.java | 41 +-- .../commit/RowIdColumnConflictChecker.java | 221 ++++++++++++ .../paimon/operation/FileStoreCommitTest.java | 1 + .../commit/ConflictDetectionTest.java | 321 ++++++++++++++++++ .../spark/sql/RowTrackingTestBase.scala | 42 +++ 11 files changed, 669 insertions(+), 29 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 9aad0259cdf8..cd962f9abb29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -284,6 +284,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { tableName, commitUser, partitionType, + schemaManager, pathFactory(), newKeyComparator(), bucketMode(), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java index 024eac2b2096..512c942fdd82 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -45,7 +45,9 @@ public ExpireFileEntry( @Nullable FileSource fileSource, @Nullable String 
externalPath, long rowCount, - @Nullable Long firstRowId) { + @Nullable Long firstRowId, + long schemaId, + @Nullable List writeCols) { super( kind, partition, @@ -59,7 +61,9 @@ public ExpireFileEntry( maxKey, externalPath, rowCount, - firstRowId); + firstRowId, + schemaId, + writeCols); this.fileSource = fileSource; } @@ -82,7 +86,9 @@ public static ExpireFileEntry from(ManifestEntry entry) { entry.file().fileSource().orElse(null), entry.externalPath(), entry.rowCount(), - entry.firstRowId()); + entry.firstRowId(), + entry.file().schemaId(), + entry.file().writeCols()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 3a8008825581..09c1e09d598c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -72,6 +72,11 @@ public interface FileEntry { @Nullable Long firstRowId(); + long schemaId(); + + @Nullable + List writeCols(); + /** * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data * file. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index f95ba19221a2..1ec274935df0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -58,6 +58,17 @@ static ManifestEntry create( DataFileMeta file(); + @Override + default long schemaId() { + return file().schemaId(); + } + + @Nullable + @Override + default List writeCols() { + return file().writeCols(); + } + ManifestEntry copyWithoutStats(); ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index 77ae12b580df..c5ce43362b79 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -45,6 +45,8 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final String externalPath; private final long rowCount; @Nullable private final Long firstRowId; + private final long schemaId; + @Nullable private final List writeCols; public SimpleFileEntry( FileKind kind, @@ -59,7 +61,9 @@ public SimpleFileEntry( BinaryRow maxKey, @Nullable String externalPath, long rowCount, - @Nullable Long firstRowId) { + @Nullable Long firstRowId, + long schemaId, + @Nullable List writeCols) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -73,6 +77,8 @@ public SimpleFileEntry( this.externalPath = externalPath; this.rowCount = rowCount; this.firstRowId = firstRowId; + this.schemaId = schemaId; + this.writeCols = writeCols; } public static SimpleFileEntry from(ManifestEntry entry) { @@ -89,7 +95,9 @@ public static SimpleFileEntry from(ManifestEntry entry) { entry.maxKey(), 
entry.externalPath(), entry.file().rowCount(), - entry.firstRowId()); + entry.firstRowId(), + entry.file().schemaId(), + entry.file().writeCols()); } public SimpleFileEntry toDelete() { @@ -106,7 +114,9 @@ public SimpleFileEntry toDelete() { maxKey, externalPath, rowCount, - firstRowId); + firstRowId, + schemaId, + writeCols); } public static List from(List entries) { @@ -185,6 +195,17 @@ public long rowCount() { return firstRowId; } + @Override + public long schemaId() { + return schemaId; + } + + @Nullable + @Override + public List writeCols() { + return writeCols; + } + public long nonNullFirstRowId() { Long firstRowId = firstRowId(); checkArgument(firstRowId != null, "First row id of '%s' should not be null.", fileName()); @@ -216,7 +237,9 @@ public boolean equals(Object o) { && Objects.equals(maxKey, that.maxKey) && Objects.equals(externalPath, that.externalPath) && rowCount == that.rowCount - && Objects.equals(firstRowId, that.firstRowId); + && Objects.equals(firstRowId, that.firstRowId) + && schemaId == that.schemaId + && Objects.equals(writeCols, that.writeCols); } @Override @@ -233,7 +256,9 @@ public int hashCode() { maxKey, externalPath, rowCount, - firstRowId); + firstRowId, + schemaId, + writeCols); } @Override @@ -263,6 +288,10 @@ public String toString() { + rowCount + ", firstRowId=" + firstRowId + + ", schemaId=" + + schemaId + + ", writeCols=" + + writeCols + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java index 6b4c47915752..e2a82dfbef0c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java @@ -41,7 +41,9 @@ public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) entry.maxKey(), entry.externalPath(), entry.rowCount(), - entry.firstRowId()); + entry.firstRowId(), + 
entry.schemaId(), + entry.writeCols()); this.dvFileName = dvFileName; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 46d5c436192d..63ee4bee82ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -33,11 +33,11 @@ import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; import org.apache.paimon.operation.PartitionExpire; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; import org.apache.paimon.utils.SnapshotManager; @@ -64,6 +64,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions; +import static org.apache.paimon.types.VectorType.isVectorStoreFile; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; import static org.apache.paimon.utils.Preconditions.checkState; @@ -76,6 +77,7 @@ public class ConflictDetection { private final String tableName; private final String commitUser; private final RowType partitionType; + private final SchemaManager schemaManager; private final FileStorePathFactory pathFactory; private final @Nullable Comparator keyComparator; private final BucketMode bucketMode; @@ -100,6 +102,7 @@ public ConflictDetection( String tableName, String commitUser, RowType partitionType, + SchemaManager schemaManager, FileStorePathFactory 
pathFactory, @Nullable Comparator keyComparator, BucketMode bucketMode, @@ -112,6 +115,7 @@ public ConflictDetection( this.tableName = tableName; this.commitUser = commitUser; this.partitionType = partitionType; + this.schemaManager = schemaManager; this.pathFactory = pathFactory; this.keyComparator = keyComparator; this.bucketMode = bucketMode; @@ -473,7 +477,7 @@ private Optional checkRowIdRangeConflicts( for (List group : merged) { List dataFiles = new ArrayList<>(); for (SimpleFileEntry f : group) { - if (!isBlobFile(f.fileName())) { + if (!dedicatedStorageFile(f.fileName())) { dataFiles.add(f); } } @@ -500,14 +504,10 @@ private Optional checkForRowIdFromSnapshot( } List changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries); - // collect history row id ranges - List historyIdRanges = new ArrayList<>(); - for (SimpleFileEntry entry : deltaEntries) { - Long firstRowId = entry.firstRowId(); - long rowCount = entry.rowCount(); - if (firstRowId != null) { - historyIdRanges.add(new Range(firstRowId, firstRowId + rowCount - 1)); - } + RowIdColumnConflictChecker conflictChecker = + RowIdColumnConflictChecker.fromDeltaEntries(schemaManager, deltaEntries); + if (conflictChecker.isEmpty()) { + return Optional.empty(); } // check history row id ranges @@ -525,16 +525,13 @@ private Optional checkForRowIdFromSnapshot( commitScanner.readIncrementalEntries(snapshot, changedPartitions); for (ManifestEntry entry : changes) { DataFileMeta file = entry.file(); - Range fileRange = file.nonNullRowIdRange(); - if (fileRange.from < checkNextRowId) { - for (Range range : historyIdRanges) { - if (range.hasIntersection(fileRange)) { - return Optional.of( - new RuntimeException( - "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," - + " updating the same file, which can render some updates ineffective.")); - } - } + if (file.firstRowId() != null + && file.nonNullRowIdRange().from < checkNextRowId + && 
conflictChecker.conflictsWith(entry)) { + return Optional.of( + new RuntimeException( + "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," + + " updating the same file, which can render some updates ineffective.")); } } } @@ -542,6 +539,10 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + private static boolean dedicatedStorageFile(String fileName) { + return isBlobFile(fileName) || isVectorStoreFile(fileName); + } + static List buildBaseEntriesWithDV( List baseEntries, List baseIndexEntries) { if (baseEntries.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java new file mode 100644 index 000000000000..2f828981bd07 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.Range; +import org.apache.paimon.utils.RangeHelper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Detects row-id range conflicts only when written field ids overlap. */ +class RowIdColumnConflictChecker { + + private final SchemaManager schemaManager; + private final List writeRanges; + private final Map> fieldIdByNameCache = new HashMap<>(); + + private RowIdColumnConflictChecker( + SchemaManager schemaManager, List deltaEntries) { + this.schemaManager = schemaManager; + this.writeRanges = buildWriteRanges(deltaEntries); + } + + static RowIdColumnConflictChecker fromDeltaEntries( + SchemaManager schemaManager, List deltaEntries) { + return new RowIdColumnConflictChecker(schemaManager, deltaEntries); + } + + private List buildWriteRanges(List deltaEntries) { + List rowIdEntries = new ArrayList<>(); + for (SimpleFileEntry entry : deltaEntries) { + if (entry.firstRowId() != null) { + rowIdEntries.add(entry); + } + } + + if (rowIdEntries.isEmpty()) { + return Collections.emptyList(); + } + + RangeHelper rangeHelper = + new RangeHelper<>(SimpleFileEntry::nonNullRowIdRange); + List writeRanges = new ArrayList<>(); + for (List group : rangeHelper.mergeOverlappingRanges(rowIdEntries)) { + Range range = mergeRange(group); + Set fieldIds = new HashSet<>(); + for (SimpleFileEntry entry : group) { + addWriteFieldIds(fieldIds, entry); + } + + if (!fieldIds.isEmpty()) { + writeRanges.add(new WriteRange(range, fieldIds)); + } + } + writeRanges.sort( + Comparator.comparingLong((WriteRange 
writeRange) -> writeRange.range.from) + .thenComparingLong(writeRange -> writeRange.range.to)); + + return writeRanges; + } + + boolean isEmpty() { + return writeRanges.isEmpty(); + } + + boolean conflictsWith(FileEntry entry) { + if (entry.firstRowId() == null) { + return false; + } + + Range range = rowIdRange(entry); + int index = firstPossibleRange(range); + while (index < writeRanges.size()) { + WriteRange writeRange = writeRanges.get(index); + if (writeRange.range.from > range.to) { + break; + } + if (writeRange.range.hasIntersection(range) + && containsAnyWriteField(writeRange.fieldIds, entry)) { + return true; + } + index++; + } + return false; + } + + private static Range mergeRange(List entries) { + long from = Long.MAX_VALUE; + long to = Long.MIN_VALUE; + for (SimpleFileEntry entry : entries) { + Range range = entry.nonNullRowIdRange(); + from = Math.min(from, range.from); + to = Math.max(to, range.to); + } + return new Range(from, to); + } + + private int firstPossibleRange(Range range) { + int low = 0; + int high = writeRanges.size(); + while (low < high) { + int mid = (low + high) >>> 1; + if (writeRanges.get(mid).range.to < range.from) { + low = mid + 1; + } else { + high = mid; + } + } + return low; + } + + private void addWriteFieldIds(Set fieldIds, FileEntry entry) { + List writeCols = entry.writeCols(); + if (writeCols == null) { + throw nullWriteColsException(entry); + } + + for (String writeCol : writeCols) { + Integer fieldId = fieldId(entry, writeCol); + if (fieldId != null && !SpecialFields.isSystemField(fieldId)) { + fieldIds.add(fieldId); + } + } + } + + private boolean containsAnyWriteField(Set fieldIds, FileEntry entry) { + List writeCols = entry.writeCols(); + if (writeCols == null) { + throw nullWriteColsException(entry); + } + + for (String writeCol : writeCols) { + Integer fieldId = fieldId(entry, writeCol); + if (fieldId != null + && !SpecialFields.isSystemField(fieldId) + && fieldIds.contains(fieldId)) { + return true; + } + } + 
return false; + } + + private Integer fieldId(FileEntry entry, String writeCol) { + Integer fieldId = + fieldIdByNameCache + .computeIfAbsent(entry.schemaId(), this::fieldIdByName) + .get(writeCol); + if (fieldId == null) { + if (SpecialFields.isSystemField(writeCol)) { + return null; + } + throw new RuntimeException( + String.format( + "Cannot find write column '%s' in schema %s.", + writeCol, entry.schemaId())); + } + return fieldId; + } + + private Map fieldIdByName(long schemaId) { + Map fieldIdByName = new HashMap<>(); + for (DataField field : schemaManager.schema(schemaId).logicalRowType().getFields()) { + fieldIdByName.put(field.name(), field.id()); + } + return fieldIdByName; + } + + private static RuntimeException nullWriteColsException(FileEntry entry) { + return new RuntimeException( + String.format( + "Write columns of row-id file '%s' in schema %s cannot be null.", + entry.fileName(), entry.schemaId())); + } + + private static Range rowIdRange(FileEntry entry) { + Long firstRowId = entry.firstRowId(); + if (firstRowId == null) { + throw new IllegalArgumentException( + String.format("First row id of '%s' should not be null.", entry.fileName())); + } + return new Range(firstRowId, firstRowId + entry.rowCount() - 1); + } + + private static class WriteRange { + + private final Range range; + private final Set fieldIds; + + private WriteRange(Range range, Set fieldIds) { + this.range = range; + this.fieldIds = fieldIds; + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 71eb081de89f..49ad673f50ec 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1106,6 +1106,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( tableName, commitUser, store.partitionType(), + new 
SchemaManager(store.fileIO(), store.options().path()), store.pathFactory(), store.newKeyComparator(), store.bucketMode(), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index c3e0258da28f..e64361a94c45 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -18,15 +18,29 @@ package org.apache.paimon.operation.commit; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.Snapshot.CommitKind; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; @@ -36,8 +50,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import static 
org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -45,7 +62,9 @@ import static org.apache.paimon.manifest.FileKind.DELETE; import static org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV; import static org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV; +import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class ConflictDetectionTest { @@ -82,6 +101,108 @@ public void testBuildBaseEntriesWithDV() { } } + @Test + void testSimpleFileEntryFromManifestEntryPreservesSchemaAndWriteCols() { + ManifestEntry manifestEntry = + ManifestEntry.create( + ADD, + EMPTY_ROW, + 0, + 1, + createDataFileMeta("f1", 0L, 1L, 1L, Arrays.asList("b"))); + + SimpleFileEntry entry = SimpleFileEntry.from(manifestEntry); + assertThat(entry.schemaId()).isEqualTo(1L); + assertThat(entry.writeCols()).containsExactly("b"); + + SimpleFileEntry delete = entry.toDelete(); + assertThat(delete.schemaId()).isEqualTo(1L); + assertThat(delete.writeCols()).containsExactly("b"); + } + + @Test + void testRowIdConflictAllowsDisjointWriteColumns() { + Optional conflict = + checkRowIdConflict(Arrays.asList("b"), 0L, Arrays.asList("c"), 0L); + + assertThat(conflict).isEmpty(); + } + + @Test + void testRowIdConflictDetectsSameWriteColumns() { + Optional conflict = + checkRowIdConflict(Arrays.asList("b"), 0L, Arrays.asList("b"), 0L); + + assertThat(conflict).isPresent(); + assertThat(conflict.get().getMessage()) + .contains("multiple 'MERGE INTO' operations have encountered conflicts"); + } + + @Test + void testRowIdConflictRequiresWriteColumns() { + assertThatThrownBy(() -> checkRowIdConflict(null, 0L, Arrays.asList("b"), 0L)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Write columns of row-id file"); + } + + @Test + 
void testRowIdConflictUsesFieldIdAcrossRename() { + Optional conflict = + checkRowIdConflict(Arrays.asList("b_renamed"), 1L, Arrays.asList("b"), 0L); + + assertThat(conflict).isPresent(); + } + + @Test + void testRowIdConflictIndexMergesOverlappedDeltaRanges() { + RowIdColumnConflictChecker checker = + RowIdColumnConflictChecker.fromDeltaEntries( + createSchemaManager(), + Arrays.asList( + createFileEntry("current-b", ADD, 0L, 11L, 0L, Arrays.asList("b")), + createFileEntry( + "current-c", ADD, 5L, 11L, 0L, Arrays.asList("c")))); + + ManifestEntry historicalB = + ManifestEntry.create( + ADD, + EMPTY_ROW, + 0, + 1, + createDataFileMeta("historical-b", 12L, 1L, 0L, Arrays.asList("b"))); + ManifestEntry historicalC = + ManifestEntry.create( + ADD, + EMPTY_ROW, + 0, + 1, + createDataFileMeta("historical-c", 12L, 1L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(historicalB)).isTrue(); + assertThat(checker.conflictsWith(historicalC)).isTrue(); + } + + @Test + void testRowIdConflictIndexScansAllOverlappedRanges() { + RowIdColumnConflictChecker checker = + RowIdColumnConflictChecker.fromDeltaEntries( + createSchemaManager(), + Arrays.asList( + createFileEntry("current-b", ADD, 0L, 5L, 0L, Arrays.asList("b")), + createFileEntry( + "current-c", ADD, 10L, 5L, 0L, Arrays.asList("c")))); + + ManifestEntry historical = + ManifestEntry.create( + ADD, + EMPTY_ROW, + 0, + 1, + createDataFileMeta("historical", 3L, 10L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(historical)).isTrue(); + } + @Test public void testBuildDeltaEntriesWithDV() { { @@ -307,6 +428,8 @@ private SimpleFileEntry createFileEntry(String fileName, FileKind kind) { EMPTY_ROW, null, 0L, + null, + 0L, null); } @@ -379,6 +502,7 @@ private ConflictDetection createConflictDetection() { "test-table", "test-user", RowType.of(), + createSchemaManager(), null, null, BucketMode.HASH_FIXED, @@ -389,4 +513,201 @@ private ConflictDetection createConflictDetection() { null, null); } + + 
private Optional checkRowIdConflict( + @Nullable List currentWriteCols, + long currentSchemaId, + @Nullable List historicalWriteCols, + long historicalSchemaId) { + Snapshot baseSnapshot = snapshot(1L, CommitKind.APPEND, 10L); + Snapshot latestSnapshot = snapshot(2L, CommitKind.APPEND, 10L); + + ManifestEntry historicalEntry = + ManifestEntry.create( + ADD, + EMPTY_ROW, + 0, + 1, + createDataFileMeta( + "historical", 0L, 10L, historicalSchemaId, historicalWriteCols)); + + SnapshotManager snapshotManager = new TestingSnapshotManager(baseSnapshot, latestSnapshot); + CommitScanner commitScanner = + new TestingCommitScanner( + latestSnapshot.id(), Collections.singletonList(historicalEntry)); + + ConflictDetection detection = + new ConflictDetection( + "test-table", + "test-user", + RowType.of(), + createSchemaManager(), + null, + null, + BucketMode.HASH_FIXED, + false, + true, + false, + null, + snapshotManager, + commitScanner); + detection.setRowIdCheckFromSnapshot(1L); + + return detection.checkConflicts( + latestSnapshot, + Collections.emptyList(), + Collections.singletonList( + createFileEntry( + "current", ADD, 0L, 10L, currentSchemaId, currentWriteCols)), + Collections.emptyList(), + CommitKind.APPEND); + } + + private SimpleFileEntry createFileEntry( + String fileName, + FileKind kind, + @Nullable Long firstRowId, + long rowCount, + long schemaId, + @Nullable List writeCols) { + return new SimpleFileEntry( + kind, + EMPTY_ROW, + 0, + 1, + 0, + fileName, + Collections.emptyList(), + null, + EMPTY_ROW, + EMPTY_ROW, + null, + rowCount, + firstRowId, + schemaId, + writeCols); + } + + private DataFileMeta createDataFileMeta( + String fileName, + @Nullable Long firstRowId, + long rowCount, + long schemaId, + @Nullable List writeCols) { + return DataFileMeta.create( + fileName, + 0L, + rowCount, + EMPTY_ROW, + EMPTY_ROW, + EMPTY_STATS, + EMPTY_STATS, + 0L, + 0L, + schemaId, + 0, + Collections.emptyList(), + null, + null, + null, + null, + null, + firstRowId, + 
writeCols); + } + + private SchemaManager createSchemaManager() { + Map schemas = new HashMap<>(); + schemas.put( + 0L, + org.apache.paimon.schema.TableSchema.create( + 0L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + schemas.put( + 1L, + org.apache.paimon.schema.TableSchema.create( + 1L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b_renamed", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + return new TestingSchemaManager(new Path("/tmp/conflict-detection-test"), schemas); + } + + private Snapshot snapshot(long id, CommitKind commitKind, @Nullable Long nextRowId) { + return new Snapshot( + id, + 0L, + "base", + 0L, + "delta", + 0L, + null, + null, + null, + "user", + id, + commitKind, + 0L, + 0L, + 0L, + null, + null, + null, + null, + nextRowId); + } + + private static class TestingSnapshotManager extends SnapshotManager { + + private final Map snapshots = new HashMap<>(); + + private TestingSnapshotManager(Snapshot... 
snapshots) { + super( + LocalFileIO.create(), + new Path("/tmp/conflict-detection-snapshot-test"), + null, + null, + null); + for (Snapshot snapshot : snapshots) { + this.snapshots.put(snapshot.id(), snapshot); + } + } + + @Override + public Snapshot snapshot(long snapshotId) { + return snapshots.get(snapshotId); + } + } + + private static class TestingCommitScanner extends CommitScanner { + + private final long snapshotId; + private final List entries; + + private TestingCommitScanner(long snapshotId, List entries) { + super(() -> null, null, new CoreOptions(new Options())); + this.snapshotId = snapshotId; + this.entries = entries; + } + + @Override + public List readIncrementalEntries( + Snapshot snapshot, List changedPartitions) { + return snapshot.id() == snapshotId ? entries : Collections.emptyList(); + } + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index e012656cc8cc..ff54322b92a8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -140,6 +140,48 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } + test("Data Evolution: concurrent merge with disjoint update columns") { + withTable("sb", "sc", "t") { + sql(s""" + CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ( + 'row-tracking.enabled' = 'true', + 'data-evolution.enabled' = 'true') + """) + sql("INSERT INTO t VALUES (1, 0, 0)") + Seq((1, 1)).toDF("id", "b").createOrReplaceTempView("sb") + Seq((1, 1)).toDF("id", "c").createOrReplaceTempView("sc") + + val mergeB = Future { + for (_ <- 1 to 10) { + sql(s""" + |MERGE INTO t + |USING sb + |ON t.id = sb.id + |WHEN MATCHED THEN + |UPDATE SET t.b = sb.b + t.b + |""".stripMargin).collect() + } + } + + val mergeC 
= Future { + for (_ <- 1 to 10) { + sql(s""" + |MERGE INTO t + |USING sc + |ON t.id = sc.id + |WHEN MATCHED THEN + |UPDATE SET t.c = sc.c + t.c + |""".stripMargin).collect() + } + } + + Await.result(mergeB, 60.seconds) + Await.result(mergeC, 60.seconds) + + checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10))) + } + } + test("Data Evolution: concurrent merge and small files compact") { withTable("s", "t") { sql(s""" From 125dca8b39112c9966ef0b08080a605249ee9ded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 17:48:34 +0800 Subject: [PATCH 02/11] fix test --- .../operation/commit/ConflictDetection.java | 6 +- .../commit/RowIdColumnConflictChecker.java | 116 +++++++++++------- 2 files changed, 72 insertions(+), 50 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 63ee4bee82ea..b70f90600294 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -504,9 +504,9 @@ private Optional checkForRowIdFromSnapshot( } List changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries); - RowIdColumnConflictChecker conflictChecker = + RowIdColumnConflictChecker columnChecker = RowIdColumnConflictChecker.fromDeltaEntries(schemaManager, deltaEntries); - if (conflictChecker.isEmpty()) { + if (columnChecker.isEmpty()) { return Optional.empty(); } @@ -527,7 +527,7 @@ private Optional checkForRowIdFromSnapshot( DataFileMeta file = entry.file(); if (file.firstRowId() != null && file.nonNullRowIdRange().from < checkNextRowId - && conflictChecker.conflictsWith(entry)) { + && columnChecker.conflictsWith(entry)) { return Optional.of( new RuntimeException( "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index 2f828981bd07..e14a48873ded 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -34,8 +34,19 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; -/** Detects row-id range conflicts only when written field ids overlap. */ +/** + * Detects row-id range conflicts only when written field ids overlap. The detection process is as + * below: + * + *
    + *
  1. Merge delta files with overlapping row-id ranges and collect their updated columns. + *
  2. Sort those items by range. + *
  3. For each file being checked, binary-search for overlapping ranges. If the updated + * columns also overlap, report a conflict. + *
+ */ class RowIdColumnConflictChecker { private final SchemaManager schemaManager; @@ -54,17 +65,16 @@ static RowIdColumnConflictChecker fromDeltaEntries( } private List buildWriteRanges(List deltaEntries) { - List rowIdEntries = new ArrayList<>(); - for (SimpleFileEntry entry : deltaEntries) { - if (entry.firstRowId() != null) { - rowIdEntries.add(entry); - } - } + List rowIdEntries = + deltaEntries.stream() + .filter(entry -> entry.firstRowId() != null) + .collect(Collectors.toList()); if (rowIdEntries.isEmpty()) { return Collections.emptyList(); } + // 1. merge overlapping ranges and calculate [Range, Set] tuples. RangeHelper rangeHelper = new RangeHelper<>(SimpleFileEntry::nonNullRowIdRange); List writeRanges = new ArrayList<>(); @@ -75,10 +85,10 @@ private List buildWriteRanges(List deltaEntries) { addWriteFieldIds(fieldIds, entry); } - if (!fieldIds.isEmpty()) { - writeRanges.add(new WriteRange(range, fieldIds)); - } + writeRanges.add(new WriteRange(range, fieldIds)); } + + // 2. sort by range for binary search writeRanges.sort( Comparator.comparingLong((WriteRange writeRange) -> writeRange.range.from) .thenComparingLong(writeRange -> writeRange.range.to)); @@ -86,10 +96,47 @@ private List buildWriteRanges(List deltaEntries) { return writeRanges; } + private void addWriteFieldIds(Set fieldIds, FileEntry entry) { + List writeCols = entry.writeCols(); + if (writeCols == null) { + fieldIds.addAll( + fieldIdByNameCache + .computeIfAbsent(entry.schemaId(), this::fieldIdByName) + .values()); + return; + } + + for (String writeCol : writeCols) { + Integer fieldId = fieldId(entry, writeCol); + if (fieldId != null) { + fieldIds.add(fieldId); + } + } + } + + private static Range mergeRange(List entries) { + long from = Long.MAX_VALUE; + long to = Long.MIN_VALUE; + for (SimpleFileEntry entry : entries) { + Range range = entry.nonNullRowIdRange(); + from = Math.min(from, range.from); + to = Math.max(to, range.to); + } + return new Range(from, to); + } + boolean isEmpty() 
{ return writeRanges.isEmpty(); } + /** + * Check whether a committed incremental file entry conflicts with current committing delta + * files. If an existing file has both overlapping row range and overlapping write fields, then + * it conflicts. + * + * @param entry committed incremental file entry + * @return true if conflict + */ boolean conflictsWith(FileEntry entry) { if (entry.firstRowId() == null) { return false; @@ -100,8 +147,9 @@ boolean conflictsWith(FileEntry entry) { while (index < writeRanges.size()) { WriteRange writeRange = writeRanges.get(index); if (writeRange.range.from > range.to) { - break; + return false; } + // overlapping row range and overlapping write fields if (writeRange.range.hasIntersection(range) && containsAnyWriteField(writeRange.fieldIds, entry)) { return true; @@ -111,17 +159,12 @@ && containsAnyWriteField(writeRange.fieldIds, entry)) { return false; } - private static Range mergeRange(List entries) { - long from = Long.MAX_VALUE; - long to = Long.MIN_VALUE; - for (SimpleFileEntry entry : entries) { - Range range = entry.nonNullRowIdRange(); - from = Math.min(from, range.from); - to = Math.max(to, range.to); - } - return new Range(from, to); - } - + /** + * Binary search to find the first range whose `to` >= target range's `from` + * + * @param range querying range + * @return index of the first range + */ private int firstPossibleRange(Range range) { int low = 0; int high = writeRanges.size(); @@ -136,31 +179,16 @@ private int firstPossibleRange(Range range) { return low; } - private void addWriteFieldIds(Set fieldIds, FileEntry entry) { - List writeCols = entry.writeCols(); - if (writeCols == null) { - throw nullWriteColsException(entry); - } - - for (String writeCol : writeCols) { - Integer fieldId = fieldId(entry, writeCol); - if (fieldId != null && !SpecialFields.isSystemField(fieldId)) { - fieldIds.add(fieldId); - } - } - } - private boolean containsAnyWriteField(Set fieldIds, FileEntry entry) { List writeCols = 
entry.writeCols(); + // If write cols == null, it's a full-schema write if (writeCols == null) { - throw nullWriteColsException(entry); + return true; } for (String writeCol : writeCols) { Integer fieldId = fieldId(entry, writeCol); - if (fieldId != null - && !SpecialFields.isSystemField(fieldId) - && fieldIds.contains(fieldId)) { + if (fieldId != null && fieldIds.contains(fieldId)) { return true; } } @@ -192,13 +220,6 @@ private Map fieldIdByName(long schemaId) { return fieldIdByName; } - private static RuntimeException nullWriteColsException(FileEntry entry) { - return new RuntimeException( - String.format( - "Write columns of row-id file '%s' in schema %s cannot be null.", - entry.fileName(), entry.schemaId())); - } - private static Range rowIdRange(FileEntry entry) { Long firstRowId = entry.firstRowId(); if (firstRowId == null) { @@ -208,6 +229,7 @@ private static Range rowIdRange(FileEntry entry) { return new Range(firstRowId, firstRowId + entry.rowCount() - 1); } + /** Range and field id Set. 
*/ private static class WriteRange { private final Range range; From 4b79be9f0e9a1b39f1ce1ad53b7576f97e8718c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 20:57:25 +0800 Subject: [PATCH 03/11] fix test --- .../commit/RowIdColumnConflictChecker.java | 2 +- .../commit/ConflictDetectionTest.java | 320 +----------------- .../RowIdColumnConflictCheckerTest.java | 176 ++++++++++ 3 files changed, 178 insertions(+), 320 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index e14a48873ded..48c343168649 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -160,7 +160,7 @@ && containsAnyWriteField(writeRange.fieldIds, entry)) { } /** - * Binary search to find the first range whose `to` >= target range's `from` + * Binary search to find the first range whose `to` >= target range's `from`. 
* * @param range querying range * @return index of the first range diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index e64361a94c45..ec91e8a26997 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -18,29 +18,15 @@ package org.apache.paimon.operation.commit; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.Snapshot.CommitKind; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; -import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; -import org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; @@ -50,11 +36,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -62,9 +45,7 @@ import static org.apache.paimon.manifest.FileKind.DELETE; import static org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV; import static org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV; -import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; class ConflictDetectionTest { @@ -101,108 +82,6 @@ public void testBuildBaseEntriesWithDV() { } } - @Test - void testSimpleFileEntryFromManifestEntryPreservesSchemaAndWriteCols() { - ManifestEntry manifestEntry = - ManifestEntry.create( - ADD, - EMPTY_ROW, - 0, - 1, - createDataFileMeta("f1", 0L, 1L, 1L, Arrays.asList("b"))); - - SimpleFileEntry entry = SimpleFileEntry.from(manifestEntry); - assertThat(entry.schemaId()).isEqualTo(1L); - assertThat(entry.writeCols()).containsExactly("b"); - - SimpleFileEntry delete = entry.toDelete(); - assertThat(delete.schemaId()).isEqualTo(1L); - assertThat(delete.writeCols()).containsExactly("b"); - } - - @Test - void testRowIdConflictAllowsDisjointWriteColumns() { - Optional conflict = - checkRowIdConflict(Arrays.asList("b"), 0L, Arrays.asList("c"), 0L); - - assertThat(conflict).isEmpty(); - } - - @Test - void testRowIdConflictDetectsSameWriteColumns() { - Optional conflict = - checkRowIdConflict(Arrays.asList("b"), 0L, Arrays.asList("b"), 0L); - - assertThat(conflict).isPresent(); - assertThat(conflict.get().getMessage()) - .contains("multiple 'MERGE INTO' operations have encountered conflicts"); - } - - @Test - void testRowIdConflictRequiresWriteColumns() { - assertThatThrownBy(() -> checkRowIdConflict(null, 0L, Arrays.asList("b"), 0L)) - .isInstanceOf(RuntimeException.class) - .hasMessageContaining("Write columns of row-id file"); - } - - @Test - void testRowIdConflictUsesFieldIdAcrossRename() { - 
Optional conflict = - checkRowIdConflict(Arrays.asList("b_renamed"), 1L, Arrays.asList("b"), 0L); - - assertThat(conflict).isPresent(); - } - - @Test - void testRowIdConflictIndexMergesOverlappedDeltaRanges() { - RowIdColumnConflictChecker checker = - RowIdColumnConflictChecker.fromDeltaEntries( - createSchemaManager(), - Arrays.asList( - createFileEntry("current-b", ADD, 0L, 11L, 0L, Arrays.asList("b")), - createFileEntry( - "current-c", ADD, 5L, 11L, 0L, Arrays.asList("c")))); - - ManifestEntry historicalB = - ManifestEntry.create( - ADD, - EMPTY_ROW, - 0, - 1, - createDataFileMeta("historical-b", 12L, 1L, 0L, Arrays.asList("b"))); - ManifestEntry historicalC = - ManifestEntry.create( - ADD, - EMPTY_ROW, - 0, - 1, - createDataFileMeta("historical-c", 12L, 1L, 0L, Arrays.asList("c"))); - - assertThat(checker.conflictsWith(historicalB)).isTrue(); - assertThat(checker.conflictsWith(historicalC)).isTrue(); - } - - @Test - void testRowIdConflictIndexScansAllOverlappedRanges() { - RowIdColumnConflictChecker checker = - RowIdColumnConflictChecker.fromDeltaEntries( - createSchemaManager(), - Arrays.asList( - createFileEntry("current-b", ADD, 0L, 5L, 0L, Arrays.asList("b")), - createFileEntry( - "current-c", ADD, 10L, 5L, 0L, Arrays.asList("c")))); - - ManifestEntry historical = - ManifestEntry.create( - ADD, - EMPTY_ROW, - 0, - 1, - createDataFileMeta("historical", 3L, 10L, 0L, Arrays.asList("c"))); - - assertThat(checker.conflictsWith(historical)).isTrue(); - } - @Test public void testBuildDeltaEntriesWithDV() { { @@ -502,7 +381,7 @@ private ConflictDetection createConflictDetection() { "test-table", "test-user", RowType.of(), - createSchemaManager(), + null, null, null, BucketMode.HASH_FIXED, @@ -513,201 +392,4 @@ private ConflictDetection createConflictDetection() { null, null); } - - private Optional checkRowIdConflict( - @Nullable List currentWriteCols, - long currentSchemaId, - @Nullable List historicalWriteCols, - long historicalSchemaId) { - Snapshot baseSnapshot 
= snapshot(1L, CommitKind.APPEND, 10L); - Snapshot latestSnapshot = snapshot(2L, CommitKind.APPEND, 10L); - - ManifestEntry historicalEntry = - ManifestEntry.create( - ADD, - EMPTY_ROW, - 0, - 1, - createDataFileMeta( - "historical", 0L, 10L, historicalSchemaId, historicalWriteCols)); - - SnapshotManager snapshotManager = new TestingSnapshotManager(baseSnapshot, latestSnapshot); - CommitScanner commitScanner = - new TestingCommitScanner( - latestSnapshot.id(), Collections.singletonList(historicalEntry)); - - ConflictDetection detection = - new ConflictDetection( - "test-table", - "test-user", - RowType.of(), - createSchemaManager(), - null, - null, - BucketMode.HASH_FIXED, - false, - true, - false, - null, - snapshotManager, - commitScanner); - detection.setRowIdCheckFromSnapshot(1L); - - return detection.checkConflicts( - latestSnapshot, - Collections.emptyList(), - Collections.singletonList( - createFileEntry( - "current", ADD, 0L, 10L, currentSchemaId, currentWriteCols)), - Collections.emptyList(), - CommitKind.APPEND); - } - - private SimpleFileEntry createFileEntry( - String fileName, - FileKind kind, - @Nullable Long firstRowId, - long rowCount, - long schemaId, - @Nullable List writeCols) { - return new SimpleFileEntry( - kind, - EMPTY_ROW, - 0, - 1, - 0, - fileName, - Collections.emptyList(), - null, - EMPTY_ROW, - EMPTY_ROW, - null, - rowCount, - firstRowId, - schemaId, - writeCols); - } - - private DataFileMeta createDataFileMeta( - String fileName, - @Nullable Long firstRowId, - long rowCount, - long schemaId, - @Nullable List writeCols) { - return DataFileMeta.create( - fileName, - 0L, - rowCount, - EMPTY_ROW, - EMPTY_ROW, - EMPTY_STATS, - EMPTY_STATS, - 0L, - 0L, - schemaId, - 0, - Collections.emptyList(), - null, - null, - null, - null, - null, - firstRowId, - writeCols); - } - - private SchemaManager createSchemaManager() { - Map schemas = new HashMap<>(); - schemas.put( - 0L, - org.apache.paimon.schema.TableSchema.create( - 0L, - new Schema( - 
Arrays.asList( - new DataField(0, "id", DataTypes.INT()), - new DataField(1, "b", DataTypes.INT()), - new DataField(2, "c", DataTypes.INT())), - Collections.emptyList(), - Collections.singletonList("id"), - Collections.emptyMap(), - ""))); - schemas.put( - 1L, - org.apache.paimon.schema.TableSchema.create( - 1L, - new Schema( - Arrays.asList( - new DataField(0, "id", DataTypes.INT()), - new DataField(1, "b_renamed", DataTypes.INT()), - new DataField(2, "c", DataTypes.INT())), - Collections.emptyList(), - Collections.singletonList("id"), - Collections.emptyMap(), - ""))); - return new TestingSchemaManager(new Path("/tmp/conflict-detection-test"), schemas); - } - - private Snapshot snapshot(long id, CommitKind commitKind, @Nullable Long nextRowId) { - return new Snapshot( - id, - 0L, - "base", - 0L, - "delta", - 0L, - null, - null, - null, - "user", - id, - commitKind, - 0L, - 0L, - 0L, - null, - null, - null, - null, - nextRowId); - } - - private static class TestingSnapshotManager extends SnapshotManager { - - private final Map snapshots = new HashMap<>(); - - private TestingSnapshotManager(Snapshot... snapshots) { - super( - LocalFileIO.create(), - new Path("/tmp/conflict-detection-snapshot-test"), - null, - null, - null); - for (Snapshot snapshot : snapshots) { - this.snapshots.put(snapshot.id(), snapshot); - } - } - - @Override - public Snapshot snapshot(long snapshotId) { - return snapshots.get(snapshotId); - } - } - - private static class TestingCommitScanner extends CommitScanner { - - private final long snapshotId; - private final List entries; - - private TestingCommitScanner(long snapshotId, List entries) { - super(() -> null, null, new CoreOptions(new Options())); - this.snapshotId = snapshotId; - this.entries = entries; - } - - @Override - public List readIncrementalEntries( - Snapshot snapshot, List changedPartitions) { - return snapshot.id() == snapshotId ? 
entries : Collections.emptyList(); - } - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java new file mode 100644 index 000000000000..ed5b43c9c01b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class RowIdColumnConflictCheckerTest { + + @Test + void testAllowsDisjointWriteColumns() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("c")))) + .isFalse(); + } + + @Test + void testDetectsSameWriteColumns() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testUsesFieldIdAcrossRename() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 1L, Arrays.asList("b_renamed"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testTreatsNullWriteColumnsAsFullSchemaWrite() { + RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L, 0L, null)); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testMergesOverlappedDeltaRangesAndWriteColumns() { + 
RowIdColumnConflictChecker checker = + checker( + file("current-b", 0L, 11L, 0L, Arrays.asList("b")), + file("current-c", 5L, 11L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(file("historical-b", 12L, 1L, 0L, Arrays.asList("b")))) + .isTrue(); + assertThat(checker.conflictsWith(file("historical-c", 12L, 1L, 0L, Arrays.asList("c")))) + .isTrue(); + } + + @Test + void testScansAllOverlappedRangesAfterBinarySearch() { + RowIdColumnConflictChecker checker = + checker( + file("current-b", 0L, 5L, 0L, Arrays.asList("b")), + file("current-c", 10L, 5L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(file("historical", 3L, 10L, 0L, Arrays.asList("c")))) + .isTrue(); + } + + @Test + void testFailsOnUnknownNonSystemWriteColumn() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThatThrownBy( + () -> + checker.conflictsWith( + file("historical", 0L, 10L, 0L, Arrays.asList("missing")))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Cannot find write column 'missing'"); + } + + private RowIdColumnConflictChecker checker(SimpleFileEntry... 
entries) { + return RowIdColumnConflictChecker.fromDeltaEntries(createSchemaManager(), Arrays.asList(entries)); + } + + private SimpleFileEntry file( + String fileName, + @Nullable Long firstRowId, + long rowCount, + long schemaId, + @Nullable List writeCols) { + return new SimpleFileEntry( + FileKind.ADD, + EMPTY_ROW, + 0, + 1, + 0, + fileName, + Collections.emptyList(), + null, + EMPTY_ROW, + EMPTY_ROW, + null, + rowCount, + firstRowId, + schemaId, + writeCols); + } + + private SchemaManager createSchemaManager() { + Map schemas = new HashMap<>(); + schemas.put( + 0L, + org.apache.paimon.schema.TableSchema.create( + 0L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + schemas.put( + 1L, + org.apache.paimon.schema.TableSchema.create( + 1L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b_renamed", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + return new TestingSchemaManager(new Path("/tmp/row-id-column-conflict-checker-test"), schemas); + } +} From c4454b927825dae901752e9a5a1947631675854b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 21:02:02 +0800 Subject: [PATCH 04/11] fix test --- .../operation/commit/RowIdColumnConflictCheckerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java index ed5b43c9c01b..db5603d4f928 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -116,7 +116,8 @@ void testFailsOnUnknownNonSystemWriteColumn() { } private RowIdColumnConflictChecker checker(SimpleFileEntry... entries) { - return RowIdColumnConflictChecker.fromDeltaEntries(createSchemaManager(), Arrays.asList(entries)); + return RowIdColumnConflictChecker.fromDeltaEntries( + createSchemaManager(), Arrays.asList(entries)); } private SimpleFileEntry file( @@ -171,6 +172,7 @@ private SchemaManager createSchemaManager() { Collections.singletonList("id"), Collections.emptyMap(), ""))); - return new TestingSchemaManager(new Path("/tmp/row-id-column-conflict-checker-test"), schemas); + return new TestingSchemaManager( + new Path("/tmp/row-id-column-conflict-checker-test"), schemas); } } From fd656fa63ff3dacaebc67fccd7c1150a95f1a4e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 21:50:48 +0800 Subject: [PATCH 05/11] add check for flink --- .../flink/action/DataEvolutionMergeIntoAction.java | 13 ++++++++++--- .../DataEvolutionPartialWriteOperator.java | 6 +++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 019bf6484be7..cc6779772d6e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -126,6 +126,9 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { private int sinkParallelism; + // the snapshot id this action based on + private long baseSnapshotId; + public DataEvolutionMergeIntoAction( String databaseName, String tableName, Map catalogConfig) { 
super(databaseName, tableName, catalogConfig); @@ -142,6 +145,7 @@ public DataEvolutionMergeIntoAction( throw new UnsupportedOperationException( "merge-into action doesn't support updating an empty table."); } + this.baseSnapshotId = latestSnapshotId; table = table.copy( Collections.singletonMap( @@ -324,7 +328,7 @@ public DataStream> shuffleByFirstRowId( Transformation sourceTransformation = source.getTransformation(); List firstRowIds = ((FileStoreTable) table) - .store().newScan() + .store().newScan().withSnapshot(baseSnapshotId) .withManifestEntryFilter( entry -> entry.file().firstRowId() != null @@ -384,7 +388,8 @@ public DataStream writePartialColumns( return sorted.transform( "PARTIAL WRITE COLUMNS", new CommittableTypeInfo(), - new DataEvolutionPartialWriteOperator((FileStoreTable) table, rowType)) + new DataEvolutionPartialWriteOperator( + (FileStoreTable) table, rowType, baseSnapshotId)) .setParallelism(sinkParallelism); } @@ -409,7 +414,9 @@ public DataStream commit( context -> new StoreCommitter( storeTable, - storeTable.newCommit(context.commitUser()), + storeTable + .newCommit(context.commitUser()) + .rowIdCheckConflict(baseSnapshotId), context), new NoopCommittableStateManager()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java index 278d58306cad..f57393b7ca27 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java @@ -70,6 +70,7 @@ public class DataEvolutionPartialWriteOperator LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class); private final FileStoreTable table; + private final Long baseSnapshotId; // dataType 
private final RowType dataType; @@ -91,9 +92,11 @@ public class DataEvolutionPartialWriteOperator private transient AbstractFileStoreWrite tableWrite; private transient Writer writer; - public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType dataType) { + public DataEvolutionPartialWriteOperator( + FileStoreTable table, RowType dataType, Long baseSnapshotId) { this.table = table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")); + this.baseSnapshotId = baseSnapshotId; List fieldNames = dataType.getFieldNames().stream() .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) @@ -121,6 +124,7 @@ public void open() throws Exception { entry.file().firstRowId() != null && !isBlobFile(entry.file().fileName()) && !isVectorStoreFile(entry.file().fileName())) + .withSnapshot(baseSnapshotId) .plan() .files(); From 092c662b32362f0c2fb693c296d72455868c76ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 23:35:40 +0800 Subject: [PATCH 06/11] fix serialization --- .../paimon/flink/action/DataEvolutionMergeIntoAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index cc6779772d6e..40c89113803c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -396,6 +396,8 @@ public DataStream writePartialColumns( public DataStream commit( DataStream written, Set updatedColumns) { FileStoreTable storeTable = (FileStoreTable) table; + // copy to avoid serialization issue + long baseSnapshotId = this.baseSnapshotId; // Check if some global-indexed columns are updated DataStream checked = 
From 9eac488fffc03bdbe2962ba39ff5faca23d183fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 15 May 2026 23:54:53 +0800 Subject: [PATCH 07/11] minor optim --- .../commit/RowIdColumnConflictChecker.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index 48c343168649..5c4d2fff8f31 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -138,11 +138,12 @@ boolean isEmpty() { * @return true if conflict */ boolean conflictsWith(FileEntry entry) { - if (entry.firstRowId() == null) { + Long firstRowId = entry.firstRowId(); + if (firstRowId == null) { return false; } - Range range = rowIdRange(entry); + Range range = new Range(firstRowId, firstRowId + entry.rowCount() - 1); int index = firstPossibleRange(range); while (index < writeRanges.size()) { WriteRange writeRange = writeRanges.get(index); @@ -220,15 +221,6 @@ private Map fieldIdByName(long schemaId) { return fieldIdByName; } - private static Range rowIdRange(FileEntry entry) { - Long firstRowId = entry.firstRowId(); - if (firstRowId == null) { - throw new IllegalArgumentException( - String.format("First row id of '%s' should not be null.", entry.fileName())); - } - return new Range(firstRowId, firstRowId + entry.rowCount() - 1); - } - /** Range and field id Set. 
*/ private static class WriteRange { From e063abaa12ce93c67c8c4e02136387b90cc5883b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Mon, 18 May 2026 11:11:34 +0800 Subject: [PATCH 08/11] remove unstable tests --- .../flink/action/DataEvolutionMergeIntoActionITCase.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java index 96edb8d75abb..ed8f46eddfe5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java @@ -742,10 +742,6 @@ public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws Excepti "(1, 'name1', X'48656C6C6F')", "(2, 'name2', X'5945')", "(3, 'name3', X'414243')"); - testBatchRead( - "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` " - + "WHERE file_path NOT LIKE '%.blob'", - Collections.singletonList(changelogRow("+I", 1L))); testBatchRead( "SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` " + "WHERE file_path LIKE '%.blob'", From 54bd8b169987b6d220f040a128cf71fbd3013d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 21 May 2026 22:28:28 +0800 Subject: [PATCH 09/11] fix comments --- .../paimon/manifest/ExpireFileEntry.java | 12 +--- .../org/apache/paimon/manifest/FileEntry.java | 5 -- .../apache/paimon/manifest/ManifestEntry.java | 11 --- .../paimon/manifest/SimpleFileEntry.java | 39 ++-------- .../manifest/SimpleFileEntryWithDV.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 11 +++ .../operation/commit/ConflictDetection.java | 19 +++-- .../commit/RowIdColumnConflictChecker.java | 71 +++++++++---------- .../commit/ConflictDetectionTest.java | 2 - 
.../RowIdColumnConflictCheckerTest.java | 32 ++++----- 10 files changed, 78 insertions(+), 128 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java index 512c942fdd82..024eac2b2096 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -45,9 +45,7 @@ public ExpireFileEntry( @Nullable FileSource fileSource, @Nullable String externalPath, long rowCount, - @Nullable Long firstRowId, - long schemaId, - @Nullable List writeCols) { + @Nullable Long firstRowId) { super( kind, partition, @@ -61,9 +59,7 @@ public ExpireFileEntry( maxKey, externalPath, rowCount, - firstRowId, - schemaId, - writeCols); + firstRowId); this.fileSource = fileSource; } @@ -86,9 +82,7 @@ public static ExpireFileEntry from(ManifestEntry entry) { entry.file().fileSource().orElse(null), entry.externalPath(), entry.rowCount(), - entry.firstRowId(), - entry.file().schemaId(), - entry.file().writeCols()); + entry.firstRowId()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 09c1e09d598c..3a8008825581 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -72,11 +72,6 @@ public interface FileEntry { @Nullable Long firstRowId(); - long schemaId(); - - @Nullable - List writeCols(); - /** * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data * file. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 1ec274935df0..f95ba19221a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -58,17 +58,6 @@ static ManifestEntry create( DataFileMeta file(); - @Override - default long schemaId() { - return file().schemaId(); - } - - @Nullable - @Override - default List writeCols() { - return file().writeCols(); - } - ManifestEntry copyWithoutStats(); ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index c5ce43362b79..77ae12b580df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -45,8 +45,6 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final String externalPath; private final long rowCount; @Nullable private final Long firstRowId; - private final long schemaId; - @Nullable private final List writeCols; public SimpleFileEntry( FileKind kind, @@ -61,9 +59,7 @@ public SimpleFileEntry( BinaryRow maxKey, @Nullable String externalPath, long rowCount, - @Nullable Long firstRowId, - long schemaId, - @Nullable List writeCols) { + @Nullable Long firstRowId) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -77,8 +73,6 @@ public SimpleFileEntry( this.externalPath = externalPath; this.rowCount = rowCount; this.firstRowId = firstRowId; - this.schemaId = schemaId; - this.writeCols = writeCols; } public static SimpleFileEntry from(ManifestEntry entry) { @@ -95,9 +89,7 @@ public static SimpleFileEntry from(ManifestEntry entry) { entry.maxKey(), 
entry.externalPath(), entry.file().rowCount(), - entry.firstRowId(), - entry.file().schemaId(), - entry.file().writeCols()); + entry.firstRowId()); } public SimpleFileEntry toDelete() { @@ -114,9 +106,7 @@ public SimpleFileEntry toDelete() { maxKey, externalPath, rowCount, - firstRowId, - schemaId, - writeCols); + firstRowId); } public static List from(List entries) { @@ -195,17 +185,6 @@ public long rowCount() { return firstRowId; } - @Override - public long schemaId() { - return schemaId; - } - - @Nullable - @Override - public List writeCols() { - return writeCols; - } - public long nonNullFirstRowId() { Long firstRowId = firstRowId(); checkArgument(firstRowId != null, "First row id of '%s' should not be null.", fileName()); @@ -237,9 +216,7 @@ public boolean equals(Object o) { && Objects.equals(maxKey, that.maxKey) && Objects.equals(externalPath, that.externalPath) && rowCount == that.rowCount - && Objects.equals(firstRowId, that.firstRowId) - && schemaId == that.schemaId - && Objects.equals(writeCols, that.writeCols); + && Objects.equals(firstRowId, that.firstRowId); } @Override @@ -256,9 +233,7 @@ public int hashCode() { maxKey, externalPath, rowCount, - firstRowId, - schemaId, - writeCols); + firstRowId); } @Override @@ -288,10 +263,6 @@ public String toString() { + rowCount + ", firstRowId=" + firstRowId - + ", schemaId=" - + schemaId - + ", writeCols=" - + writeCols + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java index e2a82dfbef0c..6b4c47915752 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java @@ -41,9 +41,7 @@ public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) entry.maxKey(), entry.externalPath(), entry.rowCount(), - entry.firstRowId(), - entry.schemaId(), - 
entry.writeCols()); + entry.firstRowId()); this.dvFileName = dvFileName; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index ffdfe8523d94..a9a54936ac74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -49,6 +49,7 @@ import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult; +import org.apache.paimon.operation.commit.RowIdColumnConflictChecker; import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned; import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.operation.commit.SuccessCommitResult; @@ -907,12 +908,22 @@ CommitResult tryCommitOnce( .filter(entry -> !baseIdentifiers.contains(entry.identifier())) .collect(Collectors.toList()); } + RowIdColumnConflictChecker rowIdColumnConflictChecker = null; + if (conflictDetection.hasRowIdCheckFromSnapshot()) { + rowIdColumnConflictChecker = + RowIdColumnConflictChecker.fromDataFiles( + schemaManager, + deltaFiles.stream() + .map(ManifestEntry::file) + .collect(Collectors.toList())); + } Optional exception = conflictDetection.checkConflicts( latestSnapshot, baseDataFiles, SimpleFileEntry.from(deltaFiles), indexFiles, + rowIdColumnConflictChecker, commitKind); if (exception.isPresent()) { if (allowRollback && rollback != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index b70f90600294..2d12857a210e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -77,7 +77,6 @@ public class ConflictDetection { private final String tableName; private final String commitUser; private final RowType partitionType; - private final SchemaManager schemaManager; private final FileStorePathFactory pathFactory; private final @Nullable Comparator keyComparator; private final BucketMode bucketMode; @@ -115,7 +114,6 @@ public ConflictDetection( this.tableName = tableName; this.commitUser = commitUser; this.partitionType = partitionType; - this.schemaManager = schemaManager; this.pathFactory = pathFactory; this.keyComparator = keyComparator; this.bucketMode = bucketMode; @@ -164,6 +162,7 @@ public Optional checkConflicts( List baseEntries, List deltaEntries, List deltaIndexEntries, + @Nullable RowIdColumnConflictChecker rowIdColumnConflictChecker, CommitKind commitKind) { String baseCommitUser = latestSnapshot.commitUser(); if (deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE)) { @@ -232,7 +231,8 @@ public Optional checkConflicts( return exception; } - return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries); + return checkForRowIdFromSnapshot( + latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker); } public Map collectUncheckedBucketPartitions( @@ -495,21 +495,20 @@ private Optional checkRowIdRangeConflicts( private Optional checkForRowIdFromSnapshot( Snapshot latestSnapshot, List deltaEntries, - List deltaIndexEntries) { + List deltaIndexEntries, + @Nullable RowIdColumnConflictChecker columnChecker) { if (!dataEvolutionEnabled) { return Optional.empty(); } if (rowIdCheckFromSnapshot == null) { return Optional.empty(); } - - List changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries); - RowIdColumnConflictChecker columnChecker = - RowIdColumnConflictChecker.fromDeltaEntries(schemaManager, deltaEntries); - if (columnChecker.isEmpty()) { + if (columnChecker == null 
|| columnChecker.isEmpty()) { return Optional.empty(); } + List changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries); + // check history row id ranges Long checkNextRowId = snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId(); checkState( @@ -527,7 +526,7 @@ private Optional checkForRowIdFromSnapshot( DataFileMeta file = entry.file(); if (file.firstRowId() != null && file.nonNullRowIdRange().from < checkNextRowId - && columnChecker.conflictsWith(entry)) { + && columnChecker.conflictsWith(file)) { return Optional.of( new RuntimeException( "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index 5c4d2fff8f31..9c188f36a47d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -18,8 +18,7 @@ package org.apache.paimon.operation.commit; -import org.apache.paimon.manifest.FileEntry; -import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; @@ -47,42 +46,40 @@ * columns also overlaps, return conflicting result. 
* */ -class RowIdColumnConflictChecker { +public class RowIdColumnConflictChecker { private final SchemaManager schemaManager; private final List writeRanges; private final Map> fieldIdByNameCache = new HashMap<>(); - private RowIdColumnConflictChecker( - SchemaManager schemaManager, List deltaEntries) { + private RowIdColumnConflictChecker(SchemaManager schemaManager, List deltaFiles) { this.schemaManager = schemaManager; - this.writeRanges = buildWriteRanges(deltaEntries); + this.writeRanges = buildWriteRanges(deltaFiles); } - static RowIdColumnConflictChecker fromDeltaEntries( - SchemaManager schemaManager, List deltaEntries) { - return new RowIdColumnConflictChecker(schemaManager, deltaEntries); + public static RowIdColumnConflictChecker fromDataFiles( + SchemaManager schemaManager, List deltaFiles) { + return new RowIdColumnConflictChecker(schemaManager, deltaFiles); } - private List buildWriteRanges(List deltaEntries) { - List rowIdEntries = - deltaEntries.stream() - .filter(entry -> entry.firstRowId() != null) + private List buildWriteRanges(List deltaFiles) { + List rowIdFiles = + deltaFiles.stream() + .filter(file -> file.firstRowId() != null) .collect(Collectors.toList()); - if (rowIdEntries.isEmpty()) { + if (rowIdFiles.isEmpty()) { return Collections.emptyList(); } // 1. merge overlapping ranges and calculate [Range, Set] tuples. 
- RangeHelper rangeHelper = - new RangeHelper<>(SimpleFileEntry::nonNullRowIdRange); + RangeHelper rangeHelper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); List writeRanges = new ArrayList<>(); - for (List group : rangeHelper.mergeOverlappingRanges(rowIdEntries)) { + for (List group : rangeHelper.mergeOverlappingRanges(rowIdFiles)) { Range range = mergeRange(group); Set fieldIds = new HashSet<>(); - for (SimpleFileEntry entry : group) { - addWriteFieldIds(fieldIds, entry); + for (DataFileMeta file : group) { + addWriteFieldIds(fieldIds, file); } writeRanges.add(new WriteRange(range, fieldIds)); @@ -96,29 +93,29 @@ private List buildWriteRanges(List deltaEntries) { return writeRanges; } - private void addWriteFieldIds(Set fieldIds, FileEntry entry) { - List writeCols = entry.writeCols(); + private void addWriteFieldIds(Set fieldIds, DataFileMeta file) { + List writeCols = file.writeCols(); if (writeCols == null) { fieldIds.addAll( fieldIdByNameCache - .computeIfAbsent(entry.schemaId(), this::fieldIdByName) + .computeIfAbsent(file.schemaId(), this::fieldIdByName) .values()); return; } for (String writeCol : writeCols) { - Integer fieldId = fieldId(entry, writeCol); + Integer fieldId = fieldId(file, writeCol); if (fieldId != null) { fieldIds.add(fieldId); } } } - private static Range mergeRange(List entries) { + private static Range mergeRange(List files) { long from = Long.MAX_VALUE; long to = Long.MIN_VALUE; - for (SimpleFileEntry entry : entries) { - Range range = entry.nonNullRowIdRange(); + for (DataFileMeta file : files) { + Range range = file.nonNullRowIdRange(); from = Math.min(from, range.from); to = Math.max(to, range.to); } @@ -134,16 +131,16 @@ boolean isEmpty() { * files. If an existing file has both overlapping row range and overlapping write fields, then * it conflicts. 
* - * @param entry committed incremental file entry + * @param file committed incremental data file * @return true if conflict */ - boolean conflictsWith(FileEntry entry) { - Long firstRowId = entry.firstRowId(); + boolean conflictsWith(DataFileMeta file) { + Long firstRowId = file.firstRowId(); if (firstRowId == null) { return false; } - Range range = new Range(firstRowId, firstRowId + entry.rowCount() - 1); + Range range = new Range(firstRowId, firstRowId + file.rowCount() - 1); int index = firstPossibleRange(range); while (index < writeRanges.size()) { WriteRange writeRange = writeRanges.get(index); @@ -152,7 +149,7 @@ boolean conflictsWith(FileEntry entry) { } // overlapping row range and overlapping write fields if (writeRange.range.hasIntersection(range) - && containsAnyWriteField(writeRange.fieldIds, entry)) { + && containsAnyWriteField(writeRange.fieldIds, file)) { return true; } index++; @@ -180,15 +177,15 @@ private int firstPossibleRange(Range range) { return low; } - private boolean containsAnyWriteField(Set fieldIds, FileEntry entry) { - List writeCols = entry.writeCols(); + private boolean containsAnyWriteField(Set fieldIds, DataFileMeta file) { + List writeCols = file.writeCols(); // If write cols == null, it's a full-schema write if (writeCols == null) { return true; } for (String writeCol : writeCols) { - Integer fieldId = fieldId(entry, writeCol); + Integer fieldId = fieldId(file, writeCol); if (fieldId != null && fieldIds.contains(fieldId)) { return true; } @@ -196,10 +193,10 @@ private boolean containsAnyWriteField(Set fieldIds, FileEntry entry) { return false; } - private Integer fieldId(FileEntry entry, String writeCol) { + private Integer fieldId(DataFileMeta file, String writeCol) { Integer fieldId = fieldIdByNameCache - .computeIfAbsent(entry.schemaId(), this::fieldIdByName) + .computeIfAbsent(file.schemaId(), this::fieldIdByName) .get(writeCol); if (fieldId == null) { if (SpecialFields.isSystemField(writeCol)) { @@ -208,7 +205,7 @@ private 
Integer fieldId(FileEntry entry, String writeCol) { throw new RuntimeException( String.format( "Cannot find write column '%s' in schema %s.", - writeCol, entry.schemaId())); + writeCol, file.schemaId())); } return fieldId; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index ec91e8a26997..dd00af14b9ad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -307,8 +307,6 @@ private SimpleFileEntry createFileEntry(String fileName, FileKind kind) { EMPTY_ROW, null, 0L, - null, - 0L, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java index db5603d4f928..8c45bfbb33ad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -19,10 +19,10 @@ package org.apache.paimon.operation.commit; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.FileKind; -import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -37,7 +37,6 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatThrownBy; @@ -115,32 +114,31 @@ void testFailsOnUnknownNonSystemWriteColumn() { .hasMessageContaining("Cannot find write column 'missing'"); } - private RowIdColumnConflictChecker checker(SimpleFileEntry... entries) { - return RowIdColumnConflictChecker.fromDeltaEntries( - createSchemaManager(), Arrays.asList(entries)); + private RowIdColumnConflictChecker checker(DataFileMeta... files) { + return RowIdColumnConflictChecker.fromDataFiles( + createSchemaManager(), Arrays.asList(files)); } - private SimpleFileEntry file( + private DataFileMeta file( String fileName, @Nullable Long firstRowId, long rowCount, long schemaId, @Nullable List writeCols) { - return new SimpleFileEntry( - FileKind.ADD, - EMPTY_ROW, - 0, - 1, - 0, + return DataFileMeta.forAppend( fileName, + 0L, + rowCount, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + schemaId, Collections.emptyList(), null, - EMPTY_ROW, - EMPTY_ROW, null, - rowCount, + null, + null, firstRowId, - schemaId, writeCols); } From 0537cf79632cdc3da91a8f4451405db128495165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 21 May 2026 22:34:26 +0800 Subject: [PATCH 10/11] minor fix --- .../src/main/java/org/apache/paimon/AbstractFileStore.java | 1 - .../org/apache/paimon/operation/commit/ConflictDetection.java | 2 -- .../java/org/apache/paimon/operation/FileStoreCommitTest.java | 1 - .../apache/paimon/operation/commit/ConflictDetectionTest.java | 1 - 4 files changed, 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index cd962f9abb29..9aad0259cdf8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -284,7 +284,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { tableName, commitUser, partitionType, - schemaManager, 
pathFactory(), newKeyComparator(), bucketMode(), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 2d12857a210e..29ee0458183f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -33,7 +33,6 @@ import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; import org.apache.paimon.operation.PartitionExpire; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -101,7 +100,6 @@ public ConflictDetection( String tableName, String commitUser, RowType partitionType, - SchemaManager schemaManager, FileStorePathFactory pathFactory, @Nullable Comparator keyComparator, BucketMode bucketMode, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 49ad673f50ec..71eb081de89f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1106,7 +1106,6 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( tableName, commitUser, store.partitionType(), - new SchemaManager(store.fileIO(), store.options().path()), store.pathFactory(), store.newKeyComparator(), store.bucketMode(), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index dd00af14b9ad..c3e0258da28f 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -381,7 +381,6 @@ private ConflictDetection createConflictDetection() { RowType.of(), null, null, - null, BucketMode.HASH_FIXED, false, true, From b5bf6037f6b9f1250c850bd15ce5ba230d096909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 22 May 2026 00:09:18 +0800 Subject: [PATCH 11/11] minor fix --- .../paimon/operation/commit/RowIdColumnConflictChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index 9c188f36a47d..b2f8740f5269 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -43,7 +43,7 @@ *
  • Merge delta files by row range and calculate updated columns. *
  • Sort those items by range. *
  • For each checking files, do binary search to find overlapping ranges. If their updated - * columns also overlaps, return conflicting result. + * columns also overlap, return conflicting result. * */ public class RowIdColumnConflictChecker {