From c709a1c40d765bf6fb0d585f4ae831438fdc276a Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 14 May 2026 17:56:02 +0800 Subject: [PATCH 1/9] [lake/hudi] Introduce HudiBucketingFunction for bucket strategy. --- .../fluss/bucketing/BucketingFunction.java | 2 + .../bucketing/HudiBucketingFunction.java | 37 ++ .../apache/fluss/row/encode/KeyEncoder.java | 3 + .../row/encode/hudi/HudiBinaryRowWriter.java | 406 ++++++++++++++++++ .../fluss/row/encode/hudi/HudiKeyEncoder.java | 67 +++ fluss-lake/fluss-lake-hudi/pom.xml | 36 ++ .../bucketing/HudiBucketingFunctionTest.java | 263 ++++++++++++ 7 files changed, 814 insertions(+) create mode 100644 fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java create mode 100644 fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java index 915fbd172f..5291ccdc39 100644 --- a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java @@ -43,6 +43,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) { return new FlussBucketingFunction(); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergBucketingFunction(); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiBucketingFunction(); } else { throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java new file 
mode 100644 index 0000000000..2a6cce225d --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.bucketing; + +import java.nio.ByteBuffer; + +/** An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy. 
*/ +public class HudiBucketingFunction implements BucketingFunction { + @Override + public int bucketing(byte[] bucketKey, int numBuckets) { + if (bucketKey == null || bucketKey.length == 0) { + throw new IllegalArgumentException("bucketKey must not be null or empty"); + } + if (numBuckets <= 0) { + throw new IllegalArgumentException("numBuckets must be positive"); + } + + int restored = ByteBuffer.wrap(bucketKey).getInt(); + + return (restored & Integer.MAX_VALUE) % numBuckets; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java index 4d59be33b0..1f22c3b3ce 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder; import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder; import org.apache.fluss.types.RowType; @@ -129,6 +130,8 @@ static KeyEncoder of( return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergKeyEncoder(rowType, keyFields); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiKeyEncoder(rowType, keyFields); } else { throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java new file mode 100644 index 0000000000..b3375aba95 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java @@ -0,0 +1,406 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.encode.hudi; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.BinarySegmentUtils; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.DataType; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.apache.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link org.apache.fluss.row.InternalRow} using Hudi's binary encoding + * format. 
+ */ +public class HudiBinaryRowWriter { + + private final int nullBitsSizeInBytes; + private final int fixedSize; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public HudiBinaryRowWriter(int arity) { + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); + this.fixedSize = getFixedLengthPartSize(nullBitsSizeInBytes, arity); + this.cursor = fixedSize; + this.buffer = new byte[fixedSize]; + this.segment = MemorySegment.wrap(buffer); + } + + public void reset() { + this.cursor = this.fixedSize; + + for (int i = 0; i < this.nullBitsSizeInBytes; i += 8) { + this.segment.putLong(i, 0L); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + setNullBit(pos); + segment.putLong(getFieldOffset(pos), 0L); + } + + public void setNullBit(int pos) { + BinarySegmentUtils.bitSet(segment, 0, pos + 8); + } + + public void writeRowKind(ChangeType kind) { + // convert Fluss changeType to Hudi rowKind byte value + byte hudiRowKindByte; + switch (kind) { + case APPEND_ONLY: + case INSERT: + hudiRowKindByte = 0; + break; + case UPDATE_BEFORE: + hudiRowKindByte = 1; + break; + case UPDATE_AFTER: + hudiRowKindByte = 2; + break; + case DELETE: + hudiRowKindByte = 3; + break; + default: + throw new IllegalArgumentException("Unsupported change type: " + kind); + } + segment.put(0, hudiRowKindByte); + } + + public void writeBoolean(int pos, boolean value) { + segment.putBoolean(getFieldOffset(pos), value); + } + + public void writeByte(int pos, byte value) { + segment.put(getFieldOffset(pos), value); + } + + public void writeShort(int pos, short value) { + segment.putShort(getFieldOffset(pos), value); + } + + public void writeInt(int pos, int value) { + segment.putInt(getFieldOffset(pos), value); + } + + public void writeLong(int pos, long value) { + segment.putLong(getFieldOffset(pos), value); + } + + public void 
writeFloat(int pos, float value) { + segment.putFloat(getFieldOffset(pos), value); + } + + public void writeDouble(int pos, double value) { + segment.putDouble(getFieldOffset(pos), value); + } + + public void writeString(int pos, BinaryString input) { + if (input.getSegments() == null) { + String javaObject = input.toString(); + writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8)); + } else { + int len = input.getSizeInBytes(); + if (len <= 7) { + byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len); + BinarySegmentUtils.copyToBytes( + input.getSegments(), input.getOffset(), bytes, 0, len); + writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); + } else { + writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), len); + } + } + } + + private void writeBytes(int pos, byte[] bytes) { + int len = bytes.length; + if (len <= 7) { + writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); + } else { + writeBytesToVarLenPart(pos, bytes, len); + } + } + + public void writeDecimal(int pos, Decimal value, int precision) { + assert value == null || value.precision() == precision; + + if (Decimal.isCompact(precision)) { + assert value != null; + + this.writeLong(pos, value.toUnscaledLong()); + } else { + this.ensureCapacity(16); + this.segment.putLong(this.cursor, 0L); + this.segment.putLong(this.cursor + 8, 0L); + if (value == null) { + this.setNullBit(pos); + this.setOffsetAndSize(pos, this.cursor, 0L); + } else { + byte[] bytes = value.toUnscaledBytes(); + + assert bytes.length <= 16; + + this.segment.put(this.cursor, bytes, 0, bytes.length); + this.setOffsetAndSize(pos, this.cursor, bytes.length); + } + + this.cursor += 16; + } + } + + public void writeTimestampNtz(int pos, TimestampNtz value, int precision) { + if (TimestampNtz.isCompact(precision)) { + writeLong(pos, value.getMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); + if (value == 
null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getMillisecond()); + setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); + } + cursor += 8; + } + } + + public void writeTimestampLtz(int pos, TimestampLtz value, int precision) { + if (TimestampLtz.isCompact(precision)) { + writeLong(pos, value.getEpochMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); + if (value == null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getEpochMillisecond()); + setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); + } + // advance past the 8 bytes just written at cursor (same as writeTimestampNtz), + // otherwise the next variable-length field would overwrite them + cursor += 8; + } + } + + private void writeSegmentsToVarLenPart( + int pos, MemorySegment[] segments, int offset, int size) { + int roundedSize = roundNumberOfBytesToNearestWord(size); + this.ensureCapacity(roundedSize); + this.zeroOutPaddingBytes(size); + if (segments.length == 1) { + segments[0].copyTo(offset, this.segment, this.cursor, size); + } else { + this.writeMultiSegmentsToVarLenPart(segments, offset, size); + } + + this.setOffsetAndSize(pos, this.cursor, size); + this.cursor += roundedSize; + } + + private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) { + int needCopy = size; + int fromOffset = offset; + int toOffset = this.cursor; + + for (MemorySegment sourceSegment : segments) { + int remain = sourceSegment.size() - fromOffset; + if (remain > 0) { + int copySize = Math.min(remain, needCopy); + sourceSegment.copyTo(fromOffset, this.segment, toOffset, copySize); + needCopy -= copySize; + toOffset += copySize; + fromOffset = 0; + } else { + fromOffset -= sourceSegment.size(); + } + } + } + + private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) { + int roundedSize = roundNumberOfBytesToNearestWord(len); +
this.ensureCapacity(roundedSize); + this.zeroOutPaddingBytes(len); + this.segment.put(this.cursor, bytes, 0, len); + this.setOffsetAndSize(pos, this.cursor, len); + this.cursor += roundedSize; + } + + protected static int roundNumberOfBytesToNearestWord(int numBytes) { + int remainder = numBytes & 7; + return remainder == 0 ? numBytes : numBytes + (8 - remainder); + } + + protected void zeroOutPaddingBytes(int numBytes) { + if ((numBytes & 7) > 0) { + this.segment.putLong(this.cursor + (numBytes >> 3 << 3), 0L); + } + } + + protected void ensureCapacity(int neededSize) { + int length = this.cursor + neededSize; + if (this.segment.size() < length) { + this.grow(length); + } + } + + private void grow(int minCapacity) { + int oldCapacity = this.segment.size(); + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity - minCapacity < 0) { + newCapacity = minCapacity; + } + this.buffer = Arrays.copyOf(this.segment.getArray(), newCapacity); + this.segment = MemorySegment.wrap(buffer); + } + + private static void writeBytesToFixLenPart( + MemorySegment segment, int fieldOffset, byte[] bytes, int len) { + long firstByte = len | 128; + long sevenBytes = 0L; + if (MemorySegment.LITTLE_ENDIAN) { + for (int i = 0; i < len; ++i) { + sevenBytes |= (255L & (long) bytes[i]) << (int) ((long) i * 8L); + } + } else { + for (int i = 0; i < len; ++i) { + sevenBytes |= (255L & (long) bytes[i]) << (int) ((long) (6 - i) * 8L); + } + } + + long offsetAndSize = firstByte << 56 | sevenBytes; + segment.putLong(fieldOffset, offsetAndSize); + } + + // ----------------------- internal methods ------------------------------- + + private int getFieldOffset(int pos) { + return nullBitsSizeInBytes + 8 * pos; + } + + private static int getFixedLengthPartSize(int nullBitsSizeInBytes, int arity) { + return nullBitsSizeInBytes + 8 * arity; + } + + private static int calculateBitSetWidthInBytes(int arity) { + return (arity + 63 + 8) / 64 * 8; + } + + public void setOffsetAndSize(int pos, 
int offset, long size) { + long offsetAndSize = (long) offset << 32 | size; + this.segment.putLong(this.getFieldOffset(pos), offsetAndSize); + } + + /** + * Creates an accessor for writing the elements of an hudi binary row writer during runtime. + * + * @param fieldType the field type to write + */ + public static FieldWriter createFieldWriter(DataType fieldType) { + final FieldWriter fieldWriter; + switch (fieldType.getTypeRoot()) { + case CHAR: + case STRING: + fieldWriter = (writer, pos, value) -> writer.writeString(pos, (BinaryString) value); + break; + case BOOLEAN: + fieldWriter = ((writer, pos, value) -> writer.writeBoolean(pos, (boolean) value)); + break; + case BINARY: + case BYTES: + fieldWriter = (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value); + break; + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + fieldWriter = + (writer, pos, value) -> + writer.writeDecimal(pos, (Decimal) value, decimalPrecision); + break; + case TINYINT: + fieldWriter = (writer, pos, value) -> writer.writeByte(pos, (byte) value); + break; + case SMALLINT: + fieldWriter = (writer, pos, value) -> writer.writeShort(pos, (short) value); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fieldWriter = (writer, pos, value) -> writer.writeInt(pos, (int) value); + break; + case BIGINT: + fieldWriter = (writer, pos, value) -> writer.writeLong(pos, (long) value); + break; + case FLOAT: + fieldWriter = (writer, pos, value) -> writer.writeFloat(pos, (float) value); + break; + case DOUBLE: + fieldWriter = (writer, pos, value) -> writer.writeDouble(pos, (double) value); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampNtzPrecision = getPrecision(fieldType); + fieldWriter = + (writer, pos, value) -> + writer.writeTimestampNtz( + pos, (TimestampNtz) value, timestampNtzPrecision); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampLtzPrecision = getPrecision(fieldType); + fieldWriter = + (writer, pos, 
value) -> + writer.writeTimestampLtz( + pos, (TimestampLtz) value, timestampLtzPrecision); + break; + default: + throw new IllegalArgumentException( + "Unsupported type for Hudi BinaryRow writer: " + fieldType); + } + if (!fieldType.isNullable()) { + return fieldWriter; + } + return (writer, pos, value) -> { + if (value == null) { + writer.setNullAt(pos); + } else { + fieldWriter.writeField(writer, pos, value); + } + }; + } + + /** Accessor for writing the elements of an hudi binary row writer during runtime. */ + interface FieldWriter extends Serializable { + void writeField(HudiBinaryRowWriter writer, int pos, Object value); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java new file mode 100644 index 0000000000..3f9b27f003 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.encode.hudi; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. */ +public class HudiKeyEncoder implements KeyEncoder { + + private final InternalRow.FieldGetter[] fieldGetters; + + private final HudiBinaryRowWriter.FieldWriter[] fieldEncoders; + + public HudiKeyEncoder(RowType rowType, List keys) { + // for get fields from fluss internal row + fieldGetters = new InternalRow.FieldGetter[keys.size()]; + // for encode fields into hudi + fieldEncoders = new HudiBinaryRowWriter.FieldWriter[keys.size()]; + for (int i = 0; i < keys.size(); i++) { + int keyIndex = rowType.getFieldIndex(keys.get(i)); + DataType keyDataType = rowType.getTypeAt(keyIndex); + fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); + fieldEncoders[i] = HudiBinaryRowWriter.createFieldWriter(keyDataType); + } + } + + @Override + public byte[] encodeKey(InternalRow row) { + List values = new ArrayList<>(); + // iterate all the fields of the row, and encode each field + for (int i = 0; i < fieldGetters.length; i++) { + Object value = fieldGetters[i].getFieldOrNull(row); + if (value instanceof BinaryString) { + values.add(((BinaryString) value).toString()); + } else { + values.add(String.valueOf(value)); + } + } + int hashCode = Arrays.asList(values.toArray(new String[0])).hashCode(); + + return ByteBuffer.allocate(4).putInt(hashCode).array(); + } +} diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index ddf6b1329d..09605b8aa7 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -32,6 +32,42 @@ + + org.apache.hudi + 
hudi-flink${flink.major.version}-bundle + ${hudi.version} + + + log4j + log4j + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-lang3 + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + org.apache.zookeeper + zookeeper + + + + org.apache.fluss fluss-common diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java new file mode 100644 index 0000000000..e8361225eb --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.bucketing; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link HudiBucketingFunction}. */ +class HudiBucketingFunctionTest { + + @Test + void testIntegerHash() { + int testValue = 42; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. 
+ byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testLongHash() { + long testValue = 1234567890123456789L; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testStringHash() { + String testValue = "Hello Hudi, Fluss this side!"; + int bucketNum = 10; + String key = "name"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {key}); + + GenericRow row = GenericRow.of(BinaryString.fromString(testValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. 
+ byte[] equivalentBytes = toBytes(new String[] {testValue}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDecimalHash() { + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "amount"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new String[] {key}); + + GenericRow row = GenericRow.of(decimal); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toPlainString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toPlainString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testTimestampEncodingHash() throws IOException { + long millis = 1698235273182L; + int nanos = 123456; + long micros = millis * 1000 + (nanos / 1000); + TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos); + int bucketNum = 10; + String key = "event_time"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] 
ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDateHash() { + int dateValue = 19655; + int bucketNum = 10; + String key = "date"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new String[] {key}); + GenericRow row = GenericRow.of(dateValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. 
+ byte[] equivalentBytes = toBytes(new String[] {String.valueOf(dateValue)}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(String.valueOf(dateValue), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testMultiKeysHashing() { + int testIntValue = 42; + long testLongValue = 1234567890123456789L; + String testStringValue = "Hello Hudi, Fluss this side!"; + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "age,id,name,amount"; + String recordKey = + "age:" + + testIntValue + + ",id:" + + testLongValue + + ",name:" + + testStringValue + + ",amount:" + + testValue; + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.DECIMAL(10, 2) + }, + new String[] {"age", "id", "name", "amount"}); + GenericRow row = + GenericRow.of( + testIntValue, + testLongValue, + BinaryString.fromString(testStringValue), + decimal); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Arrays.asList("age", "id", "name", "amount")); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. 
+ byte[] equivalentBytes = + toBytes( + new String[] { + String.valueOf(testIntValue), + String.valueOf(testLongValue), + testStringValue, + String.valueOf(decimal) + }); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + private byte[] toBytes(String[] value) { + List values = Arrays.asList(value); + return ByteBuffer.allocate(4).putInt(values.hashCode()).array(); + } +} From 8ee308e44856d62a753c87d365311d0fecc8549a Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 14 May 2026 20:10:37 +0800 Subject: [PATCH 2/9] [lake/hudi] optimize code impl of HudiBucketingFunction and HudiKeyEncoder,add four test cases in HudiBucketingFunctionTest --- .../bucketing/HudiBucketingFunction.java | 40 +++++++-- .../fluss/row/encode/hudi/HudiKeyEncoder.java | 65 +++++++++----- .../bucketing/HudiBucketingFunctionTest.java | 84 +++++++++++++++++++ 3 files changed, 160 insertions(+), 29 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java index 2a6cce225d..7a02f9e96a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java @@ -17,21 +17,45 @@ package org.apache.fluss.bucketing; -import java.nio.ByteBuffer; - -/** An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy. */ +/** + * An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy. + * + *

The bucket id is computed in the same way as Hudi's {@code + * org.apache.hudi.index.bucket.BucketIdentifier#getBucketId(String, String, int)}: take a 32-bit + * integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, + * mask its sign bit and modulo by {@code numBuckets}. + */ public class HudiBucketingFunction implements BucketingFunction { + + /** Length of a Hudi-encoded bucket key, in bytes (a single big-endian {@code int}). */ + private static final int ENCODED_KEY_LENGTH = 4; + @Override public int bucketing(byte[] bucketKey, int numBuckets) { - if (bucketKey == null || bucketKey.length == 0) { - throw new IllegalArgumentException("bucketKey must not be null or empty"); + if (bucketKey == null) { + throw new IllegalArgumentException("bucketKey must not be null"); + } + if (bucketKey.length != ENCODED_KEY_LENGTH) { + throw new IllegalArgumentException( + "bucketKey must be exactly " + + ENCODED_KEY_LENGTH + + " bytes for Hudi bucketing, but got " + + bucketKey.length + + " bytes. The bucket key bytes are expected to be produced by HudiKeyEncoder."); } if (numBuckets <= 0) { - throw new IllegalArgumentException("numBuckets must be positive"); + throw new IllegalArgumentException( + "numBuckets must be positive, but got " + numBuckets); } - int restored = ByteBuffer.wrap(bucketKey).getInt(); + // Decode 4-byte big-endian int produced by HudiKeyEncoder via bit operations + // to avoid allocating a ByteBuffer instance on this hot path. 
+ int restored = + ((bucketKey[0] & 0xFF) << 24) + | ((bucketKey[1] & 0xFF) << 16) + | ((bucketKey[2] & 0xFF) << 8) + | (bucketKey[3] & 0xFF); return (restored & Integer.MAX_VALUE) % numBuckets; } -} +} \ No newline at end of file diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java index 3f9b27f003..606cefdd31 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -23,45 +23,68 @@ import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -/** An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. */ +/** + * An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. + * + *

The encoded bytes are a 4-byte big-endian representation of {@code + * List.hashCode()} over the stringified key fields, which matches the way + * Hudi's {@code BucketIdentifier} hashes a record key. Null fields are replaced by + * {@link #NULL_RECORDKEY_PLACEHOLDER} so that an explicit null and the literal + * string {@code "null"} no longer collide in the hash space. + */ public class HudiKeyEncoder implements KeyEncoder { - private final InternalRow.FieldGetter[] fieldGetters; + /** + * Placeholder used to represent a {@code null} key field when computing the + * record-key hash. It is intentionally aligned with Hudi's + * {@code KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER} so that the resulting bucket id + * stays identical to what Hudi would compute on its side. + */ + public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; - private final HudiBinaryRowWriter.FieldWriter[] fieldEncoders; + private final InternalRow.FieldGetter[] fieldGetters; public HudiKeyEncoder(RowType rowType, List keys) { - // for get fields from fluss internal row + // for getting key fields out of fluss internal row fieldGetters = new InternalRow.FieldGetter[keys.size()]; - // for encode fields into hudi - fieldEncoders = new HudiBinaryRowWriter.FieldWriter[keys.size()]; for (int i = 0; i < keys.size(); i++) { int keyIndex = rowType.getFieldIndex(keys.get(i)); DataType keyDataType = rowType.getTypeAt(keyIndex); fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); - fieldEncoders[i] = HudiBinaryRowWriter.createFieldWriter(keyDataType); } } @Override public byte[] encodeKey(InternalRow row) { - List values = new ArrayList<>(); - // iterate all the fields of the row, and encode each field - for (int i = 0; i < fieldGetters.length; i++) { - Object value = fieldGetters[i].getFieldOrNull(row); - if (value instanceof BinaryString) { - values.add(((BinaryString) value).toString()); - } else { - values.add(String.valueOf(value)); - } + // Build the same string list that 
Hudi would build out of a record key, so the + // resulting List#hashCode() — and therefore the bucket id — match Hudi's own + // BucketIdentifier#getBucketId. + List values = new ArrayList<>(fieldGetters.length); + for (InternalRow.FieldGetter fieldGetter : fieldGetters) { + Object value = fieldGetter.getFieldOrNull(row); + values.add(stringifyForRecordKey(value)); } - int hashCode = Arrays.asList(values.toArray(new String[0])).hashCode(); + int hashCode = values.hashCode(); - return ByteBuffer.allocate(4).putInt(hashCode).array(); + // 4-byte big-endian, decoded symmetrically by HudiBucketingFunction. + return new byte[] { + (byte) (hashCode >>> 24), + (byte) (hashCode >>> 16), + (byte) (hashCode >>> 8), + (byte) hashCode + }; + } + + private static String stringifyForRecordKey(Object value) { + if (value == null) { + return NULL_RECORDKEY_PLACEHOLDER; + } + if (value instanceof BinaryString) { + return value.toString(); + } + return String.valueOf(value); } -} +} \ No newline at end of file diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java index e8361225eb..8cb4c4834e 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -36,7 +36,9 @@ import java.util.Collections; import java.util.List; +import static org.apache.fluss.row.encode.hudi.HudiKeyEncoder.NULL_RECORDKEY_PLACEHOLDER; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit test for {@link HudiBucketingFunction}. 
*/ class HudiBucketingFunctionTest { @@ -256,6 +258,88 @@ void testMultiKeysHashing() { assertThat(ourBucket).isEqualTo(hudiBucket); } + @Test + void testNullFieldUsesPlaceholder() { + int bucketNum = 10; + String key = "name"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + + // a row with an explicit null on the bucket key column + GenericRow row = GenericRow.of((Object) null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encoded bytes must hash the placeholder, NOT the literal "null" string. + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] placeholderBytes = toBytes(new String[] {NULL_RECORDKEY_PLACEHOLDER}); + byte[] javaNullLiteralBytes = toBytes(new String[] {"null"}); + + assertThat(ourEncodedKey).isEqualTo(placeholderBytes); + assertThat(ourEncodedKey).isNotEqualTo(javaNullLiteralBytes); + + int hudiBucket = + BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testNullFieldDoesNotCollideWithLiteralNullString() { + // The literal string "null" must not produce the same encoded bytes as a real + // null value — a regression test for the previous String.valueOf(null) behavior. 
+ String key = "name"; + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + byte[] encodedNull = encoder.encodeKey(GenericRow.of((Object) null)); + byte[] encodedLiteralNull = + encoder.encodeKey(GenericRow.of(BinaryString.fromString("null"))); + + assertThat(encodedNull).isNotEqualTo(encodedLiteralNull); + } + + @Test + void testBucketingRejectsInvalidBucketKey() { + HudiBucketingFunction function = new HudiBucketingFunction(); + + assertThatThrownBy(() -> function.bucketing(null, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null"); + + // length 0 — wrong length + assertThatThrownBy(() -> function.bucketing(new byte[0], 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length < 4 — used to throw BufferUnderflowException, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length > 4 — used to silently truncate, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3, 4, 5}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + } + + @Test + void testBucketingRejectsNonPositiveNumBuckets() { + HudiBucketingFunction function = new HudiBucketingFunction(); + byte[] anyKey = new byte[] {0, 0, 0, 1}; + + assertThatThrownBy(() -> function.bucketing(anyKey, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + assertThatThrownBy(() -> function.bucketing(anyKey, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + } + private byte[] toBytes(String[] value) { List values = Arrays.asList(value); return 
ByteBuffer.allocate(4).putInt(values.hashCode()).array(); From ff163bccd6b4f168938b4998852908318f7e1223 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 14 May 2026 20:45:27 +0800 Subject: [PATCH 3/9] =?UTF-8?q?[lake/hudi]=20rm=20HudiBinaryRowWriter=20&?= =?UTF-8?q?=20add=20five=20test=20cases=20in=20HudiBucketingFunctionTest(c?= =?UTF-8?q?omposite=20keys=E3=80=81multi=20data=20type).?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../row/encode/hudi/HudiBinaryRowWriter.java | 406 ------------------ .../bucketing/HudiBucketingFunctionTest.java | 163 +++++++ 2 files changed, 163 insertions(+), 406 deletions(-) delete mode 100644 fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java deleted file mode 100644 index b3375aba95..0000000000 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiBinaryRowWriter.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.row.encode.hudi; - -import org.apache.fluss.memory.MemorySegment; -import org.apache.fluss.record.ChangeType; -import org.apache.fluss.row.BinarySegmentUtils; -import org.apache.fluss.row.BinaryString; -import org.apache.fluss.row.Decimal; -import org.apache.fluss.row.TimestampLtz; -import org.apache.fluss.row.TimestampNtz; -import org.apache.fluss.types.DataType; - -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; - -import static org.apache.fluss.types.DataTypeChecks.getPrecision; - -/** - * A writer to encode Fluss's {@link org.apache.fluss.row.InternalRow} using Hudi's binary encoding - * format. - */ -public class HudiBinaryRowWriter { - - private final int nullBitsSizeInBytes; - private final int fixedSize; - private byte[] buffer; - private MemorySegment segment; - private int cursor; - - public HudiBinaryRowWriter(int arity) { - this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); - this.fixedSize = getFixedLengthPartSize(nullBitsSizeInBytes, arity); - this.cursor = fixedSize; - this.buffer = new byte[fixedSize]; - this.segment = MemorySegment.wrap(buffer); - } - - public void reset() { - this.cursor = this.fixedSize; - - for (int i = 0; i < this.nullBitsSizeInBytes; i += 8) { - this.segment.putLong(i, 0L); - } - } - - public byte[] toBytes() { - byte[] result = new byte[cursor]; - System.arraycopy(buffer, 0, result, 0, cursor); - return result; - } - - public void setNullAt(int pos) { - setNullBit(pos); - segment.putLong(getFieldOffset(pos), 0L); - } - - public void setNullBit(int pos) { - BinarySegmentUtils.bitSet(segment, 0, pos + 8); - } - - public void writeRowKind(ChangeType kind) { - // convert Fluss changeType to Hudi rowKind byte value - byte hudiRowKindByte; - switch (kind) { - case APPEND_ONLY: - case INSERT: - hudiRowKindByte = 0; - break; - case UPDATE_BEFORE: - hudiRowKindByte = 1; - break; - case UPDATE_AFTER: - hudiRowKindByte = 2; - break; - case 
DELETE: - hudiRowKindByte = 3; - break; - default: - throw new IllegalArgumentException("Unsupported change type: " + kind); - } - segment.put(0, hudiRowKindByte); - } - - public void writeBoolean(int pos, boolean value) { - segment.putBoolean(getFieldOffset(pos), value); - } - - public void writeByte(int pos, byte value) { - segment.put(getFieldOffset(pos), value); - } - - public void writeShort(int pos, short value) { - segment.putShort(getFieldOffset(pos), value); - } - - public void writeInt(int pos, int value) { - segment.putInt(getFieldOffset(pos), value); - } - - public void writeLong(int pos, long value) { - segment.putLong(getFieldOffset(pos), value); - } - - public void writeFloat(int pos, float value) { - segment.putFloat(getFieldOffset(pos), value); - } - - public void writeDouble(int pos, double value) { - segment.putDouble(getFieldOffset(pos), value); - } - - public void writeString(int pos, BinaryString input) { - if (input.getSegments() == null) { - String javaObject = input.toString(); - writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8)); - } else { - int len = input.getSizeInBytes(); - if (len <= 7) { - byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len); - BinarySegmentUtils.copyToBytes( - input.getSegments(), input.getOffset(), bytes, 0, len); - writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); - } else { - writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), len); - } - } - } - - private void writeBytes(int pos, byte[] bytes) { - int len = bytes.length; - if (len <= 7) { - writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); - } else { - writeBytesToVarLenPart(pos, bytes, len); - } - } - - public void writeDecimal(int pos, Decimal value, int precision) { - assert value == null || value.precision() == precision; - - if (Decimal.isCompact(precision)) { - assert value != null; - - this.writeLong(pos, value.toUnscaledLong()); - } else { - this.ensureCapacity(16); - 
this.segment.putLong(this.cursor, 0L); - this.segment.putLong(this.cursor + 8, 0L); - if (value == null) { - this.setNullBit(pos); - this.setOffsetAndSize(pos, this.cursor, 0L); - } else { - byte[] bytes = value.toUnscaledBytes(); - - assert bytes.length <= 16; - - this.segment.put(this.cursor, bytes, 0, bytes.length); - this.setOffsetAndSize(pos, this.cursor, bytes.length); - } - - this.cursor += 16; - } - } - - public void writeTimestampNtz(int pos, TimestampNtz value, int precision) { - if (TimestampNtz.isCompact(precision)) { - writeLong(pos, value.getMillisecond()); - } else { - // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond - ensureCapacity(8); - if (value == null) { - setNullBit(pos); - // zero-out the bytes - segment.putLong(cursor, 0L); - setOffsetAndSize(pos, cursor, 0); - } else { - segment.putLong(cursor, value.getMillisecond()); - setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); - } - cursor += 8; - } - } - - public void writeTimestampLtz(int pos, TimestampLtz value, int precision) { - if (TimestampLtz.isCompact(precision)) { - writeLong(pos, value.getEpochMillisecond()); - } else { - // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond - ensureCapacity(8); - if (value == null) { - setNullBit(pos); - // zero-out the bytes - segment.putLong(cursor, 0L); - setOffsetAndSize(pos, cursor, 0); - } else { - segment.putLong(cursor, value.getEpochMillisecond()); - setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); - } - } - } - - private void writeSegmentsToVarLenPart( - int pos, MemorySegment[] segments, int offset, int size) { - int roundedSize = roundNumberOfBytesToNearestWord(size); - this.ensureCapacity(roundedSize); - this.zeroOutPaddingBytes(size); - if (segments.length == 1) { - segments[0].copyTo(offset, this.segment, this.cursor, size); - } else { - this.writeMultiSegmentsToVarLenPart(segments, offset, size); - } - - this.setOffsetAndSize(pos, this.cursor, size); 
- this.cursor += roundedSize; - } - - private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) { - int needCopy = size; - int fromOffset = offset; - int toOffset = this.cursor; - - for (MemorySegment sourceSegment : segments) { - int remain = sourceSegment.size() - fromOffset; - if (remain > 0) { - int copySize = Math.min(remain, needCopy); - sourceSegment.copyTo(fromOffset, this.segment, toOffset, copySize); - needCopy -= copySize; - toOffset += copySize; - fromOffset = 0; - } else { - fromOffset -= sourceSegment.size(); - } - } - } - - private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) { - int roundedSize = roundNumberOfBytesToNearestWord(len); - this.ensureCapacity(roundedSize); - this.zeroOutPaddingBytes(len); - this.segment.put(this.cursor, bytes, 0, len); - this.setOffsetAndSize(pos, this.cursor, len); - this.cursor += roundedSize; - } - - protected static int roundNumberOfBytesToNearestWord(int numBytes) { - int remainder = numBytes & 7; - return remainder == 0 ? 
numBytes : numBytes + (8 - remainder); - } - - protected void zeroOutPaddingBytes(int numBytes) { - if ((numBytes & 7) > 0) { - this.segment.putLong(this.cursor + (numBytes >> 3 << 3), 0L); - } - } - - protected void ensureCapacity(int neededSize) { - int length = this.cursor + neededSize; - if (this.segment.size() < length) { - this.grow(length); - } - } - - private void grow(int minCapacity) { - int oldCapacity = this.segment.size(); - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity - minCapacity < 0) { - newCapacity = minCapacity; - } - this.buffer = Arrays.copyOf(this.segment.getArray(), newCapacity); - this.segment = MemorySegment.wrap(buffer); - } - - private static void writeBytesToFixLenPart( - MemorySegment segment, int fieldOffset, byte[] bytes, int len) { - long firstByte = len | 128; - long sevenBytes = 0L; - if (MemorySegment.LITTLE_ENDIAN) { - for (int i = 0; i < len; ++i) { - sevenBytes |= (255L & (long) bytes[i]) << (int) ((long) i * 8L); - } - } else { - for (int i = 0; i < len; ++i) { - sevenBytes |= (255L & (long) bytes[i]) << (int) ((long) (6 - i) * 8L); - } - } - - long offsetAndSize = firstByte << 56 | sevenBytes; - segment.putLong(fieldOffset, offsetAndSize); - } - - // ----------------------- internal methods ------------------------------- - - private int getFieldOffset(int pos) { - return nullBitsSizeInBytes + 8 * pos; - } - - private static int getFixedLengthPartSize(int nullBitsSizeInBytes, int arity) { - return nullBitsSizeInBytes + 8 * arity; - } - - private static int calculateBitSetWidthInBytes(int arity) { - return (arity + 63 + 8) / 64 * 8; - } - - public void setOffsetAndSize(int pos, int offset, long size) { - long offsetAndSize = (long) offset << 32 | size; - this.segment.putLong(this.getFieldOffset(pos), offsetAndSize); - } - - /** - * Creates an accessor for writing the elements of an hudi binary row writer during runtime. 
- * - * @param fieldType the field type to write - */ - public static FieldWriter createFieldWriter(DataType fieldType) { - final FieldWriter fieldWriter; - switch (fieldType.getTypeRoot()) { - case CHAR: - case STRING: - fieldWriter = (writer, pos, value) -> writer.writeString(pos, (BinaryString) value); - break; - case BOOLEAN: - fieldWriter = ((writer, pos, value) -> writer.writeBoolean(pos, (boolean) value)); - break; - case BINARY: - case BYTES: - fieldWriter = (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value); - break; - case DECIMAL: - final int decimalPrecision = getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeDecimal(pos, (Decimal) value, decimalPrecision); - break; - case TINYINT: - fieldWriter = (writer, pos, value) -> writer.writeByte(pos, (byte) value); - break; - case SMALLINT: - fieldWriter = (writer, pos, value) -> writer.writeShort(pos, (short) value); - break; - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - fieldWriter = (writer, pos, value) -> writer.writeInt(pos, (int) value); - break; - case BIGINT: - fieldWriter = (writer, pos, value) -> writer.writeLong(pos, (long) value); - break; - case FLOAT: - fieldWriter = (writer, pos, value) -> writer.writeFloat(pos, (float) value); - break; - case DOUBLE: - fieldWriter = (writer, pos, value) -> writer.writeDouble(pos, (double) value); - break; - case TIMESTAMP_WITHOUT_TIME_ZONE: - final int timestampNtzPrecision = getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeTimestampNtz( - pos, (TimestampNtz) value, timestampNtzPrecision); - break; - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - final int timestampLtzPrecision = getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeTimestampLtz( - pos, (TimestampLtz) value, timestampLtzPrecision); - break; - default: - throw new IllegalArgumentException( - "Unsupported type for Hudi BinaryRow writer: " + fieldType); - } - if (!fieldType.isNullable()) { 
- return fieldWriter; - } - return (writer, pos, value) -> { - if (value == null) { - writer.setNullAt(pos); - } else { - fieldWriter.writeField(writer, pos, value); - } - }; - } - - /** Accessor for writing the elements of an hudi binary row writer during runtime. */ - interface FieldWriter extends Serializable { - void writeField(HudiBinaryRowWriter writer, int pos, Object value); - } -} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java index 8cb4c4834e..bbea06153d 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; import org.apache.fluss.types.DataType; @@ -32,6 +33,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -340,6 +342,167 @@ void testBucketingRejectsNonPositiveNumBuckets() { .hasMessageContaining("must be positive"); } + @Test + void testCompositeBucketKeyMatchesHudiFieldValueRecordKey() { + // Two-field bucket key: Hudi expects record key in "f1:v1,f2:v2" form when its + // BucketIdentifier sees a ':' in the record key, then extracts ["v1","v2"] and + // hashes that List. Fluss feeds the same ["v1","v2"] into List#hashCode(). 
+ int bucketNum = 16; + String f1 = "user_id"; + String f2 = "region"; + int idValue = 12345; + String regionValue = "cn-north"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(idValue, BinaryString.fromString(regionValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = + toBytes(new String[] {String.valueOf(idValue), regionValue}); + assertThat(ourEncodedKey).isEqualTo(expected); + + // Compare against Hudi's List-based overload to avoid Hudi's own + // recordKey-parsing path (which would split on ':' inside values like timestamps). + int hudiBucket = + BucketIdentifier.getBucketId( + Arrays.asList(String.valueOf(idValue), regionValue), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testCompositeBucketKeyWithNullFieldUsesPlaceholder() { + int bucketNum = 8; + String f1 = "user_id"; + String f2 = "region"; + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().copy(true), DataTypes.STRING().copy(true) + }, + new String[] {f1, f2}); + // f2 is null on purpose + GenericRow row = GenericRow.of(42, null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {"42", NULL_RECORDKEY_PLACEHOLDER}); + assertThat(ourEncodedKey).isEqualTo(expected); + + int hudiBucket = + BucketIdentifier.getBucketId( + Arrays.asList("42", NULL_RECORDKEY_PLACEHOLDER), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBooleanAndIntegralTypes() { + int bucketNum = 10; + // BOOLEAN + assertSingleFieldRoundTrip( + 
DataTypes.BOOLEAN(), + true, + String.valueOf(true), + "flag", + bucketNum); + assertSingleFieldRoundTrip( + DataTypes.BOOLEAN(), + false, + String.valueOf(false), + "flag", + bucketNum); + // TINYINT / SMALLINT + assertSingleFieldRoundTrip( + DataTypes.TINYINT(), (byte) 7, "7", "b", bucketNum); + assertSingleFieldRoundTrip( + DataTypes.SMALLINT(), (short) 12345, "12345", "s", bucketNum); + // FLOAT (use a value whose Float.toString is stable across JVMs) + assertSingleFieldRoundTrip( + DataTypes.FLOAT(), 3.5f, Float.toString(3.5f), "f", bucketNum); + } + + @Test + void testDateAndTimeTypes() { + int bucketNum = 10; + // DATE is stored as days-since-epoch int internally; record key is its String value. + int dateDays = 19852; + assertSingleFieldRoundTrip( + DataTypes.DATE(), dateDays, String.valueOf(dateDays), "d", bucketNum); + // TIME is stored as ms-of-day int. + int timeMillis = 12345678; + assertSingleFieldRoundTrip( + DataTypes.TIME(), timeMillis, String.valueOf(timeMillis), "t", bucketNum); + } + + @Test + void testTimestampLtzType() throws IOException { + int bucketNum = 10; + TimestampLtz ts = TimestampLtz.fromInstant(Instant.ofEpochMilli(1700000000000L)); + RowType rowType = + RowType.of(new DataType[] {DataTypes.TIMESTAMP_LTZ(6)}, new String[] {"ts"}); + GenericRow row = GenericRow.of(ts); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList("ts")); + byte[] enc = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {ts.toString()}); + assertThat(enc).isEqualTo(expected); + + int hudiBucket = + BucketIdentifier.getBucketId( + Collections.singletonList(ts.toString()), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(enc, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBucketingNumBucketsBoundaryValues() { + HudiBucketingFunction f = new HudiBucketingFunction(); + // numBuckets == 1 => bucket id always 0 + for (int sample : new int[] {0, 1, -1, Integer.MAX_VALUE, 
Integer.MIN_VALUE}) { + byte[] key = intToBytes(sample); + assertThat(f.bucketing(key, 1)).isEqualTo(0); + } + // numBuckets == Integer.MAX_VALUE: bucket id == hash & MAX_VALUE + int hash = Integer.MIN_VALUE; // stress sign-bit handling + byte[] key = intToBytes(hash); + int expected = (hash & Integer.MAX_VALUE) % Integer.MAX_VALUE; + assertThat(f.bucketing(key, Integer.MAX_VALUE)).isEqualTo(expected); + // bucket id must always be in [0, numBuckets) + assertThat(f.bucketing(key, 7)) + .isGreaterThanOrEqualTo(0) + .isLessThan(7); + } + + private void assertSingleFieldRoundTrip( + DataType dataType, + Object value, + String stringified, + String key, + int bucketNum) { + RowType rowType = RowType.of(new DataType[] {dataType}, new String[] {key}); + GenericRow row = GenericRow.of(value); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Collections.singletonList(key)); + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {stringified}); + assertThat(ourEncodedKey).isEqualTo(expected); + + int hudiBucket = BucketIdentifier.getBucketId(stringified, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + private static byte[] intToBytes(int v) { + return new byte[] { + (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v + }; + } + private byte[] toBytes(String[] value) { List values = Arrays.asList(value); return ByteBuffer.allocate(4).putInt(values.hashCode()).array(); From 4e1098c819c6c6ee8e58a53566ebce9d80c24041 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 14 May 2026 21:23:12 +0800 Subject: [PATCH 4/9] [lake/hudi] set scope for hudi-flink bundle in pom.xml --- fluss-lake/fluss-lake-hudi/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index 09605b8aa7..9fbce449f5 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ 
b/fluss-lake/fluss-lake-hudi/pom.xml @@ -36,6 +36,7 @@ org.apache.hudi hudi-flink${flink.major.version}-bundle ${hudi.version} + test log4j From b0726dbfadabae02f4d32b7b82df2f3c728a9309 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 14 May 2026 21:41:00 +0800 Subject: [PATCH 5/9] [lake/hudi] fix format violations --- .../bucketing/HudiBucketingFunction.java | 6 +- .../fluss/row/encode/hudi/HudiKeyEncoder.java | 18 +++--- .../bucketing/HudiBucketingFunctionTest.java | 57 +++++-------------- 3 files changed, 26 insertions(+), 55 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java index 7a02f9e96a..0799d64b5f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java @@ -22,8 +22,8 @@ * *

The bucket id is computed in the same way as Hudi's {@code * org.apache.hudi.index.bucket.BucketIdentifier#getBucketId(String, String, int)}: take a 32-bit - * integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, - * mask its sign bit and modulo by {@code numBuckets}. + * integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, mask + * its sign bit and modulo by {@code numBuckets}. */ public class HudiBucketingFunction implements BucketingFunction { @@ -58,4 +58,4 @@ public int bucketing(byte[] bucketKey, int numBuckets) { return (restored & Integer.MAX_VALUE) % numBuckets; } -} \ No newline at end of file +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java index 606cefdd31..ae5ab770fe 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -29,19 +29,17 @@ /** * An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. * - *

The encoded bytes are a 4-byte big-endian representation of {@code - * List.hashCode()} over the stringified key fields, which matches the way - * Hudi's {@code BucketIdentifier} hashes a record key. Null fields are replaced by - * {@link #NULL_RECORDKEY_PLACEHOLDER} so that an explicit null and the literal - * string {@code "null"} no longer collide in the hash space. + *

The encoded bytes are a 4-byte big-endian representation of {@code List.hashCode()} + * over the stringified key fields, which matches the way Hudi's {@code BucketIdentifier} hashes a + * record key. Null fields are replaced by {@link #NULL_RECORDKEY_PLACEHOLDER} so that an explicit + * null and the literal string {@code "null"} no longer collide in the hash space. */ public class HudiKeyEncoder implements KeyEncoder { /** - * Placeholder used to represent a {@code null} key field when computing the - * record-key hash. It is intentionally aligned with Hudi's - * {@code KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER} so that the resulting bucket id - * stays identical to what Hudi would compute on its side. + * Placeholder used to represent a {@code null} key field when computing the record-key hash. It + * is intentionally aligned with Hudi's {@code KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER} so that + * the resulting bucket id stays identical to what Hudi would compute on its side. */ public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; @@ -87,4 +85,4 @@ private static String stringifyForRecordKey(Object value) { } return String.valueOf(value); } -} \ No newline at end of file +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java index bbea06153d..d9a96248ca 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -266,8 +266,7 @@ void testNullFieldUsesPlaceholder() { String key = "name"; RowType rowType = - RowType.of( - new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); // a row with an explicit null on the bucket key column GenericRow row = 
GenericRow.of((Object) null); @@ -281,8 +280,7 @@ void testNullFieldUsesPlaceholder() { assertThat(ourEncodedKey).isEqualTo(placeholderBytes); assertThat(ourEncodedKey).isNotEqualTo(javaNullLiteralBytes); - int hudiBucket = - BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -293,8 +291,7 @@ void testNullFieldDoesNotCollideWithLiteralNullString() { // null value — a regression test for the previous String.valueOf(null) behavior. String key = "name"; RowType rowType = - RowType.of( - new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); @@ -361,8 +358,7 @@ void testCompositeBucketKeyMatchesHudiFieldValueRecordKey() { HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); byte[] ourEncodedKey = encoder.encodeKey(row); - byte[] expected = - toBytes(new String[] {String.valueOf(idValue), regionValue}); + byte[] expected = toBytes(new String[] {String.valueOf(idValue), regionValue}); assertThat(ourEncodedKey).isEqualTo(expected); // Compare against Hudi's List-based overload to avoid Hudi's own @@ -382,9 +378,7 @@ void testCompositeBucketKeyWithNullFieldUsesPlaceholder() { RowType rowType = RowType.of( - new DataType[] { - DataTypes.INT().copy(true), DataTypes.STRING().copy(true) - }, + new DataType[] {DataTypes.INT().copy(true), DataTypes.STRING().copy(true)}, new String[] {f1, f2}); // f2 is null on purpose GenericRow row = GenericRow.of(42, null); @@ -406,25 +400,14 @@ void testBooleanAndIntegralTypes() { int bucketNum = 10; // BOOLEAN assertSingleFieldRoundTrip( - DataTypes.BOOLEAN(), - true, - String.valueOf(true), - 
"flag", - bucketNum); + DataTypes.BOOLEAN(), true, String.valueOf(true), "flag", bucketNum); assertSingleFieldRoundTrip( - DataTypes.BOOLEAN(), - false, - String.valueOf(false), - "flag", - bucketNum); + DataTypes.BOOLEAN(), false, String.valueOf(false), "flag", bucketNum); // TINYINT / SMALLINT - assertSingleFieldRoundTrip( - DataTypes.TINYINT(), (byte) 7, "7", "b", bucketNum); - assertSingleFieldRoundTrip( - DataTypes.SMALLINT(), (short) 12345, "12345", "s", bucketNum); + assertSingleFieldRoundTrip(DataTypes.TINYINT(), (byte) 7, "7", "b", bucketNum); + assertSingleFieldRoundTrip(DataTypes.SMALLINT(), (short) 12345, "12345", "s", bucketNum); // FLOAT (use a value whose Float.toString is stable across JVMs) - assertSingleFieldRoundTrip( - DataTypes.FLOAT(), 3.5f, Float.toString(3.5f), "f", bucketNum); + assertSingleFieldRoundTrip(DataTypes.FLOAT(), 3.5f, Float.toString(3.5f), "f", bucketNum); } @Test @@ -453,8 +436,7 @@ void testTimestampLtzType() throws IOException { assertThat(enc).isEqualTo(expected); int hudiBucket = - BucketIdentifier.getBucketId( - Collections.singletonList(ts.toString()), bucketNum); + BucketIdentifier.getBucketId(Collections.singletonList(ts.toString()), bucketNum); int ourBucket = new HudiBucketingFunction().bucketing(enc, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -473,21 +455,14 @@ void testBucketingNumBucketsBoundaryValues() { int expected = (hash & Integer.MAX_VALUE) % Integer.MAX_VALUE; assertThat(f.bucketing(key, Integer.MAX_VALUE)).isEqualTo(expected); // bucket id must always be in [0, numBuckets) - assertThat(f.bucketing(key, 7)) - .isGreaterThanOrEqualTo(0) - .isLessThan(7); + assertThat(f.bucketing(key, 7)).isGreaterThanOrEqualTo(0).isLessThan(7); } private void assertSingleFieldRoundTrip( - DataType dataType, - Object value, - String stringified, - String key, - int bucketNum) { + DataType dataType, Object value, String stringified, String key, int bucketNum) { RowType rowType = RowType.of(new DataType[] 
{dataType}, new String[] {key}); GenericRow row = GenericRow.of(value); - HudiKeyEncoder encoder = - new HudiKeyEncoder(rowType, Collections.singletonList(key)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); byte[] ourEncodedKey = encoder.encodeKey(row); byte[] expected = toBytes(new String[] {stringified}); assertThat(ourEncodedKey).isEqualTo(expected); @@ -498,9 +473,7 @@ private void assertSingleFieldRoundTrip( } private static byte[] intToBytes(int v) { - return new byte[] { - (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v - }; + return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v}; } private byte[] toBytes(String[] value) { From 68e413078181d95ef37f52dc3d34e1d42b6777b8 Mon Sep 17 00:00:00 2001 From: fhan Date: Fri, 15 May 2026 12:46:21 +0800 Subject: [PATCH 6/9] [lake/hudi] fix HudiKeyEncoder's lossy issue when encoding for primary key --- .../main/java/org/apache/fluss/row/encode/KeyEncoder.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java index 1f22c3b3ce..fe52e926cc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java @@ -77,6 +77,14 @@ static KeyEncoder ofPrimaryKeyEncoder( Optional optKvFormatVersion = tableConfig.getKvFormatVersion(); DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null); int kvFormatVersion = optKvFormatVersion.orElse(1); + + // Hudi's HudiKeyEncoder is lossy (4-byte hash); it must NOT be used for + // primary key encoding because different keys with the same List#hashCode + // would collide. Use CompactedKeyEncoder instead. 
+ if (dataLakeFormat == DataLakeFormat.HUDI) { + return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); + } + if (kvFormatVersion == 1) { return of(rowType, keyFields, dataLakeFormat); } From b909b807984382a51f0237dd49014f8765ad0d1d Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 19 May 2026 15:38:30 +0800 Subject: [PATCH 7/9] [lake/hudi] align Hudi's BucketIdentifier.getBucketId logic and add type rejection impls. --- .../fluss/row/encode/hudi/HudiKeyEncoder.java | 193 +++++++++++++++-- .../bucketing/HudiBucketingFunctionTest.java | 203 ++++++++++++++---- 2 files changed, 331 insertions(+), 65 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java index ae5ab770fe..2eeaa5a220 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -21,49 +21,150 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.KeyEncoder; import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.RowType; import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; import java.util.List; +import java.util.Set; /** - * An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. + * An implementation of {@link KeyEncoder} that follows Hudi's bucket-key encoding strategy. * - *

