Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv;

import org.apache.fluss.metadata.Schema;

import java.util.BitSet;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/** Helpers for interpreting KV write {@code targetColumns}. */
public final class TargetColumns {

private TargetColumns() {}

/**
* Returns {@code true} when {@code targetColumns} specifies every row field index of {@code
* schema} (each index in {@code [0, fieldCount)} appears at least once, with no index outside
* that range).
*
* <p>In that case the write is equivalent to a full-row upsert for merge-engine purposes even
* if the client passed an explicit column array instead of {@code null}.
*/
public static boolean specifiesAllSchemaFieldIndexes(Schema schema, int[] targetColumns) {
checkNotNull(schema, "schema");
checkNotNull(targetColumns, "targetColumns");
int fieldCount = schema.getRowType().getFieldCount();
if (fieldCount == 0) {
return targetColumns.length == 0;
}
BitSet covered = new BitSet(fieldCount);
for (int col : targetColumns) {
if (col < 0 || col >= fieldCount) {
return false;
}
covered.set(col);
}
return covered.nextClearBit(0) >= fieldCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.server.kv.TargetColumns;
import org.apache.fluss.server.kv.partialupdate.PartialUpdater;
import org.apache.fluss.server.kv.partialupdate.PartialUpdaterCache;

import javax.annotation.Nullable;

/**
* The default row merger of primary key table that always retains the latest row and supports
* configure target merge columns to do partial update.
* configuring target merge columns for partial update.
*
* <p>If {@link RowMerger#configureTargetColumns(int[], short, Schema)} receives target indexes that
* cover every field of the latest schema (same semantic as a full-row write), this merger keeps
* using plain merge semantics instead of wrapping a partial updater.
*/
public class DefaultRowMerger implements RowMerger {

Expand Down Expand Up @@ -66,7 +71,8 @@ public DeleteBehavior deleteBehavior() {
@Override
public RowMerger configureTargetColumns(
@Nullable int[] targetColumns, short latestShemaId, Schema latestSchema) {
if (targetColumns == null) {
if (targetColumns == null
|| TargetColumns.specifiesAllSchemaFieldIndexes(latestSchema, targetColumns)) {
return this;
} else {
// this also sanity checks the validity of the partial update
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv;

import org.apache.fluss.metadata.Schema;
import org.apache.fluss.types.DataTypes;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link TargetColumns}. */
class TargetColumnsTest {

private static final Schema TWO_COL_SCHEMA =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.primaryKey("id")
.build();

@Test
void testTargetColumns() {
assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0}))
.isFalse();

assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0, 1}))
.isTrue();
assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {1, 0}))
.isTrue();
assertThat(
TargetColumns.specifiesAllSchemaFieldIndexes(
TWO_COL_SCHEMA, new int[] {0, 1, 0}))
.isTrue();

assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0, 2}))
.isFalse();

assertThatThrownBy(() -> TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, null))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("targetColumns");

Schema threeCols =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.INT())
.column("c", DataTypes.INT())
.primaryKey("a")
.build();
assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(threeCols, new int[] {0, 1}))
.isFalse();
assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(threeCols, new int[] {2, 0, 1, 2}))
.isTrue();

assertThatThrownBy(() -> TargetColumns.specifiesAllSchemaFieldIndexes(null, new int[] {0}))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("schema");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ void testDefaultRowMerger(DeleteBehavior deleteBehavior) {
void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior deleteBehavior) {
DefaultRowMerger merger = new DefaultRowMerger(KvFormat.COMPACTED, deleteBehavior);

// Configure for partial update (only name column)
// Explicit full schema ({id, name}) matches plain merger behavior (same as null targets).
RowMerger partialMerger =
merger.configureTargetColumns(new int[] {0, 1}, (byte) 1, SCHEMA); // id + name
assertThat(partialMerger).isSameAs(merger);

BinaryValue oldValue = createBinaryValue(1, "old");

Expand All @@ -106,7 +107,7 @@ void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior deleteBehavior) {

assertThat(partialMerger.merge(null, oldValue)).isEqualTo(oldValue);

// schema change then partial update (except name column).
// schema change then partial update (only id + age; omit name).
partialMerger = merger.configureTargetColumns(new int[] {0, 2}, (byte) 2, SCHEMA_2);
BinaryValue newValue = createBinaryValue(1, null, "20");
BinaryValue mergeValue = createBinaryValue(1, "old", "20");
Expand Down