diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index ffdfe8523d94..a9a54936ac74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -49,6 +49,7 @@ import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult; +import org.apache.paimon.operation.commit.RowIdColumnConflictChecker; import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned; import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.operation.commit.SuccessCommitResult; @@ -907,12 +908,22 @@ CommitResult tryCommitOnce( .filter(entry -> !baseIdentifiers.contains(entry.identifier())) .collect(Collectors.toList()); } + RowIdColumnConflictChecker rowIdColumnConflictChecker = null; + if (conflictDetection.hasRowIdCheckFromSnapshot()) { + rowIdColumnConflictChecker = + RowIdColumnConflictChecker.fromDataFiles( + schemaManager, + deltaFiles.stream() + .map(ManifestEntry::file) + .collect(Collectors.toList())); + } Optional exception = conflictDetection.checkConflicts( latestSnapshot, baseDataFiles, SimpleFileEntry.from(deltaFiles), indexFiles, + rowIdColumnConflictChecker, commitKind); if (exception.isPresent()) { if (allowRollback && rollback != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 46d5c436192d..29ee0458183f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -37,7 +37,6 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; import org.apache.paimon.utils.SnapshotManager; @@ -64,6 +63,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions; +import static org.apache.paimon.types.VectorType.isVectorStoreFile; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; import static org.apache.paimon.utils.Preconditions.checkState; @@ -160,6 +160,7 @@ public Optional checkConflicts( List baseEntries, List deltaEntries, List deltaIndexEntries, + @Nullable RowIdColumnConflictChecker rowIdColumnConflictChecker, CommitKind commitKind) { String baseCommitUser = latestSnapshot.commitUser(); if (deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE)) { @@ -228,7 +229,8 @@ public Optional checkConflicts( return exception; } - return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries); + return checkForRowIdFromSnapshot( + latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker); } public Map collectUncheckedBucketPartitions( @@ -473,7 +475,7 @@ private Optional checkRowIdRangeConflicts( for (List group : merged) { List dataFiles = new ArrayList<>(); for (SimpleFileEntry f : group) { - if (!isBlobFile(f.fileName())) { + if (!dedicatedStorageFile(f.fileName())) { dataFiles.add(f); } } @@ -491,24 +493,19 @@ private Optional checkRowIdRangeConflicts( private Optional checkForRowIdFromSnapshot( Snapshot latestSnapshot, List deltaEntries, - List deltaIndexEntries) { + List deltaIndexEntries, + @Nullable RowIdColumnConflictChecker columnChecker) { if (!dataEvolutionEnabled) { return Optional.empty(); } if (rowIdCheckFromSnapshot == null) { return Optional.empty(); } + if (columnChecker == null || columnChecker.isEmpty()) { + return Optional.empty(); + } List changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries); - // collect history row id ranges - List historyIdRanges = new ArrayList<>(); - for (SimpleFileEntry entry : deltaEntries) { - Long firstRowId = entry.firstRowId(); - long rowCount = entry.rowCount(); - if (firstRowId != null) { - historyIdRanges.add(new Range(firstRowId, firstRowId + rowCount - 1)); - } - } // check history row id ranges Long checkNextRowId = snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId(); @@ -525,16 +522,13 @@ private Optional checkForRowIdFromSnapshot( commitScanner.readIncrementalEntries(snapshot, changedPartitions); for (ManifestEntry entry : changes) { DataFileMeta file = entry.file(); - Range fileRange = file.nonNullRowIdRange(); - if (fileRange.from < checkNextRowId) { - for (Range range : historyIdRanges) { - if (range.hasIntersection(fileRange)) { - return Optional.of( - new RuntimeException( - "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," - + " updating the same file, which can render some updates ineffective.")); - } - } + if (file.firstRowId() != null + && file.nonNullRowIdRange().from < checkNextRowId + && columnChecker.conflictsWith(file)) { + return Optional.of( + new RuntimeException( + "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts," + + " updating the same file, which can render some updates ineffective.")); } } } @@ -542,6 +536,10 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + private static boolean dedicatedStorageFile(String fileName) { + return isBlobFile(fileName) || isVectorStoreFile(fileName); + } + static List buildBaseEntriesWithDV( List baseEntries, List baseIndexEntries) { if (baseEntries.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java new file mode 100644 index 000000000000..b2f8740f5269 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.Range; +import org.apache.paimon.utils.RangeHelper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Detects row-id range conflicts only when written field ids overlap. The detection process is as + * below: + * + *
    + *
  1. Merge delta files by row range and calculate updated columns. + *
  2. Sort those items by range. + *
  3. For each checking files, do binary search to find overlapping ranges. If their updated + * columns also overlap, return conflicting result. + *
+ */ +public class RowIdColumnConflictChecker { + + private final SchemaManager schemaManager; + private final List writeRanges; + private final Map> fieldIdByNameCache = new HashMap<>(); + + private RowIdColumnConflictChecker(SchemaManager schemaManager, List deltaFiles) { + this.schemaManager = schemaManager; + this.writeRanges = buildWriteRanges(deltaFiles); + } + + public static RowIdColumnConflictChecker fromDataFiles( + SchemaManager schemaManager, List deltaFiles) { + return new RowIdColumnConflictChecker(schemaManager, deltaFiles); + } + + private List buildWriteRanges(List deltaFiles) { + List rowIdFiles = + deltaFiles.stream() + .filter(file -> file.firstRowId() != null) + .collect(Collectors.toList()); + + if (rowIdFiles.isEmpty()) { + return Collections.emptyList(); + } + + // 1. merge overlapping ranges and calculate [Range, Set] tuples. + RangeHelper rangeHelper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); + List writeRanges = new ArrayList<>(); + for (List group : rangeHelper.mergeOverlappingRanges(rowIdFiles)) { + Range range = mergeRange(group); + Set fieldIds = new HashSet<>(); + for (DataFileMeta file : group) { + addWriteFieldIds(fieldIds, file); + } + + writeRanges.add(new WriteRange(range, fieldIds)); + } + + // 2. sort by range for binary search + writeRanges.sort( + Comparator.comparingLong((WriteRange writeRange) -> writeRange.range.from) + .thenComparingLong(writeRange -> writeRange.range.to)); + + return writeRanges; + } + + private void addWriteFieldIds(Set fieldIds, DataFileMeta file) { + List writeCols = file.writeCols(); + if (writeCols == null) { + fieldIds.addAll( + fieldIdByNameCache + .computeIfAbsent(file.schemaId(), this::fieldIdByName) + .values()); + return; + } + + for (String writeCol : writeCols) { + Integer fieldId = fieldId(file, writeCol); + if (fieldId != null) { + fieldIds.add(fieldId); + } + } + } + + private static Range mergeRange(List files) { + long from = Long.MAX_VALUE; + long to = Long.MIN_VALUE; + for (DataFileMeta file : files) { + Range range = file.nonNullRowIdRange(); + from = Math.min(from, range.from); + to = Math.max(to, range.to); + } + return new Range(from, to); + } + + boolean isEmpty() { + return writeRanges.isEmpty(); + } + + /** + * Check whether a committed incremental file entry conflicts with current committing delta + * files. If an existing file has both overlapping row range and overlapping write fields, then + * it conflicts. + * + * @param file committed incremental data file + * @return true if conflict + */ + boolean conflictsWith(DataFileMeta file) { + Long firstRowId = file.firstRowId(); + if (firstRowId == null) { + return false; + } + + Range range = new Range(firstRowId, firstRowId + file.rowCount() - 1); + int index = firstPossibleRange(range); + while (index < writeRanges.size()) { + WriteRange writeRange = writeRanges.get(index); + if (writeRange.range.from > range.to) { + return false; + } + // overlapping row range and overlapping write fields + if (writeRange.range.hasIntersection(range) + && containsAnyWriteField(writeRange.fieldIds, file)) { + return true; + } + index++; + } + return false; + } + + /** + * Binary search to find the first range whose `to` >= target range's `from`. + * + * @param range querying range + * @return index of the first range + */ + private int firstPossibleRange(Range range) { + int low = 0; + int high = writeRanges.size(); + while (low < high) { + int mid = (low + high) >>> 1; + if (writeRanges.get(mid).range.to < range.from) { + low = mid + 1; + } else { + high = mid; + } + } + return low; + } + + private boolean containsAnyWriteField(Set fieldIds, DataFileMeta file) { + List writeCols = file.writeCols(); + // If write cols == null, it's a full-schema write + if (writeCols == null) { + return true; + } + + for (String writeCol : writeCols) { + Integer fieldId = fieldId(file, writeCol); + if (fieldId != null && fieldIds.contains(fieldId)) { + return true; + } + } + return false; + } + + private Integer fieldId(DataFileMeta file, String writeCol) { + Integer fieldId = + fieldIdByNameCache + .computeIfAbsent(file.schemaId(), this::fieldIdByName) + .get(writeCol); + if (fieldId == null) { + if (SpecialFields.isSystemField(writeCol)) { + return null; + } + throw new RuntimeException( + String.format( + "Cannot find write column '%s' in schema %s.", + writeCol, file.schemaId())); + } + return fieldId; + } + + private Map fieldIdByName(long schemaId) { + Map fieldIdByName = new HashMap<>(); + for (DataField field : schemaManager.schema(schemaId).logicalRowType().getFields()) { + fieldIdByName.put(field.name(), field.id()); + } + return fieldIdByName; + } + + /** Range and field id Set. */ + private static class WriteRange { + + private final Range range; + private final Set fieldIds; + + private WriteRange(Range range, Set fieldIds) { + this.range = range; + this.fieldIds = fieldIds; + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java new file mode 100644 index 000000000000..8c45bfbb33ad --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.SchemaEvolutionTableTestBase.TestingSchemaManager; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class RowIdColumnConflictCheckerTest { + + @Test + void testAllowsDisjointWriteColumns() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("c")))) + .isFalse(); + } + + @Test + void testDetectsSameWriteColumns() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testUsesFieldIdAcrossRename() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 1L, Arrays.asList("b_renamed"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testTreatsNullWriteColumnsAsFullSchemaWrite() { + RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L, 0L, null)); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 0L, Arrays.asList("b")))) + .isTrue(); + } + + @Test + void testMergesOverlappedDeltaRangesAndWriteColumns() { + RowIdColumnConflictChecker checker = + checker( + file("current-b", 0L, 11L, 0L, Arrays.asList("b")), + file("current-c", 5L, 11L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(file("historical-b", 12L, 1L, 0L, Arrays.asList("b")))) + .isTrue(); + assertThat(checker.conflictsWith(file("historical-c", 12L, 1L, 0L, Arrays.asList("c")))) + .isTrue(); + } + + @Test + void testScansAllOverlappedRangesAfterBinarySearch() { + RowIdColumnConflictChecker checker = + checker( + file("current-b", 0L, 5L, 0L, Arrays.asList("b")), + file("current-c", 10L, 5L, 0L, Arrays.asList("c"))); + + assertThat(checker.conflictsWith(file("historical", 3L, 10L, 0L, Arrays.asList("c")))) + .isTrue(); + } + + @Test + void testFailsOnUnknownNonSystemWriteColumn() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 0L, Arrays.asList("b"))); + + assertThatThrownBy( + () -> + checker.conflictsWith( + file("historical", 0L, 10L, 0L, Arrays.asList("missing")))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Cannot find write column 'missing'"); + } + + private RowIdColumnConflictChecker checker(DataFileMeta... files) { + return RowIdColumnConflictChecker.fromDataFiles( + createSchemaManager(), Arrays.asList(files)); + } + + private DataFileMeta file( + String fileName, + @Nullable Long firstRowId, + long rowCount, + long schemaId, + @Nullable List writeCols) { + return DataFileMeta.forAppend( + fileName, + 0L, + rowCount, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + schemaId, + Collections.emptyList(), + null, + null, + null, + null, + firstRowId, + writeCols); + } + + private SchemaManager createSchemaManager() { + Map schemas = new HashMap<>(); + schemas.put( + 0L, + org.apache.paimon.schema.TableSchema.create( + 0L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + schemas.put( + 1L, + org.apache.paimon.schema.TableSchema.create( + 1L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "b_renamed", DataTypes.INT()), + new DataField(2, "c", DataTypes.INT())), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); + return new TestingSchemaManager( + new Path("/tmp/row-id-column-conflict-checker-test"), schemas); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 019bf6484be7..40c89113803c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -126,6 +126,9 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { private int sinkParallelism; + // the snapshot id this action based on + private long baseSnapshotId; + public DataEvolutionMergeIntoAction( String databaseName, String tableName, Map catalogConfig) { super(databaseName, tableName, catalogConfig); @@ -142,6 +145,7 @@ public DataEvolutionMergeIntoAction( throw new UnsupportedOperationException( "merge-into action doesn't support updating an empty table."); } + this.baseSnapshotId = latestSnapshotId; table = table.copy( Collections.singletonMap( @@ -324,7 +328,7 @@ public DataStream> shuffleByFirstRowId( Transformation sourceTransformation = source.getTransformation(); List firstRowIds = ((FileStoreTable) table) - .store().newScan() + .store().newScan().withSnapshot(baseSnapshotId) .withManifestEntryFilter( entry -> entry.file().firstRowId() != null @@ -384,13 +388,16 @@ public DataStream writePartialColumns( return sorted.transform( "PARTIAL WRITE COLUMNS", new CommittableTypeInfo(), - new DataEvolutionPartialWriteOperator((FileStoreTable) table, rowType)) + new DataEvolutionPartialWriteOperator( + (FileStoreTable) table, rowType, baseSnapshotId)) .setParallelism(sinkParallelism); } public DataStream commit( DataStream written, Set updatedColumns) { FileStoreTable storeTable = (FileStoreTable) table; + // copy to avoid serialization issue + long baseSnapshotId = this.baseSnapshotId; // Check if some global-indexed columns are updated DataStream checked = @@ -409,7 +416,9 @@ public DataStream commit( context -> new StoreCommitter( storeTable, - storeTable.newCommit(context.commitUser()), + storeTable + .newCommit(context.commitUser()) + .rowIdCheckConflict(baseSnapshotId), context), new NoopCommittableStateManager()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java index 278d58306cad..f57393b7ca27 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java @@ -70,6 +70,7 @@ public class DataEvolutionPartialWriteOperator LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class); private final FileStoreTable table; + private final Long baseSnapshotId; // dataType private final RowType dataType; @@ -91,9 +92,11 @@ public class DataEvolutionPartialWriteOperator private transient AbstractFileStoreWrite tableWrite; private transient Writer writer; - public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType dataType) { + public DataEvolutionPartialWriteOperator( + FileStoreTable table, RowType dataType, Long baseSnapshotId) { this.table = table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")); + this.baseSnapshotId = baseSnapshotId; List fieldNames = dataType.getFieldNames().stream() .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) @@ -121,6 +124,7 @@ public void open() throws Exception { entry.file().firstRowId() != null && !isBlobFile(entry.file().fileName()) && !isVectorStoreFile(entry.file().fileName())) + .withSnapshot(baseSnapshotId) .plan() .files(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java index 96edb8d75abb..ed8f46eddfe5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java @@ -742,10 +742,6 @@ public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws Excepti "(1, 'name1', X'48656C6C6F')", "(2, 'name2', X'5945')", "(3, 'name3', X'414243')"); - testBatchRead( - "SELECT COUNT(*) FROM `RAW_BLOB_SPLIT_T$files` " - + "WHERE file_path NOT LIKE '%.blob'", - Collections.singletonList(changelogRow("+I", 1L))); testBatchRead( "SELECT COUNT(*) > 1 FROM `RAW_BLOB_SPLIT_T$files` " + "WHERE file_path LIKE '%.blob'", diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index e012656cc8cc..ff54322b92a8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -140,6 +140,48 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } + test("Data Evolution: concurrent merge with disjoint update columns") { + withTable("sb", "sc", "t") { + sql(s""" + CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ( + 'row-tracking.enabled' = 'true', + 'data-evolution.enabled' = 'true') + """) + sql("INSERT INTO t VALUES (1, 0, 0)") + Seq((1, 1)).toDF("id", "b").createOrReplaceTempView("sb") + Seq((1, 1)).toDF("id", "c").createOrReplaceTempView("sc") + + val mergeB = Future { + for (_ <- 1 to 10) { + sql(s""" + |MERGE INTO t + |USING sb + |ON t.id = sb.id + |WHEN MATCHED THEN + |UPDATE SET t.b = sb.b + t.b + |""".stripMargin).collect() + } + } + + val mergeC = Future { + for (_ <- 1 to 10) { + sql(s""" + |MERGE INTO t + |USING sc + |ON t.id = sc.id + |WHEN MATCHED THEN + |UPDATE SET t.c = sc.c + t.c + |""".stripMargin).collect() + } + } + + Await.result(mergeB, 60.seconds) + Await.result(mergeC, 60.seconds) + + checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 10, 10))) + } + } + test("Data Evolution: concurrent merge and small files compact") { withTable("s", "t") { sql(s"""