diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java index 915fbd172f..5291ccdc39 100644 --- a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java @@ -43,6 +43,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) { return new FlussBucketingFunction(); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergBucketingFunction(); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiBucketingFunction(); } else { throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java new file mode 100644 index 0000000000..0799d64b5f --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.bucketing; + +/** + * An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy. + * + *

The bucket id is computed in the same way as Hudi's {@code + * org.apache.hudi.index.bucket.BucketIdentifier#getBucketId(String, String, int)}: take a 32-bit + * integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, mask + * its sign bit and modulo by {@code numBuckets}. + */ +public class HudiBucketingFunction implements BucketingFunction { + + /** Length of a Hudi-encoded bucket key, in bytes (a single big-endian {@code int}). */ + private static final int ENCODED_KEY_LENGTH = 4; + + @Override + public int bucketing(byte[] bucketKey, int numBuckets) { + if (bucketKey == null) { + throw new IllegalArgumentException("bucketKey must not be null"); + } + if (bucketKey.length != ENCODED_KEY_LENGTH) { + throw new IllegalArgumentException( + "bucketKey must be exactly " + + ENCODED_KEY_LENGTH + + " bytes for Hudi bucketing, but got " + + bucketKey.length + + " bytes. The bucket key bytes are expected to be produced by HudiKeyEncoder."); + } + if (numBuckets <= 0) { + throw new IllegalArgumentException( + "numBuckets must be positive, but got " + numBuckets); + } + + // Decode 4-byte big-endian int produced by HudiKeyEncoder via bit operations + // to avoid allocating a ByteBuffer instance on this hot path. + int restored = + ((bucketKey[0] & 0xFF) << 24) + | ((bucketKey[1] & 0xFF) << 16) + | ((bucketKey[2] & 0xFF) << 8) + | (bucketKey[3] & 0xFF); + + return (restored & Integer.MAX_VALUE) % numBuckets; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java index 4d59be33b0..fe52e926cc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder; import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder; import org.apache.fluss.types.RowType; @@ -76,6 +77,14 @@ static KeyEncoder ofPrimaryKeyEncoder( Optional optKvFormatVersion = tableConfig.getKvFormatVersion(); DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null); int kvFormatVersion = optKvFormatVersion.orElse(1); + + // Hudi's HudiKeyEncoder is lossy (4-byte hash); it must NOT be used for + // primary key encoding because different keys with the same List#hashCode + // would collide. Use CompactedKeyEncoder instead. + if (dataLakeFormat == DataLakeFormat.HUDI) { + return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); + } + if (kvFormatVersion == 1) { return of(rowType, keyFields, dataLakeFormat); } @@ -129,6 +138,8 @@ static KeyEncoder of( return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergKeyEncoder(rowType, keyFields); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiKeyEncoder(rowType, keyFields); } else { throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java new file mode 100644 index 0000000000..e0518b5487 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.encode.hudi; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.types.RowType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +/** + * An implementation of {@link KeyEncoder} that follows Hudi's bucket-key encoding strategy. + * + *

What this encoder must reproduce

+ * + *

The bucket id Hudi computes in production is: + * + *

{@code
+ * BucketIdentifier.getBucketId(HoodieKey, indexKeyFields, numBuckets)
+ *   = (KeyGenUtils.extractRecordKeysByFields(recordKey, fields).hashCode()
+ *      & Integer.MAX_VALUE) % numBuckets
+ * }
+ * + *

In other words, Hudi parses a {@code "f1:v1,f2:v2,..."} record-key string back into a {@code + * List} (with {@code "__null__"} → {@code null} and {@code "__empty__"} → {@code ""}) and + * then takes {@link java.util.List#hashCode()}. + * + *

This encoder produces a list element-wise equivalent to the one Hudi reconstructs, + * and hashes it with the same {@code List#hashCode()} contract. The encoded bytes are a 4-byte + * big-endian representation of that hash, decoded symmetrically by {@code HudiBucketingFunction}. + * + *

Why a list of nullable {@code String}s (and not stringified placeholders)

+ * + *

Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into + * {@code null}/{@code ""} before hashing. {@code List#hashCode} treats a {@code null} + * element as contributing {@code 0} and a non-null element as contributing its {@code String} hash. + * Hashing the placeholder string directly would therefore give a different bucket id than Hudi for + * any row with a null bucket-key field. + * + *

Why we reject values containing {@code ','}

+ * + *

{@code ','} is Hudi's record-part separator and is not escaped in Hudi's + * record-key serialization. Any bucket-key value containing {@code ','} is therefore ambiguous on + * Hudi's parse path and would produce a different reconstructed list than the one we hash here. We + * refuse such values up front with {@link IllegalArgumentException} so callers cannot silently + * desync from Hudi's bucket layout. ({@code ':'} on the other hand is safe — Hudi's parser has a + * dedicated look-ahead loop that handles values containing {@code ':'}, e.g. timestamps like {@code + * "2023-10-25T10:01:13.182Z"}.) + * + *

Supported key field types

+ * + *

See {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}. Composite or binary types (ARRAY/MAP/ROW/ + * BINARY/VARBINARY) would fall back to {@code Object#toString()} and produce instance-bound, + * non-reproducible bucket ids; they are rejected at construction time. + */ +public class HudiKeyEncoder implements KeyEncoder { + + /** + * Placeholder used by Hudi's serialization for a {@code null} bucket-key field. Kept here for + * cross-validation in tests — the encoder itself never hashes this literal because Hudi's + * parser turns it back into {@code null} before hashing. + */ + public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + + /** Hudi's record-part separator. Values containing it are rejected. */ + private static final char RECORD_KEY_PARTS_SEPARATOR = ','; + + /** + * The set of {@link DataTypeRoot}s allowed as Hudi bucket key fields. These are exactly the + * scalar types for which {@code toString()} (or a deterministic equivalent) round-trips through + * Hudi's record-key serialization. + */ + public static final Set SUPPORTED_BUCKET_KEY_TYPE_ROOTS = + Collections.unmodifiableSet( + EnumSet.of( + DataTypeRoot.CHAR, + DataTypeRoot.STRING, + DataTypeRoot.BOOLEAN, + DataTypeRoot.DECIMAL, + DataTypeRoot.TINYINT, + DataTypeRoot.SMALLINT, + DataTypeRoot.INTEGER, + DataTypeRoot.BIGINT, + DataTypeRoot.FLOAT, + DataTypeRoot.DOUBLE, + DataTypeRoot.DATE, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)); + + private final InternalRow.FieldGetter[] fieldGetters; + private final String[] keyFieldNames; + + public HudiKeyEncoder(RowType rowType, List keys) { + // for getting key fields out of fluss internal row + fieldGetters = new InternalRow.FieldGetter[keys.size()]; + keyFieldNames = new String[keys.size()]; + for (int i = 0; i < keys.size(); i++) { + String keyField = keys.get(i); + int keyIndex = rowType.getFieldIndex(keyField); + if (keyIndex < 0) { + throw new IllegalArgumentException( + "Bucket key field '" + keyField + "' does not exist in row type."); + } + DataType keyDataType = rowType.getTypeAt(keyIndex); + validateSupportedBucketKeyType(keyField, keyDataType); + fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); + keyFieldNames[i] = keyField; + } + } + + private static void validateSupportedBucketKeyType(String keyField, DataType dataType) { + DataTypeRoot typeRoot = dataType.getTypeRoot(); + if (!SUPPORTED_BUCKET_KEY_TYPE_ROOTS.contains(typeRoot)) { + throw new IllegalArgumentException( + "Unsupported Hudi bucket key type for field '" + + keyField + + "': " + + dataType + + ". A bucket key field must be a scalar type with a deterministic " + + "string form; composite or binary types (ARRAY/MAP/ROW/BINARY/VARBINARY) " + + "would otherwise fall back to Object#toString() and produce " + + "non-reproducible bucket ids. Supported type roots are: " + + SUPPORTED_BUCKET_KEY_TYPE_ROOTS); + } + } + + @Override + public byte[] encodeKey(InternalRow row) { + // Build the same List that Hudi's KeyGenUtils.extractRecordKeysByFields would + // hand to List#hashCode() in BucketIdentifier#getBucketId. + // + // Hudi has two parse paths: + // * single-field record key (no ',' AND no ':' in the serialized form): the parser + // returns the raw recordKey verbatim — placeholders are NOT round-tripped, so + // "__null__"/"__empty__" stay as literal strings. + // * composite record key ("f1:v1,f2:v2,..."): the parser splits on ':'/',' and + // additionally turns "__null__" → null and "__empty__" → "" before hashing. + // + // We mirror the same two-mode behaviour so the resulting List#hashCode matches Hudi's. + boolean composite = fieldGetters.length > 1; + List values = new ArrayList<>(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + Object value = fieldGetters[i].getFieldOrNull(row); + values.add(toHudiHashElement(keyFieldNames[i], value, composite)); + } + int hashCode = values.hashCode(); + + // 4-byte big-endian, decoded symmetrically by HudiBucketingFunction. + return new byte[] { + (byte) (hashCode >>> 24), + (byte) (hashCode >>> 16), + (byte) (hashCode >>> 8), + (byte) hashCode + }; + } + + /** + * Produces the same {@code String} element Hudi's parser would have placed in the list it + * hashes. The behaviour is mode-dependent because Hudi itself has two parse modes: + * + *

+ */ + private static String toHudiHashElement(String fieldName, Object value, boolean composite) { + if (value == null) { + // Composite mode: Hudi parses "__null__" back to null before hashing. + // Single-field mode: Hudi keeps the recordKey verbatim, so we keep the placeholder. + return composite ? null : NULL_RECORDKEY_PLACEHOLDER; + } + String stringValue = + (value instanceof BinaryString) ? value.toString() : String.valueOf(value); + if (composite && stringValue.isEmpty()) { + // Composite mode: Hudi parses "__empty__" back to "" before hashing. + return ""; + } + if (composite) { + rejectIfContainsRecordSeparator(fieldName, stringValue); + // Guard against literal placeholder collisions — if a user literally stores + // "__null__"/"__empty__" as the value, Hudi would round-trip it to null/"" and + // we'd silently disagree. + if (NULL_RECORDKEY_PLACEHOLDER.equals(stringValue) || "__empty__".equals(stringValue)) { + throw new IllegalArgumentException( + "Bucket key field '" + + fieldName + + "' has value " + + stringValue + + " which collides with Hudi's reserved record-key placeholder; " + + "this value cannot be safely used as a composite Hudi bucket key."); + } + } + return stringValue; + } + + private static void rejectIfContainsRecordSeparator(String fieldName, String value) { + if (value.indexOf(RECORD_KEY_PARTS_SEPARATOR) >= 0) { + throw new IllegalArgumentException( + "Bucket key field '" + + fieldName + + "' has value containing ',', which is Hudi's record-part " + + "separator and is not escaped by Hudi's record-key serialization. " + + "Such a value would produce a bucket id divergent from Hudi's " + + "BucketIdentifier and is rejected to keep the bucket layout in sync."); + } + } +} diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index ddf6b1329d..9fbce449f5 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -32,6 +32,43 @@ + + org.apache.hudi + hudi-flink${flink.major.version}-bundle + ${hudi.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-lang3 + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + org.apache.zookeeper + zookeeper + + + + org.apache.fluss fluss-common diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java new file mode 100644 index 0000000000..7038f87f65 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.bucketing; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.row.encode.hudi.HudiKeyEncoder.NULL_RECORDKEY_PLACEHOLDER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit test for {@link HudiBucketingFunction}. */ +class HudiBucketingFunctionTest { + + @Test + void testIntegerHash() { + int testValue = 42; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testLongHash() { + long testValue = 1234567890123456789L; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testStringHash() { + // NOTE: ',' is Hudi's record-part separator and is rejected by the encoder — use a + // delimiter-free message here. (See HudiKeyEncoder Javadoc for details.) + String testValue = "Hello Hudi - Fluss this side!"; + int bucketNum = 10; + String key = "name"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {key}); + + GenericRow row = GenericRow.of(BinaryString.fromString(testValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDecimalHash() { + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "amount"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new String[] {key}); + + GenericRow row = GenericRow.of(decimal); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toPlainString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toPlainString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testTimestampEncodingHash() throws IOException { + long millis = 1698235273182L; + int nanos = 123456; + long micros = millis * 1000 + (nanos / 1000); + TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos); + int bucketNum = 10; + String key = "event_time"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDateHash() { + int dateValue = 19655; + int bucketNum = 10; + String key = "date"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new String[] {key}); + GenericRow row = GenericRow.of(dateValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {String.valueOf(dateValue)}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(String.valueOf(dateValue), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testMultiKeysHashing() { + int testIntValue = 42; + long testLongValue = 1234567890123456789L; + String testStringValue = "Hello Hudi - Fluss this side!"; + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "age,id,name,amount"; + String recordKey = + "age:" + + testIntValue + + ",id:" + + testLongValue + + ",name:" + + testStringValue + + ",amount:" + + testValue; + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.DECIMAL(10, 2) + }, + new String[] {"age", "id", "name", "amount"}); + GenericRow row = + GenericRow.of( + testIntValue, + testLongValue, + BinaryString.fromString(testStringValue), + decimal); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Arrays.asList("age", "id", "name", "amount")); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = + toBytes( + new String[] { + String.valueOf(testIntValue), + String.valueOf(testLongValue), + testStringValue, + String.valueOf(decimal) + }); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testSingleFieldNull_keepsPlaceholderString_matchesHudi() { + // SINGLE-field mode: Hudi's KeyGenUtils.extractRecordKeysByFields takes the + // fast path (no ',' AND no ':' in the recordKey) and returns the raw string + // verbatim — i.e. the literal "__null__" — without round-tripping it back to + // a real null. The strong-aligned encoder must mirror that and hash the + // placeholder string, NOT a null element. + int bucketNum = 10; + String key = "name"; + + RowType rowType = + RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + + GenericRow row = GenericRow.of((Object) null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] placeholderStringBytes = toBytes(new String[] {NULL_RECORDKEY_PLACEHOLDER}); + byte[] literalNullStringBytes = toBytes(new String[] {"null"}); + + assertThat(ourEncodedKey).isEqualTo(placeholderStringBytes); + assertThat(ourEncodedKey).isNotEqualTo(literalNullStringBytes); + + // Cross-validate against Hudi's recordKey-string overload (production path). + int hudiBucket = BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBucketingRejectsInvalidBucketKey() { + HudiBucketingFunction function = new HudiBucketingFunction(); + + assertThatThrownBy(() -> function.bucketing(null, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null"); + + // length 0 — wrong length + assertThatThrownBy(() -> function.bucketing(new byte[0], 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length < 4 — used to throw BufferUnderflowException, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length > 4 — used to silently truncate, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3, 4, 5}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + } + + @Test + void testBucketingRejectsNonPositiveNumBuckets() { + HudiBucketingFunction function = new HudiBucketingFunction(); + byte[] anyKey = new byte[] {0, 0, 0, 1}; + + assertThatThrownBy(() -> function.bucketing(anyKey, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + assertThatThrownBy(() -> function.bucketing(anyKey, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + } + + @Test + void testCompositeBucketKeyMatchesHudiRecordKeyOverload() { + // Strong alignment: go through Hudi's production parse path + // (BucketIdentifier#getBucketId(String recordKey, ...)) instead of the + // pre-parsed List overload, so any future divergence in parsing + // (including ':'-handling for timestamps) is caught. + int bucketNum = 16; + String f1 = "user_id"; + String f2 = "region"; + int idValue = 12345; + String regionValue = "cn-north"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(idValue, BinaryString.fromString(regionValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testCompositeBucketKeyWithNullFieldMatchesHudiRecordKeyOverload() { + // Hudi serializes a null bucket-key field as "__null__" in the record key, then + // parses it back to a `null` element before hashing. The strong-aligned encoder + // must produce a bucket id identical to Hudi's recordKey-string parse path. + int bucketNum = 8; + String f1 = "user_id"; + String f2 = "region"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().copy(true), DataTypes.STRING().copy(true)}, + new String[] {f1, f2}); + // f2 is null on purpose + GenericRow row = GenericRow.of(42, null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + // The on-wire record key Hudi would have built for this row. + String recordKey = f1 + ":42," + f2 + ":" + NULL_RECORDKEY_PLACEHOLDER; + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBooleanAndIntegralTypes() { + int bucketNum = 10; + // BOOLEAN + assertSingleFieldRoundTrip( + DataTypes.BOOLEAN(), true, String.valueOf(true), "flag", bucketNum); + assertSingleFieldRoundTrip( + DataTypes.BOOLEAN(), false, String.valueOf(false), "flag", bucketNum); + // TINYINT / SMALLINT + assertSingleFieldRoundTrip(DataTypes.TINYINT(), (byte) 7, "7", "b", bucketNum); + assertSingleFieldRoundTrip(DataTypes.SMALLINT(), (short) 12345, "12345", "s", bucketNum); + // FLOAT (use a value whose Float.toString is stable across JVMs) + assertSingleFieldRoundTrip(DataTypes.FLOAT(), 3.5f, Float.toString(3.5f), "f", bucketNum); + } + + @Test + void testDateAndTimeTypes() { + int bucketNum = 10; + // DATE is stored as days-since-epoch int internally; record key is its String value. + int dateDays = 19852; + assertSingleFieldRoundTrip( + DataTypes.DATE(), dateDays, String.valueOf(dateDays), "d", bucketNum); + // TIME is stored as ms-of-day int. + int timeMillis = 12345678; + assertSingleFieldRoundTrip( + DataTypes.TIME(), timeMillis, String.valueOf(timeMillis), "t", bucketNum); + } + + @Test + void testTimestampLtzType() throws IOException { + int bucketNum = 10; + TimestampLtz ts = TimestampLtz.fromInstant(Instant.ofEpochMilli(1700000000000L)); + RowType rowType = + RowType.of(new DataType[] {DataTypes.TIMESTAMP_LTZ(6)}, new String[] {"ts"}); + GenericRow row = GenericRow.of(ts); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList("ts")); + byte[] enc = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {ts.toString()}); + assertThat(enc).isEqualTo(expected); + + int hudiBucket = + BucketIdentifier.getBucketId(Collections.singletonList(ts.toString()), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(enc, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBucketingNumBucketsBoundaryValues() { + HudiBucketingFunction f = new HudiBucketingFunction(); + // numBuckets == 1 => bucket id always 0 + for (int sample : new int[] {0, 1, -1, Integer.MAX_VALUE, Integer.MIN_VALUE}) { + byte[] key = intToBytes(sample); + assertThat(f.bucketing(key, 1)).isEqualTo(0); + } + // numBuckets == Integer.MAX_VALUE: bucket id == hash & MAX_VALUE + int hash = Integer.MIN_VALUE; // stress sign-bit handling + byte[] key = intToBytes(hash); + int expected = (hash & Integer.MAX_VALUE) % Integer.MAX_VALUE; + assertThat(f.bucketing(key, Integer.MAX_VALUE)).isEqualTo(expected); + // bucket id must always be in [0, numBuckets) + assertThat(f.bucketing(key, 7)).isGreaterThanOrEqualTo(0).isLessThan(7); + } + + private void assertSingleFieldRoundTrip( + DataType dataType, Object value, String stringified, String key, int bucketNum) { + RowType rowType = RowType.of(new DataType[] {dataType}, new String[] {key}); + GenericRow row = GenericRow.of(value); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {stringified}); + assertThat(ourEncodedKey).isEqualTo(expected); + + int hudiBucket = BucketIdentifier.getBucketId(stringified, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + private static byte[] intToBytes(int v) { + return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v}; + } + + private byte[] toBytes(String[] value) { + List values = Arrays.asList(value); + return ByteBuffer.allocate(4).putInt(values.hashCode()).array(); + } + + // ------------------------------------------------------------------ + // End-to-end tests against Hudi's recordKey-string overloads — these + // exercise the same parsing path Hudi takes in production (HoodieKey + // -> getHashKeys via ':' / ',' split) and therefore catch divergence + // that the List-based overload would silently hide. + // ------------------------------------------------------------------ + + @Test + void testCompositeBucketKeyMatchesHudiRecordKeyOverload_safeValues() { + // Values intentionally free of ':' and ',' — the encoder must agree with Hudi's + // record-key-parsing path under the documented contract. + int bucketNum = 16; + String f1 = "user_id"; + String f2 = "region"; + int idValue = 12345; + String regionValue = "cn-north"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(idValue, BinaryString.fromString(regionValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + // Build Hudi's wire-format record key: "f1:v1,f2:v2", then go through the + // recordKey-string overload that splits on ':' / ',' before hashing. + String recordKey = f1 + ":" + idValue + "," + f2 + ":" + regionValue; + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testCompositeKeyWithColonInValue_matchesHudiRecordKeyOverload() { + // Hudi's KeyGenUtils.extractRecordKeysByFields has a dedicated look-ahead loop + // that correctly handles values containing ':' (e.g. timestamps like + // "2023-10-25T10:01:13.182Z"). The strong-aligned encoder must match Hudi + // bit-for-bit on this case — not merely "document the divergence". + int bucketNum = 8; + String f1 = "tenant"; + String f2 = "ts"; + String tenant = "acme"; + TimestampLtz ts = TimestampLtz.fromInstant(Instant.ofEpochMilli(1700000000000L)); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.TIMESTAMP_LTZ(6)}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(BinaryString.fromString(tenant), ts); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + + String recordKey = f1 + ":" + tenant + "," + f2 + ":" + ts.toString(); + int hudiBucket = BucketIdentifier.getBucketId(recordKey, f1 + "," + f2, bucketNum); + + // Strong alignment: bucket ids MUST match. + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testRejectsValueContainingRecordSeparator() { + // ',' is Hudi's record-part separator and is NOT escaped by Hudi's record-key + // serialization. The encoder must refuse such values rather than silently + // diverge from Hudi's bucket layout. + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, + new String[] {"tenant", "name"}); + GenericRow row = + GenericRow.of( + BinaryString.fromString("acme"), BinaryString.fromString("smith, john")); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + + assertThatThrownBy(() -> encoder.encodeKey(row)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("'name'") + .hasMessageContaining("','"); + } + + @Test + void testRejectsLiteralPlaceholderValueInCompositeKey() { + // Composite-key mode: a user-supplied value equal to Hudi's reserved placeholder + // would be round-tripped to null by Hudi's parser, silently changing the bucket id. + // (In single-field mode the placeholder is kept verbatim by Hudi, so it's safe.) + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, + new String[] {"tenant", "name"}); + GenericRow row = + GenericRow.of( + BinaryString.fromString("acme"), + BinaryString.fromString(NULL_RECORDKEY_PLACEHOLDER)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList("tenant", "name")); + + assertThatThrownBy(() -> encoder.encodeKey(row)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("reserved record-key placeholder"); + } + + @Test + void testRejectsUnsupportedBucketKeyType() { + // BYTES / BINARY would otherwise fall through to Object#toString() and produce + // instance-bound bucket ids (e.g. "[B@1a2b3c"). The encoder must refuse them up + // front rather than silently corrupt the bucket layout. + RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"payload"}); + + assertThatThrownBy(() -> new HudiKeyEncoder(rowType, Collections.singletonList("payload"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported Hudi bucket key type") + .hasMessageContaining("payload"); + } +}