Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.operation.commit.RetryCommitResult;
import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
import org.apache.paimon.operation.commit.RowIdColumnConflictChecker;
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.operation.commit.SuccessCommitResult;
Expand Down Expand Up @@ -907,12 +908,22 @@ CommitResult tryCommitOnce(
.filter(entry -> !baseIdentifiers.contains(entry.identifier()))
.collect(Collectors.toList());
}
RowIdColumnConflictChecker rowIdColumnConflictChecker = null;
if (conflictDetection.hasRowIdCheckFromSnapshot()) {
rowIdColumnConflictChecker =
RowIdColumnConflictChecker.fromDataFiles(
schemaManager,
deltaFiles.stream()
.map(ManifestEntry::file)
.collect(Collectors.toList()));
}
Optional<RuntimeException> exception =
conflictDetection.checkConflicts(
latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
indexFiles,
rowIdColumnConflictChecker,
commitKind);
if (exception.isPresent()) {
if (allowRollback && rollback != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.SnapshotManager;

Expand All @@ -64,6 +63,7 @@
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkState;

Expand Down Expand Up @@ -160,6 +160,7 @@ public Optional<RuntimeException> checkConflicts(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
List<IndexManifestEntry> deltaIndexEntries,
@Nullable RowIdColumnConflictChecker rowIdColumnConflictChecker,
CommitKind commitKind) {
String baseCommitUser = latestSnapshot.commitUser();
if (deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
Expand Down Expand Up @@ -228,7 +229,8 @@ public Optional<RuntimeException> checkConflicts(
return exception;
}

return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries);
return checkForRowIdFromSnapshot(
latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker);
}

public <T extends FileEntry> Map<BinaryRow, Integer> collectUncheckedBucketPartitions(
Expand Down Expand Up @@ -473,7 +475,7 @@ private Optional<RuntimeException> checkRowIdRangeConflicts(
for (List<SimpleFileEntry> group : merged) {
List<SimpleFileEntry> dataFiles = new ArrayList<>();
for (SimpleFileEntry f : group) {
if (!isBlobFile(f.fileName())) {
if (!dedicatedStorageFile(f.fileName())) {
dataFiles.add(f);
}
}
Expand All @@ -491,24 +493,19 @@ private Optional<RuntimeException> checkRowIdRangeConflicts(
private Optional<RuntimeException> checkForRowIdFromSnapshot(
Snapshot latestSnapshot,
List<SimpleFileEntry> deltaEntries,
List<IndexManifestEntry> deltaIndexEntries) {
List<IndexManifestEntry> deltaIndexEntries,
@Nullable RowIdColumnConflictChecker columnChecker) {
if (!dataEvolutionEnabled) {
return Optional.empty();
}
if (rowIdCheckFromSnapshot == null) {
return Optional.empty();
}
if (columnChecker == null || columnChecker.isEmpty()) {
return Optional.empty();
}

List<BinaryRow> changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries);
// collect history row id ranges
List<Range> historyIdRanges = new ArrayList<>();
for (SimpleFileEntry entry : deltaEntries) {
Long firstRowId = entry.firstRowId();
long rowCount = entry.rowCount();
if (firstRowId != null) {
historyIdRanges.add(new Range(firstRowId, firstRowId + rowCount - 1));
}
}

// check history row id ranges
Long checkNextRowId = snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
Expand All @@ -525,23 +522,24 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
commitScanner.readIncrementalEntries(snapshot, changedPartitions);
for (ManifestEntry entry : changes) {
DataFileMeta file = entry.file();
Range fileRange = file.nonNullRowIdRange();
if (fileRange.from < checkNextRowId) {
for (Range range : historyIdRanges) {
if (range.hasIntersection(fileRange)) {
return Optional.of(
new RuntimeException(
"For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts,"
+ " updating the same file, which can render some updates ineffective."));
}
}
if (file.firstRowId() != null
&& file.nonNullRowIdRange().from < checkNextRowId
&& columnChecker.conflictsWith(file)) {
return Optional.of(
new RuntimeException(
"For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts,"
+ " updating the same file, which can render some updates ineffective."));
}
}
}

return Optional.empty();
}

/**
 * Tells whether the given file name is recognized by {@code isBlobFile} or by {@code
 * isVectorStoreFile}.
 *
 * @param fileName name of the file to classify
 * @return true if either predicate accepts the name
 */
private static boolean dedicatedStorageFile(String fileName) {
    if (isBlobFile(fileName)) {
        return true;
    }
    return isVectorStoreFile(fileName);
}

static List<SimpleFileEntry> buildBaseEntriesWithDV(
List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> baseIndexEntries) {
if (baseEntries.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.commit;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Reports a conflict between the delta files of the current commit and an already committed file
 * only when both their row-id ranges and their written field ids overlap.
 *
 * <p>How it works:
 *
 * <ol>
 *   <li>Delta files that carry a first row id are grouped by overlapping row-id range; every
 *       group becomes one entry holding the enclosing range and the union of the field ids
 *       written by the files of the group.
 *   <li>The entries are sorted by range start, then by range end.
 *   <li>For each file to check, a binary search locates the first entry that may overlap, and
 *       the following entries are scanned until one starts after the file's range. An entry
 *       whose range and field ids both overlap with the file is a conflict.
 * </ol>
 */
public class RowIdColumnConflictChecker {

    private final SchemaManager schemaManager;
    private final List<WriteRange> writeRanges;
    /** Field name to field id, cached per schema id. */
    private final Map<Long, Map<String, Integer>> fieldIdByNameCache = new HashMap<>();

    private RowIdColumnConflictChecker(SchemaManager schemaManager, List<DataFileMeta> deltaFiles) {
        this.schemaManager = schemaManager;
        this.writeRanges = buildWriteRanges(deltaFiles);
    }

    /**
     * Creates a checker for the delta files of the commit in progress.
     *
     * @param schemaManager used to resolve written column names to field ids
     * @param deltaFiles data files of the current commit; files without a first row id are
     *     ignored
     * @return the checker
     */
    public static RowIdColumnConflictChecker fromDataFiles(
            SchemaManager schemaManager, List<DataFileMeta> deltaFiles) {
        return new RowIdColumnConflictChecker(schemaManager, deltaFiles);
    }

    /** Builds the sorted list of (row-id range, written field ids) entries from the delta files. */
    private List<WriteRange> buildWriteRanges(List<DataFileMeta> deltaFiles) {
        List<DataFileMeta> trackedFiles =
                deltaFiles.stream()
                        .filter(candidate -> candidate.firstRowId() != null)
                        .collect(Collectors.toList());
        if (trackedFiles.isEmpty()) {
            return Collections.emptyList();
        }

        // One entry per group of files whose row-id ranges overlap.
        RangeHelper<DataFileMeta> helper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange);
        List<WriteRange> result = new ArrayList<>();
        for (List<DataFileMeta> overlapping : helper.mergeOverlappingRanges(trackedFiles)) {
            Range enclosing = mergeRange(overlapping);
            result.add(new WriteRange(enclosing, unionWriteFieldIds(overlapping)));
        }

        // Ordered by start, then end, which is what the binary search below works on.
        // NOTE(review): the search additionally assumes range ends are non-decreasing in this
        // order, i.e. that merged groups do not overlap each other - confirm against
        // RangeHelper.
        Comparator<WriteRange> byStart = Comparator.comparingLong(entry -> entry.range.from);
        Comparator<WriteRange> byStartThenEnd =
                byStart.thenComparingLong(entry -> entry.range.to);
        result.sort(byStartThenEnd);
        return result;
    }

    /** Collects the field ids written by any file of the given group. */
    private Set<Integer> unionWriteFieldIds(List<DataFileMeta> files) {
        Set<Integer> union = new HashSet<>();
        for (DataFileMeta member : files) {
            addWriteFieldIds(union, member);
        }
        return union;
    }

    /**
     * Adds the field ids written by {@code file} to {@code fieldIds}. A file without write
     * columns contributes every field of its schema; otherwise each write column is resolved
     * individually and unresolvable system fields are skipped.
     */
    private void addWriteFieldIds(Set<Integer> fieldIds, DataFileMeta file) {
        List<String> writeCols = file.writeCols();
        if (writeCols != null) {
            for (String column : writeCols) {
                Integer resolved = fieldId(file, column);
                if (resolved != null) {
                    fieldIds.add(resolved);
                }
            }
            return;
        }

        fieldIds.addAll(schemaFieldIds(file.schemaId()).values());
    }

    /** Returns the smallest range enclosing the row-id ranges of all given files. */
    private static Range mergeRange(List<DataFileMeta> files) {
        long lowest = Long.MAX_VALUE;
        long highest = Long.MIN_VALUE;
        for (DataFileMeta member : files) {
            Range current = member.nonNullRowIdRange();
            if (current.from < lowest) {
                lowest = current.from;
            }
            if (current.to > highest) {
                highest = current.to;
            }
        }
        return new Range(lowest, highest);
    }

    /** Returns true when no delta file with a first row id was supplied. */
    boolean isEmpty() {
        return writeRanges.isEmpty();
    }

    /**
     * Checks a committed incremental data file against the delta files of the current commit.
     * The file conflicts when some entry overlaps its row-id range and also shares at least one
     * written field with it.
     *
     * @param file committed incremental data file
     * @return true if the file conflicts; false otherwise, including when the file has no first
     *     row id
     */
    boolean conflictsWith(DataFileMeta file) {
        Long firstRowId = file.firstRowId();
        if (firstRowId == null) {
            return false;
        }

        // Last row id of the file is firstRowId + rowCount - 1.
        Range fileRange = new Range(firstRowId, firstRowId + file.rowCount() - 1);
        for (int i = firstPossibleRange(fileRange); i < writeRanges.size(); i++) {
            WriteRange candidate = writeRanges.get(i);
            if (candidate.range.from > fileRange.to) {
                // Entries are sorted by start, so none of the remaining ones can overlap.
                break;
            }
            if (candidate.range.hasIntersection(fileRange)
                    && containsAnyWriteField(candidate.fieldIds, file)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Binary search for the first entry whose range end is not below the start of {@code range}.
     *
     * @param range range being queried
     * @return index of that entry, or the number of entries if there is none
     */
    private int firstPossibleRange(Range range) {
        int lo = 0;
        int hi = writeRanges.size();
        while (lo < hi) {
            int middle = lo + ((hi - lo) >> 1);
            if (writeRanges.get(middle).range.to >= range.from) {
                hi = middle;
            } else {
                lo = middle + 1;
            }
        }
        return lo;
    }

    /**
     * Tells whether {@code file} wrote at least one of the given field ids. A file without write
     * columns is treated as writing the full schema and therefore always matches.
     */
    private boolean containsAnyWriteField(Set<Integer> fieldIds, DataFileMeta file) {
        List<String> writeCols = file.writeCols();
        if (writeCols == null) {
            return true;
        }

        for (String column : writeCols) {
            Integer resolved = fieldId(file, column);
            if (resolved == null) {
                continue;
            }
            if (fieldIds.contains(resolved)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Resolves a write column of {@code file} to its field id in the file's schema.
     *
     * @return the field id, or null when the column is a system field missing from the schema
     * @throws RuntimeException if the column is missing from the schema and is not a system
     *     field
     */
    private Integer fieldId(DataFileMeta file, String writeCol) {
        Integer resolved = schemaFieldIds(file.schemaId()).get(writeCol);
        if (resolved != null) {
            return resolved;
        }
        if (SpecialFields.isSystemField(writeCol)) {
            return null;
        }
        throw new RuntimeException(
                String.format(
                        "Cannot find write column '%s' in schema %s.",
                        writeCol, file.schemaId()));
    }

    /** Returns the cached name-to-id mapping of a schema, loading it on first access. */
    private Map<String, Integer> schemaFieldIds(long schemaId) {
        return fieldIdByNameCache.computeIfAbsent(schemaId, this::fieldIdByName);
    }

    /** Loads the schema with the given id and maps each field name to its field id. */
    private Map<String, Integer> fieldIdByName(long schemaId) {
        Map<String, Integer> idsByName = new HashMap<>();
        for (DataField column : schemaManager.schema(schemaId).logicalRowType().getFields()) {
            idsByName.put(column.name(), column.id());
        }
        return idsByName;
    }

    /** A row-id range together with the ids of the fields written within it. */
    private static class WriteRange {

        private final Range range;
        private final Set<Integer> fieldIds;

        private WriteRange(Range range, Set<Integer> fieldIds) {
            this.range = range;
            this.fieldIds = fieldIds;
        }
    }
}
Loading