The encoded bytes are a 4-byte big-endian representation of {@code List.hashCode()} - * over the stringified key fields, which matches the way Hudi's {@code BucketIdentifier} hashes a - * record key. Null fields are replaced by {@link #NULL_RECORDKEY_PLACEHOLDER} so that an explicit - * null and the literal string {@code "null"} no longer collide in the hash space. + *

What this encoder must reproduce

+ * + * The bucket id Hudi computes in production is: + * + *
{@code
+ * BucketIdentifier.getBucketId(HoodieKey, indexKeyFields, numBuckets)
+ *   = (KeyGenUtils.extractRecordKeysByFields(recordKey, fields).hashCode()
+ *      & Integer.MAX_VALUE) % numBuckets
+ * }
+ * + * In other words, Hudi parses a {@code "f1:v1,f2:v2,..."} record-key string back into a {@code + * List} (with {@code "__null__"} → {@code null} and {@code "__empty__"} → {@code ""}) and + * then takes {@link java.util.List#hashCode()}. + * + *

This encoder produces a list element-wise equivalent to the one Hudi reconstructs, + * and hashes it with the same {@code List#hashCode()} contract. The encoded bytes are a 4-byte + * big-endian representation of that hash, decoded symmetrically by {@code HudiBucketingFunction}. + * + *

Why a list of nullable {@code String}s (and not stringified placeholders)

+ * + * Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into + * {@code null}/{@code ""} before hashing. {@code List#hashCode} treats a {@code null} + * element as contributing {@code 0} and a non-null element as contributing its {@code String} + * hash. Hashing the placeholder string directly would therefore give a different bucket id than + * Hudi for any row with a null bucket-key field. + * + *

Why we reject values containing {@code ','}

+ * + * {@code ','} is Hudi's record-part separator and is not escaped in Hudi's + * record-key serialization. Any bucket-key value containing {@code ','} is therefore ambiguous on + * Hudi's parse path and would produce a different reconstructed list than the one we hash here. + * We refuse such values up front with {@link IllegalArgumentException} so callers cannot silently + * desync from Hudi's bucket layout. ({@code ':'} on the other hand is safe — Hudi's parser has a + * dedicated look-ahead loop that handles values containing {@code ':'}, e.g. timestamps like + * {@code "2023-10-25T10:01:13.182Z"}.) + * + *

Supported key field types

+ * + * See {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}. Composite or binary types (ARRAY/MAP/ROW/ + * BINARY/VARBINARY) would fall back to {@code Object#toString()} and produce instance-bound, + * non-reproducible bucket ids; they are rejected at construction time. */ public class HudiKeyEncoder implements KeyEncoder { /** - * Placeholder used to represent a {@code null} key field when computing the record-key hash. It - * is intentionally aligned with Hudi's {@code KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER} so that - * the resulting bucket id stays identical to what Hudi would compute on its side. + * Placeholder used by Hudi's serialization for a {@code null} bucket-key field. Kept here for + * cross-validation in tests — the encoder itself never hashes this literal because Hudi's + * parser turns it back into {@code null} before hashing. */ public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + /** Hudi's record-part separator. Values containing it are rejected. */ + private static final char RECORD_KEY_PARTS_SEPARATOR = ','; + + /** + * The set of {@link DataTypeRoot}s allowed as Hudi bucket key fields. These are exactly the + * scalar types for which {@code toString()} (or a deterministic equivalent) round-trips + * through Hudi's record-key serialization. 
+ */ + public static final Set SUPPORTED_BUCKET_KEY_TYPE_ROOTS = + Collections.unmodifiableSet( + EnumSet.of( + DataTypeRoot.CHAR, + DataTypeRoot.STRING, + DataTypeRoot.BOOLEAN, + DataTypeRoot.DECIMAL, + DataTypeRoot.TINYINT, + DataTypeRoot.SMALLINT, + DataTypeRoot.INTEGER, + DataTypeRoot.BIGINT, + DataTypeRoot.FLOAT, + DataTypeRoot.DOUBLE, + DataTypeRoot.DATE, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)); + private final InternalRow.FieldGetter[] fieldGetters; + private final String[] keyFieldNames; public HudiKeyEncoder(RowType rowType, List keys) { // for getting key fields out of fluss internal row fieldGetters = new InternalRow.FieldGetter[keys.size()]; + keyFieldNames = new String[keys.size()]; for (int i = 0; i < keys.size(); i++) { - int keyIndex = rowType.getFieldIndex(keys.get(i)); + String keyField = keys.get(i); + int keyIndex = rowType.getFieldIndex(keyField); + if (keyIndex < 0) { + throw new IllegalArgumentException( + "Bucket key field '" + keyField + "' does not exist in row type."); + } DataType keyDataType = rowType.getTypeAt(keyIndex); + validateSupportedBucketKeyType(keyField, keyDataType); fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); + keyFieldNames[i] = keyField; + } + } + + private static void validateSupportedBucketKeyType(String keyField, DataType dataType) { + DataTypeRoot typeRoot = dataType.getTypeRoot(); + if (!SUPPORTED_BUCKET_KEY_TYPE_ROOTS.contains(typeRoot)) { + throw new IllegalArgumentException( + "Unsupported Hudi bucket key type for field '" + + keyField + + "': " + + dataType + + ". A bucket key field must be a scalar type with a deterministic " + + "string form; composite or binary types (ARRAY/MAP/ROW/BINARY/VARBINARY) " + + "would otherwise fall back to Object#toString() and produce " + + "non-reproducible bucket ids. 
Supported type roots are: " + + SUPPORTED_BUCKET_KEY_TYPE_ROOTS); } } @Override public byte[] encodeKey(InternalRow row) { - // Build the same string list that Hudi would build out of a record key, so the - // resulting List#hashCode() — and therefore the bucket id — match Hudi's own - // BucketIdentifier#getBucketId. + // Build the same List that Hudi's KeyGenUtils.extractRecordKeysByFields would + // hand to List#hashCode() in BucketIdentifier#getBucketId. + // + // Hudi has two parse paths: + // * single-field record key (no ',' AND no ':' in the serialized form): the parser + // returns the raw recordKey verbatim — placeholders are NOT round-tripped, so + // "__null__"/"__empty__" stay as literal strings. + // * composite record key ("f1:v1,f2:v2,..."): the parser splits on ':'/',' and + // additionally turns "__null__" → null and "__empty__" → "" before hashing. + // + // We mirror the same two-mode behaviour so the resulting List#hashCode matches Hudi's. + boolean composite = fieldGetters.length > 1; List values = new ArrayList<>(fieldGetters.length); - for (InternalRow.FieldGetter fieldGetter : fieldGetters) { - Object value = fieldGetter.getFieldOrNull(row); - values.add(stringifyForRecordKey(value)); + for (int i = 0; i < fieldGetters.length; i++) { + Object value = fieldGetters[i].getFieldOrNull(row); + values.add(toHudiHashElement(keyFieldNames[i], value, composite)); } int hashCode = values.hashCode(); @@ -76,13 +177,63 @@ public byte[] encodeKey(InternalRow row) { }; } - private static String stringifyForRecordKey(Object value) { + /** + * Produces the same {@code String} element Hudi's parser would have placed in the list it + * hashes. The behaviour is mode-dependent because Hudi itself has two parse modes: + * + *
    + *
  • single-field ({@code composite == false}): Hudi returns the raw recordKey + * verbatim, so {@code "__null__"} stays as the literal string. We therefore emit the + * placeholder string for a null field, and reject {@code ','}-containing values only + * for safety (a single-field recordKey containing {@code ','} would actually trip the + * composite quick-path check on Hudi's side too). + *
  • composite ({@code composite == true}): Hudi parses {@code "__null__"} → + * {@code null} and {@code "__empty__"} → {@code ""} before hashing. We emit {@code + * null}/{@code ""} directly. Values literally equal to a placeholder, or containing + * {@code ','}, are rejected because they would either collide with Hudi's reserved + * sentinels or break Hudi's reverse parsing. + *
+ */ + private static String toHudiHashElement(String fieldName, Object value, boolean composite) { if (value == null) { - return NULL_RECORDKEY_PLACEHOLDER; + // Composite mode: Hudi parses "__null__" back to null before hashing. + // Single-field mode: Hudi keeps the recordKey verbatim, so we keep the placeholder. + return composite ? null : NULL_RECORDKEY_PLACEHOLDER; + } + String stringValue = + (value instanceof BinaryString) ? value.toString() : String.valueOf(value); + if (composite && stringValue.isEmpty()) { + // Composite mode: Hudi parses "__empty__" back to "" before hashing. + return ""; + } + if (composite) { + rejectIfContainsRecordSeparator(fieldName, stringValue); + // Guard against literal placeholder collisions — if a user literally stores + // "__null__"/"__empty__" as the value, Hudi would round-trip it to null/"" and + // we'd silently disagree. + if (NULL_RECORDKEY_PLACEHOLDER.equals(stringValue) + || "__empty__".equals(stringValue)) { + throw new IllegalArgumentException( + "Bucket key field '" + + fieldName + + "' has value " + + stringValue + + " which collides with Hudi's reserved record-key placeholder; " + + "this value cannot be safely used as a composite Hudi bucket key."); + } } - if (value instanceof BinaryString) { - return value.toString(); + return stringValue; + } + + private static void rejectIfContainsRecordSeparator(String fieldName, String value) { + if (value.indexOf(RECORD_KEY_PARTS_SEPARATOR) >= 0) { + throw new IllegalArgumentException( + "Bucket key field '" + + fieldName + + "' has value containing ',', which is Hudi's record-part " + + "separator and is not escaped by Hudi's record-key serialization. 
" + + "Such a value would produce a bucket id divergent from Hudi's " + + "BucketIdentifier and is rejected to keep the bucket layout in sync."); } - return String.valueOf(value); } -} +} \ No newline at end of file diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java index d9a96248ca..a709b1ea2c 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -101,7 +101,9 @@ void testLongHash() { @Test void testStringHash() { - String testValue = "Hello Hudi, Fluss this side!"; + // NOTE: ',' is Hudi's record-part separator and is rejected by the encoder — use a + // delimiter-free message here. (See HudiKeyEncoder Javadoc for details.) + String testValue = "Hello Hudi - Fluss this side!"; int bucketNum = 10; String key = "name"; @@ -206,7 +208,7 @@ void testDateHash() { void testMultiKeysHashing() { int testIntValue = 42; long testLongValue = 1234567890123456789L; - String testStringValue = "Hello Hudi, Fluss this side!"; + String testStringValue = "Hello Hudi - Fluss this side!"; BigDecimal testValue = new BigDecimal("123.45"); Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); int bucketNum = 10; @@ -261,47 +263,35 @@ void testMultiKeysHashing() { } @Test - void testNullFieldUsesPlaceholder() { + void testSingleFieldNull_keepsPlaceholderString_matchesHudi() { + // SINGLE-field mode: Hudi's KeyGenUtils.extractRecordKeysByFields takes the + // fast path (no ',' AND no ':' in the recordKey) and returns the raw string + // verbatim — i.e. the literal "__null__" — without round-tripping it back to + // a real null. The strong-aligned encoder must mirror that and hash the + // placeholder string, NOT a null element. 
int bucketNum = 10; String key = "name"; RowType rowType = RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); - // a row with an explicit null on the bucket key column GenericRow row = GenericRow.of((Object) null); HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); - // Encoded bytes must hash the placeholder, NOT the literal "null" string. byte[] ourEncodedKey = encoder.encodeKey(row); - byte[] placeholderBytes = toBytes(new String[] {NULL_RECORDKEY_PLACEHOLDER}); - byte[] javaNullLiteralBytes = toBytes(new String[] {"null"}); + byte[] placeholderStringBytes = toBytes(new String[] {NULL_RECORDKEY_PLACEHOLDER}); + byte[] literalNullStringBytes = toBytes(new String[] {"null"}); - assertThat(ourEncodedKey).isEqualTo(placeholderBytes); - assertThat(ourEncodedKey).isNotEqualTo(javaNullLiteralBytes); + assertThat(ourEncodedKey).isEqualTo(placeholderStringBytes); + assertThat(ourEncodedKey).isNotEqualTo(literalNullStringBytes); - int hudiBucket = BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + // Cross-validate against Hudi's recordKey-string overload (production path). + int hudiBucket = + BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } - @Test - void testNullFieldDoesNotCollideWithLiteralNullString() { - // The literal string "null" must not produce the same encoded bytes as a real - // null value — a regression test for the previous String.valueOf(null) behavior. 
- String key = "name"; - RowType rowType = - RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); - - HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); - - byte[] encodedNull = encoder.encodeKey(GenericRow.of((Object) null)); - byte[] encodedLiteralNull = - encoder.encodeKey(GenericRow.of(BinaryString.fromString("null"))); - - assertThat(encodedNull).isNotEqualTo(encodedLiteralNull); - } - @Test void testBucketingRejectsInvalidBucketKey() { HudiBucketingFunction function = new HudiBucketingFunction(); @@ -340,10 +330,11 @@ void testBucketingRejectsNonPositiveNumBuckets() { } @Test - void testCompositeBucketKeyMatchesHudiFieldValueRecordKey() { - // Two-field bucket key: Hudi expects record key in "f1:v1,f2:v2" form when its - // BucketIdentifier sees a ':' in the record key, then extracts ["v1","v2"] and - // hashes that List. Fluss feeds the same ["v1","v2"] into List#hashCode(). + void testCompositeBucketKeyMatchesHudiRecordKeyOverload() { + // Strong alignment: go through Hudi's production parse path + // (BucketIdentifier#getBucketId(String recordKey, ...)) instead of the + // pre-parsed List overload, so any future divergence in parsing + // (including ':'-handling for timestamps) is caught. int bucketNum = 16; String f1 = "user_id"; String f2 = "region"; @@ -358,20 +349,19 @@ void testCompositeBucketKeyMatchesHudiFieldValueRecordKey() { HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); byte[] ourEncodedKey = encoder.encodeKey(row); - byte[] expected = toBytes(new String[] {String.valueOf(idValue), regionValue}); - assertThat(ourEncodedKey).isEqualTo(expected); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); - // Compare against Hudi's List-based overload to avoid Hudi's own - // recordKey-parsing path (which would split on ':' inside values like timestamps). 
+ String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; int hudiBucket = - BucketIdentifier.getBucketId( - Arrays.asList(String.valueOf(idValue), regionValue), bucketNum); - int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @Test - void testCompositeBucketKeyWithNullFieldUsesPlaceholder() { + void testCompositeBucketKeyWithNullFieldMatchesHudiRecordKeyOverload() { + // Hudi serializes a null bucket-key field as "__null__" in the record key, then + // parses it back to a `null` element before hashing. The strong-aligned encoder + // must produce a bucket id identical to Hudi's recordKey-string parse path. int bucketNum = 8; String f1 = "user_id"; String f2 = "region"; @@ -385,13 +375,12 @@ void testCompositeBucketKeyWithNullFieldUsesPlaceholder() { HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); byte[] ourEncodedKey = encoder.encodeKey(row); - byte[] expected = toBytes(new String[] {"42", NULL_RECORDKEY_PLACEHOLDER}); - assertThat(ourEncodedKey).isEqualTo(expected); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + // The on-wire record key Hudi would have built for this row. 
+ String recordKey = f1 + ":42," + f2 + ":" + NULL_RECORDKEY_PLACEHOLDER; int hudiBucket = - BucketIdentifier.getBucketId( - Arrays.asList("42", NULL_RECORDKEY_PLACEHOLDER), bucketNum); - int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -480,4 +469,130 @@ private byte[] toBytes(String[] value) { List values = Arrays.asList(value); return ByteBuffer.allocate(4).putInt(values.hashCode()).array(); } + + // ------------------------------------------------------------------ + // End-to-end tests against Hudi's recordKey-string overloads — these + // exercise the same parsing path Hudi takes in production (HoodieKey + // -> getHashKeys via ':' / ',' split) and therefore catch divergence + // that the List-based overload would silently hide. + // ------------------------------------------------------------------ + + @Test + void testCompositeBucketKeyMatchesHudiRecordKeyOverload_safeValues() { + // Values intentionally free of ':' and ',' — the encoder must agree with Hudi's + // record-key-parsing path under the documented contract. + int bucketNum = 16; + String f1 = "user_id"; + String f2 = "region"; + int idValue = 12345; + String regionValue = "cn-north"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(idValue, BinaryString.fromString(regionValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + // Build Hudi's wire-format record key: "f1:v1,f2:v2", then go through the + // recordKey-string overload that splits on ':' / ',' before hashing. 
+ String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; + int hudiBucket = + BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testCompositeKeyWithColonInValue_matchesHudiRecordKeyOverload() { + // Hudi's KeyGenUtils.extractRecordKeysByFields has a dedicated look-ahead loop + // that correctly handles values containing ':' (e.g. timestamps like + // "2023-10-25T10:01:13.182Z"). The strong-aligned encoder must match Hudi + // bit-for-bit on this case — not merely "document the divergence". + int bucketNum = 8; + String f1 = "tenant"; + String f2 = "ts"; + String tenant = "acme"; + TimestampLtz ts = TimestampLtz.fromInstant(Instant.ofEpochMilli(1700000000000L)); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.TIMESTAMP_LTZ(6)}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(BinaryString.fromString(tenant), ts); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + String recordKey = f1 + ":" + tenant + "," + f2 + ":" + ts.toString(); + int hudiBucket = + BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + + // Strong alignment: bucket ids MUST match. + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testRejectsValueContainingRecordSeparator() { + // ',' is Hudi's record-part separator and is NOT escaped by Hudi's record-key + // serialization. The encoder must refuse such values rather than silently + // diverge from Hudi's bucket layout. 
+ RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, + new String[] {"tenant", "name"}); + GenericRow row = + GenericRow.of( + BinaryString.fromString("acme"), + BinaryString.fromString("smith, john")); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + + assertThatThrownBy(() -> encoder.encodeKey(row)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("'name'") + .hasMessageContaining("','"); + } + + @Test + void testRejectsLiteralPlaceholderValueInCompositeKey() { + // Composite-key mode: a user-supplied value equal to Hudi's reserved placeholder + // would be round-tripped to null by Hudi's parser, silently changing the bucket id. + // (In single-field mode the placeholder is kept verbatim by Hudi, so it's safe.) + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, + new String[] {"tenant", "name"}); + GenericRow row = + GenericRow.of( + BinaryString.fromString("acme"), + BinaryString.fromString(NULL_RECORDKEY_PLACEHOLDER)); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + + assertThatThrownBy(() -> encoder.encodeKey(row)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("reserved record-key placeholder"); + } + + @Test + void testRejectsUnsupportedBucketKeyType() { + // BYTES / BINARY would otherwise fall through to Object#toString() and produce + // instance-bound bucket ids (e.g. "[B@1a2b3c"). The encoder must refuse them up + // front rather than silently corrupt the bucket layout. 
+ RowType rowType = + RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"payload"}); + + assertThatThrownBy( + () -> + new HudiKeyEncoder( + rowType, Collections.singletonList("payload"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported Hudi bucket key type") + .hasMessageContaining("payload"); + } } From 763fa9a55c8ff94c435813791e8c4987881bcdc7 Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 19 May 2026 15:51:52 +0800 Subject: [PATCH 8/9] [lake/hudi] fix checkstyle in HudiKeyEncoder --- .../apache/fluss/row/encode/hudi/HudiKeyEncoder.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java index 2eeaa5a220..9446a32deb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -35,7 +35,7 @@ * *

What this encoder must reproduce

* - * The bucket id Hudi computes in production is: + *

The bucket id Hudi computes in production is: * *

{@code
  * BucketIdentifier.getBucketId(HoodieKey, indexKeyFields, numBuckets)
@@ -43,7 +43,7 @@
  *      & Integer.MAX_VALUE) % numBuckets
  * }
* - * In other words, Hudi parses a {@code "f1:v1,f2:v2,..."} record-key string back into a {@code + *

In other words, Hudi parses a {@code "f1:v1,f2:v2,..."} record-key string back into a {@code * List} (with {@code "__null__"} → {@code null} and {@code "__empty__"} → {@code ""}) and * then takes {@link java.util.List#hashCode()}. * @@ -53,7 +53,7 @@ * *

Why a list of nullable {@code String}s (and not stringified placeholders)

* - * Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into + *

Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into * {@code null}/{@code ""} before hashing. {@code List#hashCode} treats a {@code null} * element as contributing {@code 0} and a non-null element as contributing its {@code String} * hash. Hashing the placeholder string directly would therefore give a different bucket id than @@ -61,7 +61,7 @@ * *

Why we reject values containing {@code ','}

* - * {@code ','} is Hudi's record-part separator and is not escaped in Hudi's + *

{@code ','} is Hudi's record-part separator and is not escaped in Hudi's * record-key serialization. Any bucket-key value containing {@code ','} is therefore ambiguous on * Hudi's parse path and would produce a different reconstructed list than the one we hash here. * We refuse such values up front with {@link IllegalArgumentException} so callers cannot silently @@ -71,7 +71,7 @@ * *

Supported key field types

* - * See {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}. Composite or binary types (ARRAY/MAP/ROW/ + * <p>See {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}. Composite or binary types (ARRAY/MAP/ROW/ * BINARY/VARBINARY) would fall back to {@code Object#toString()} and produce instance-bound, * non-reproducible bucket ids; they are rejected at construction time. */ From cb423695a4e77d8ba013b84b0678ba16610b8005 Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 19 May 2026 15:54:10 +0800 Subject: [PATCH 9/9] [lake/hudi] fix format violations --- .../fluss/row/encode/hudi/HudiKeyEncoder.java | 37 +++++++++---------- .../bucketing/HudiBucketingFunctionTest.java | 32 +++++----------- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java index 9446a32deb..e0518b5487 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -55,19 +55,19 @@ * *

Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into * {@code null}/{@code ""} before hashing. {@code List#hashCode} treats a {@code null} - * element as contributing {@code 0} and a non-null element as contributing its {@code String} - * hash. Hashing the placeholder string directly would therefore give a different bucket id than - * Hudi for any row with a null bucket-key field. + * element as contributing {@code 0} and a non-null element as contributing its {@code String} hash. + * Hashing the placeholder string directly would therefore give a different bucket id than Hudi for + * any row with a null bucket-key field. * *

Why we reject values containing {@code ','}

* *

{@code ','} is Hudi's record-part separator and is not escaped in Hudi's * record-key serialization. Any bucket-key value containing {@code ','} is therefore ambiguous on - * Hudi's parse path and would produce a different reconstructed list than the one we hash here. - * We refuse such values up front with {@link IllegalArgumentException} so callers cannot silently + * Hudi's parse path and would produce a different reconstructed list than the one we hash here. We + * refuse such values up front with {@link IllegalArgumentException} so callers cannot silently * desync from Hudi's bucket layout. ({@code ':'} on the other hand is safe — Hudi's parser has a - * dedicated look-ahead loop that handles values containing {@code ':'}, e.g. timestamps like - * {@code "2023-10-25T10:01:13.182Z"}.) + * dedicated look-ahead loop that handles values containing {@code ':'}, e.g. timestamps like {@code + * "2023-10-25T10:01:13.182Z"}.) * *

Supported key field types

* @@ -89,8 +89,8 @@ public class HudiKeyEncoder implements KeyEncoder { /** * The set of {@link DataTypeRoot}s allowed as Hudi bucket key fields. These are exactly the - * scalar types for which {@code toString()} (or a deterministic equivalent) round-trips - * through Hudi's record-key serialization. + * scalar types for which {@code toString()} (or a deterministic equivalent) round-trips through + * Hudi's record-key serialization. */ public static final Set SUPPORTED_BUCKET_KEY_TYPE_ROOTS = Collections.unmodifiableSet( @@ -184,14 +184,14 @@ public byte[] encodeKey(InternalRow row) { *
    *
  • single-field ({@code composite == false}): Hudi returns the raw recordKey * verbatim, so {@code "__null__"} stays as the literal string. We therefore emit the - * placeholder string for a null field, and reject {@code ','}-containing values only - * for safety (a single-field recordKey containing {@code ','} would actually trip the + * placeholder string for a null field, and reject {@code ','}-containing values only for + * safety (a single-field recordKey containing {@code ','} would actually trip the * composite quick-path check on Hudi's side too). - *
  • composite ({@code composite == true}): Hudi parses {@code "__null__"} → - * {@code null} and {@code "__empty__"} → {@code ""} before hashing. We emit {@code - * null}/{@code ""} directly. Values literally equal to a placeholder, or containing - * {@code ','}, are rejected because they would either collide with Hudi's reserved - * sentinels or break Hudi's reverse parsing. + *
  • composite ({@code composite == true}): Hudi parses {@code "__null__"} → {@code + * null} and {@code "__empty__"} → {@code ""} before hashing. We emit {@code null}/{@code + * ""} directly. Values literally equal to a placeholder, or containing {@code ','}, are + * rejected because they would either collide with Hudi's reserved sentinels or break + * Hudi's reverse parsing. *
*/ private static String toHudiHashElement(String fieldName, Object value, boolean composite) { @@ -211,8 +211,7 @@ private static String toHudiHashElement(String fieldName, Object value, boolean // Guard against literal placeholder collisions — if a user literally stores // "__null__"/"__empty__" as the value, Hudi would round-trip it to null/"" and // we'd silently disagree. - if (NULL_RECORDKEY_PLACEHOLDER.equals(stringValue) - || "__empty__".equals(stringValue)) { + if (NULL_RECORDKEY_PLACEHOLDER.equals(stringValue) || "__empty__".equals(stringValue)) { throw new IllegalArgumentException( "Bucket key field '" + fieldName @@ -236,4 +235,4 @@ private static void rejectIfContainsRecordSeparator(String fieldName, String val + "BucketIdentifier and is rejected to keep the bucket layout in sync."); } } -} \ No newline at end of file +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java index a709b1ea2c..7038f87f65 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -286,8 +286,7 @@ void testSingleFieldNull_keepsPlaceholderString_matchesHudi() { assertThat(ourEncodedKey).isNotEqualTo(literalNullStringBytes); // Cross-validate against Hudi's recordKey-string overload (production path). 
- int hudiBucket = - BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -352,8 +351,7 @@ void testCompositeBucketKeyMatchesHudiRecordKeyOverload() { int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; - int hudiBucket = - BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -379,8 +377,7 @@ void testCompositeBucketKeyWithNullFieldMatchesHudiRecordKeyOverload() { // The on-wire record key Hudi would have built for this row. String recordKey = f1 + ":42," + f2 + ":" + NULL_RECORDKEY_PLACEHOLDER; - int hudiBucket = - BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -500,8 +497,7 @@ void testCompositeBucketKeyMatchesHudiRecordKeyOverload_safeValues() { // Build Hudi's wire-format record key: "f1:v1,f2:v2", then go through the // recordKey-string overload that splits on ':' / ',' before hashing. 
String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; - int hudiBucket = - BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); assertThat(ourBucket).isEqualTo(hudiBucket); } @@ -529,8 +525,7 @@ void testCompositeKeyWithColonInValue_matchesHudiRecordKeyOverload() { int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); String recordKey = f1 + ":" + tenant + "," + f2 + ":" + ts.toString(); - int hudiBucket = - BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); // Strong alignment: bucket ids MUST match. assertThat(ourBucket).isEqualTo(hudiBucket); @@ -547,10 +542,8 @@ void testRejectsValueContainingRecordSeparator() { new String[] {"tenant", "name"}); GenericRow row = GenericRow.of( - BinaryString.fromString("acme"), - BinaryString.fromString("smith, john")); - HudiKeyEncoder encoder = - new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + BinaryString.fromString("acme"), BinaryString.fromString("smith, john")); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); assertThatThrownBy(() -> encoder.encodeKey(row)) .isInstanceOf(IllegalArgumentException.class) @@ -571,8 +564,7 @@ void testRejectsLiteralPlaceholderValueInCompositeKey() { GenericRow.of( BinaryString.fromString("acme"), BinaryString.fromString(NULL_RECORDKEY_PLACEHOLDER)); - HudiKeyEncoder encoder = - new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); assertThatThrownBy(() -> encoder.encodeKey(row)) .isInstanceOf(IllegalArgumentException.class) @@ -584,13 +576,9 @@ void testRejectsUnsupportedBucketKeyType() { // BYTES / BINARY would otherwise fall through to Object#toString() and produce // instance-bound bucket ids (e.g. 
"[B@1a2b3c"). The encoder must refuse them up // front rather than silently corrupt the bucket layout. - RowType rowType = - RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"payload"}); + RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"payload"}); - assertThatThrownBy( - () -> - new HudiKeyEncoder( - rowType, Collections.singletonList("payload"))) + assertThatThrownBy(() -> new HudiKeyEncoder(rowType, Collections.singletonList("payload"))) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Unsupported Hudi bucket key type") .hasMessageContaining("payload");