diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/TargetColumns.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/TargetColumns.java
new file mode 100644
index 0000000000..1c9144fa33
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/TargetColumns.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.metadata.Schema;
+
+import java.util.BitSet;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** Helpers for interpreting KV write {@code targetColumns}. */
+public final class TargetColumns {
+
+    private TargetColumns() {}
+
+    /**
+     * Returns {@code true} when {@code targetColumns} specifies every row field index of {@code
+     * schema} (each index in {@code [0, fieldCount)} appears at least once, with no index outside
+     * that range).
+     *
+     * <p>In that case the write is equivalent to a full-row upsert for merge-engine purposes even
+     * if the client passed an explicit column array instead of {@code null}.
+     *
+     * @param schema the schema whose row field indexes must all be covered
+     * @param targetColumns the field indexes to check; duplicates are tolerated
+     * @return whether every field index of {@code schema} appears in {@code targetColumns}
+     * @throws NullPointerException if {@code schema} or {@code targetColumns} is {@code null}
+     */
+    public static boolean specifiesAllSchemaFieldIndexes(Schema schema, int[] targetColumns) {
+        checkNotNull(schema, "schema");
+        checkNotNull(targetColumns, "targetColumns");
+        int fieldCount = schema.getRowType().getFieldCount();
+        // Fewer entries than fields can never cover every index, so bail out before allocating
+        // the BitSet.
+        if (targetColumns.length < fieldCount) {
+            return false;
+        }
+        BitSet covered = new BitSet(fieldCount);
+        for (int col : targetColumns) {
+            if (col < 0 || col >= fieldCount) {
+                return false;
+            }
+            covered.set(col);
+        }
+        // A zero-field schema needs no special case: any element fails the range check above,
+        // and an empty array leaves no clear bit below fieldCount.
+        return covered.nextClearBit(0) >= fieldCount;
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
index 82f1fb8771..d7f7eacfdd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
@@ -21,6 +21,7 @@
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.server.kv.TargetColumns;
 import org.apache.fluss.server.kv.partialupdate.PartialUpdater;
 import org.apache.fluss.server.kv.partialupdate.PartialUpdaterCache;
 
@@ -28,7 +29,11 @@
 
 /**
  * The default row merger of primary key table that always retains the latest row and supports
- * configure target merge columns to do partial update.
+ * configuring target merge columns for partial update.
+ *
+ * <p>If {@link RowMerger#configureTargetColumns(int[], short, Schema)} receives target indexes that
+ * cover every field of the latest schema (same semantic as a full-row write), this merger keeps
+ * using plain merge semantics instead of wrapping a partial updater.
  */
 public class DefaultRowMerger implements RowMerger {
 
@@ -66,7 +71,8 @@ public DeleteBehavior deleteBehavior() {
     @Override
     public RowMerger configureTargetColumns(
             @Nullable int[] targetColumns, short latestShemaId, Schema latestSchema) {
-        if (targetColumns == null) {
+        if (targetColumns == null
+                || TargetColumns.specifiesAllSchemaFieldIndexes(latestSchema, targetColumns)) {
             return this;
         } else {
             // this also sanity checks the validity of the partial update
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/TargetColumnsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/TargetColumnsTest.java
new file mode 100644
index 0000000000..0d630b9f9c
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/TargetColumnsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TargetColumns}. */
+class TargetColumnsTest {
+
+    private static final Schema TWO_COL_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .build();
+
+    @Test
+    void testTargetColumns() {
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0}))
+                .isFalse();
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[0]))
+                .isFalse();
+
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0, 1}))
+                .isTrue();
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {1, 0}))
+                .isTrue();
+        assertThat(
+                        TargetColumns.specifiesAllSchemaFieldIndexes(
+                                TWO_COL_SCHEMA, new int[] {0, 1, 0}))
+                .isTrue();
+
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0, 2}))
+                .isFalse();
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, new int[] {0, -1}))
+                .isFalse();
+
+        assertThatThrownBy(() -> TargetColumns.specifiesAllSchemaFieldIndexes(TWO_COL_SCHEMA, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("targetColumns");
+
+        Schema threeCols =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(threeCols, new int[] {0, 1}))
+                .isFalse();
+        assertThat(TargetColumns.specifiesAllSchemaFieldIndexes(threeCols, new int[] {2, 0, 1, 2}))
+                .isTrue();
+
+        assertThatThrownBy(() -> TargetColumns.specifiesAllSchemaFieldIndexes(null, new int[] {0}))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("schema");
+    }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
index 3b5fc88c40..3f445d0a4c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
@@ -94,9 +94,10 @@ void testDefaultRowMerger(DeleteBehavior deleteBehavior) {
     void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior deleteBehavior) {
         DefaultRowMerger merger = new DefaultRowMerger(KvFormat.COMPACTED, deleteBehavior);
 
-        // Configure for partial update (only name column)
+        // Explicit full schema ({id, name}) matches plain merger behavior (same as null targets).
         RowMerger partialMerger =
                 merger.configureTargetColumns(new int[] {0, 1}, (byte) 1, SCHEMA); // id + name
+        assertThat(partialMerger).isSameAs(merger);
 
         BinaryValue oldValue = createBinaryValue(1, "old");
 
@@ -106,7 +107,7 @@ void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior deleteBehavior) {
 
         assertThat(partialMerger.merge(null, oldValue)).isEqualTo(oldValue);
 
-        // schema change then partial update (except name column).
+        // schema change then partial update (only id + age; omit name).
         partialMerger = merger.configureTargetColumns(new int[] {0, 2}, (byte) 2, SCHEMA_2);
         BinaryValue newValue = createBinaryValue(1, null, "20");
         BinaryValue mergeValue = createBinaryValue(1, "old", "20");