Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) {
return new FlussBucketingFunction();
} else if (lakeFormat == DataLakeFormat.ICEBERG) {
return new IcebergBucketingFunction();
} else if (lakeFormat == DataLakeFormat.HUDI) {
return new HudiBucketingFunction();
} else {
throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.bucketing;

/**
* An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy.
*
* <p>The bucket id is computed in the same way as Hudi's {@code
* org.apache.hudi.index.bucket.BucketIdentifier#getBucketId(String, String, int)}: take a 32-bit
* integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, mask
* its sign bit and modulo by {@code numBuckets}.
*/
public class HudiBucketingFunction implements BucketingFunction {

    /** Number of bytes in a Hudi-encoded bucket key: exactly one big-endian {@code int}. */
    private static final int ENCODED_KEY_LENGTH = 4;

    /**
     * Maps an encoded bucket key to a bucket id.
     *
     * <p>The key bytes are read as a big-endian 32-bit integer, the sign bit is cleared, and the
     * result is reduced modulo {@code numBuckets}.
     *
     * @param bucketKey the encoded bucket key; must be non-null and exactly 4 bytes long
     * @param numBuckets the number of buckets; must be positive
     * @return a bucket id in the range {@code [0, numBuckets)}
     * @throws IllegalArgumentException if {@code bucketKey} is null or not 4 bytes long, or if
     *     {@code numBuckets} is not positive
     */
    @Override
    public int bucketing(byte[] bucketKey, int numBuckets) {
        checkArguments(bucketKey, numBuckets);

        // Fold the bytes into an int, most significant byte first. Pure bit arithmetic keeps
        // this path free of ByteBuffer allocations.
        int hash = 0;
        for (int pos = 0; pos < ENCODED_KEY_LENGTH; pos++) {
            hash = (hash << 8) | (bucketKey[pos] & 0xFF);
        }

        // Clearing the sign bit guarantees a non-negative dividend, hence a non-negative id.
        final int nonNegativeHash = hash & Integer.MAX_VALUE;
        return nonNegativeHash % numBuckets;
    }

    /** Validates the arguments of {@link #bucketing(byte[], int)}, key checks first. */
    private static void checkArguments(byte[] bucketKey, int numBuckets) {
        if (bucketKey == null) {
            throw new IllegalArgumentException("bucketKey must not be null");
        }
        final int actualLength = bucketKey.length;
        if (actualLength != ENCODED_KEY_LENGTH) {
            String message =
                    "bucketKey must be exactly "
                            + ENCODED_KEY_LENGTH
                            + " bytes for Hudi bucketing, but got "
                            + actualLength
                            + " bytes. The bucket key bytes are expected to be produced by HudiKeyEncoder.";
            throw new IllegalArgumentException(message);
        }
        if (numBuckets <= 0) {
            throw new IllegalArgumentException(
                    "numBuckets must be positive, but got " + numBuckets);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.hudi.HudiKeyEncoder;
import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder;
import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder;
import org.apache.fluss.types.RowType;
Expand Down Expand Up @@ -76,6 +77,14 @@ static KeyEncoder ofPrimaryKeyEncoder(
Optional<Integer> optKvFormatVersion = tableConfig.getKvFormatVersion();
DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null);
int kvFormatVersion = optKvFormatVersion.orElse(1);

// Hudi's HudiKeyEncoder is lossy (4-byte hash); it must NOT be used for
// primary key encoding because different keys with the same List#hashCode
// would collide. Use CompactedKeyEncoder instead.
if (dataLakeFormat == DataLakeFormat.HUDI) {
return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
}

if (kvFormatVersion == 1) {
return of(rowType, keyFields, dataLakeFormat);
}
Expand Down Expand Up @@ -129,6 +138,8 @@ static KeyEncoder of(
return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
} else if (lakeFormat == DataLakeFormat.ICEBERG) {
return new IcebergKeyEncoder(rowType, keyFields);
} else if (lakeFormat == DataLakeFormat.HUDI) {
return new HudiKeyEncoder(rowType, keyFields);
Comment thread
fhan688 marked this conversation as resolved.
} else {
throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row.encode.hudi;

import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.KeyEncoder;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.types.RowType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/**
* An implementation of {@link KeyEncoder} that follows Hudi's bucket-key encoding strategy.
*
* <h3>What this encoder must reproduce</h3>
*
* <p>The bucket id Hudi computes in production is:
*
* <pre>{@code
* BucketIdentifier.getBucketId(HoodieKey, indexKeyFields, numBuckets)
* = (KeyGenUtils.extractRecordKeysByFields(recordKey, fields).hashCode()
* & Integer.MAX_VALUE) % numBuckets
* }</pre>
*
* <p>In other words, Hudi parses a {@code "f1:v1,f2:v2,..."} record-key string back into a {@code
* List<String>} (with {@code "__null__"} → {@code null} and {@code "__empty__"} → {@code ""}) and
* then takes {@link java.util.List#hashCode()}.
*
* <p>This encoder produces a list <em>element-wise equivalent</em> to the one Hudi reconstructs,
* and hashes it with the same {@code List#hashCode()} contract. The encoded bytes are a 4-byte
* big-endian representation of that hash, decoded symmetrically by {@code HudiBucketingFunction}.
*
* <h3>Why a list of nullable {@code String}s (and not stringified placeholders)</h3>
*
* <p>Hudi's parser turns the literal {@code "__null__"}/{@code "__empty__"} substrings back into
* {@code null}/{@code ""} <em>before</em> hashing. {@code List#hashCode} treats a {@code null}
* element as contributing {@code 0} and a non-null element as contributing its {@code String} hash.
* Hashing the placeholder string directly would therefore give a different bucket id than Hudi for
* any row with a null bucket-key field.
*
* <h3>Why we reject values containing {@code ','}</h3>
*
* <p>{@code ','} is Hudi's record-part separator and is <strong>not escaped</strong> in Hudi's
* record-key serialization. Any bucket-key value containing {@code ','} is therefore ambiguous on
* Hudi's parse path and would produce a different reconstructed list than the one we hash here. We
* refuse such values up front with {@link IllegalArgumentException} so callers cannot silently
* desync from Hudi's bucket layout. ({@code ':'} on the other hand is safe — Hudi's parser has a
* dedicated look-ahead loop that handles values containing {@code ':'}, e.g. timestamps like {@code
* "2023-10-25T10:01:13.182Z"}.)
*
* <h3>Supported key field types</h3>
*
* <p>See {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}. Composite or binary types (ARRAY/MAP/ROW/
* BINARY/VARBINARY) would fall back to {@code Object#toString()} and produce instance-bound,
* non-reproducible bucket ids; they are rejected at construction time.
*/
public class HudiKeyEncoder implements KeyEncoder {

/**
* Placeholder used by Hudi's serialization for a {@code null} bucket-key field. Kept here for
* cross-validation in tests — the encoder itself never hashes this literal because Hudi's
* parser turns it back into {@code null} before hashing.
*/
public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";

/** Hudi's record-part separator. Values containing it are rejected. */
private static final char RECORD_KEY_PARTS_SEPARATOR = ',';

/**
* The set of {@link DataTypeRoot}s allowed as Hudi bucket key fields. These are exactly the
* scalar types for which {@code toString()} (or a deterministic equivalent) round-trips through
* Hudi's record-key serialization.
*/
public static final Set<DataTypeRoot> SUPPORTED_BUCKET_KEY_TYPE_ROOTS =
Collections.unmodifiableSet(
EnumSet.of(
DataTypeRoot.CHAR,
DataTypeRoot.STRING,
DataTypeRoot.BOOLEAN,
DataTypeRoot.DECIMAL,
DataTypeRoot.TINYINT,
DataTypeRoot.SMALLINT,
DataTypeRoot.INTEGER,
DataTypeRoot.BIGINT,
DataTypeRoot.FLOAT,
DataTypeRoot.DOUBLE,
DataTypeRoot.DATE,
DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE));

private final InternalRow.FieldGetter[] fieldGetters;
private final String[] keyFieldNames;

public HudiKeyEncoder(RowType rowType, List<String> keys) {
// for getting key fields out of fluss internal row
fieldGetters = new InternalRow.FieldGetter[keys.size()];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The encoded hash here is values.hashCode() over a List<String> built directly from each key field's toString(). However, Hudi's production path goes through BucketIdentifier#getBucketId(HoodieKey, indexKeyFields, numBuckets), which parses the record-key string by splitting on : and ,. As soon as a key field's string form contains : or , (very common for TIMESTAMP_LTZ, e.g. 2023-10-25T10:01:13.182Z, or any user string with a comma), the List<String> Hudi reconstructs differs from the one we hash here, and the resulting bucket id will diverge from Hudi's.
Note that HudiBucketingFunctionTest#testTimestampLtzType only validates against the BucketIdentifier.getBucketId(List<String>, int) overload, which sidesteps this parsing step. Please add an end-to-end test that goes through the HoodieKey overload, and either escape : / , inside stringifyForRecordKey, or document the limitation explicitly in the class Javadoc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — re-read Hudi's KeyGenUtils.extractRecordKeysByFields in Hudi master/release-1.x. The parser has two modes that we now mirror in HudiKeyEncoder:

Single-part recordKey (no , and no :) → returned verbatim, placeholder NOT round-tripped.
Composite recordKey → split on :/,, with "__null__" → null and "__empty__" → "" before List#hashCode().
Two findings while implementing the alignment:

The :-in-value concern in your comment is actually handled correctly by Hudi's look-ahead loop (the commaPosition < keyValueSep1 retry), so TIMESTAMP_LTZ values like 2023-10-25T10:01:13.182Z are fine — confirmed by a new end-to-end test that goes through BucketIdentifier.getBucketId(recordKey, "f1,f2", n).
The real divergence sources are (a) values containing , (Hudi doesn't escape) and (b) null/empty placeholder round-trip in composite mode. Both are now handled: the encoder produces List<String> elements element-wise equal to Hudi's parsed list, and rejects unrepresentable inputs (, in value, literal placeholder collision) up front.
Result: bucket id is bit-for-bit equal to Hudi's BucketIdentifier across all 21 tests, including the recordKey-string overload that exercises Hudi's production parse path.

keyFieldNames = new String[keys.size()];
for (int i = 0; i < keys.size(); i++) {
String keyField = keys.get(i);
int keyIndex = rowType.getFieldIndex(keyField);
if (keyIndex < 0) {
throw new IllegalArgumentException(
"Bucket key field '" + keyField + "' does not exist in row type.");
}
DataType keyDataType = rowType.getTypeAt(keyIndex);
validateSupportedBucketKeyType(keyField, keyDataType);
fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex);
keyFieldNames[i] = keyField;
}
}

/**
 * Verifies that a bucket key field has one of the supported type roots.
 *
 * @param keyField name of the bucket key field, used in the error message
 * @param dataType declared type of that field
 * @throws IllegalArgumentException if the type root of {@code dataType} is not contained in
 *     {@link #SUPPORTED_BUCKET_KEY_TYPE_ROOTS}
 */
private static void validateSupportedBucketKeyType(String keyField, DataType dataType) {
    if (SUPPORTED_BUCKET_KEY_TYPE_ROOTS.contains(dataType.getTypeRoot())) {
        return;
    }

    StringBuilder message = new StringBuilder();
    message.append("Unsupported Hudi bucket key type for field '")
            .append(keyField)
            .append("': ")
            .append(dataType)
            .append(". A bucket key field must be a scalar type with a deterministic ")
            .append("string form; composite or binary types (ARRAY/MAP/ROW/BINARY/VARBINARY) ")
            .append("would otherwise fall back to Object#toString() and produce ")
            .append("non-reproducible bucket ids. Supported type roots are: ")
            .append(SUPPORTED_BUCKET_KEY_TYPE_ROOTS);
    throw new IllegalArgumentException(message.toString());
}

/**
 * Encodes the bucket key fields of {@code row} as the 4-byte big-endian form of a {@code
 * List<String>} hash code.
 *
 * <p>Each key field is converted by {@code toHudiHashElement}; the elements are collected in
 * key order and hashed with {@link List#hashCode()}. Keys with more than one field are
 * converted in composite mode, a single-field key in single-field mode.
 *
 * @param row the row to read the key fields from
 * @return a new 4-byte array holding the hash code, most significant byte first
 */
@Override
public byte[] encodeKey(InternalRow row) {
    final int keyCount = fieldGetters.length;
    // More than one key field means Hudi sees a composite "f1:v1,f2:v2,..." record key.
    final boolean isCompositeKey = keyCount > 1;

    final List<String> hashElements = new ArrayList<>(keyCount);
    int position = 0;
    for (InternalRow.FieldGetter getter : fieldGetters) {
        Object fieldValue = getter.getFieldOrNull(row);
        String fieldName = keyFieldNames[position];
        hashElements.add(toHudiHashElement(fieldName, fieldValue, isCompositeKey));
        position++;
    }
    final int hash = hashElements.hashCode();

    // Emit the hash most significant byte first; HudiBucketingFunction decodes it the same way.
    final byte[] encoded = new byte[4];
    for (int pos = 0, shift = 24; pos < encoded.length; pos++, shift -= 8) {
        encoded[pos] = (byte) (hash >>> shift);
    }
    return encoded;
}

/**
 * Produces the {@code String} element that is placed in the list hashed by {@code encodeKey}.
 * The behaviour depends on the mode, mirroring the two parse modes of Hudi's record-key parser
 * that this encoder is written against:
 *
 * <ul>
 *   <li><b>single-field</b> ({@code composite == false}): Hudi returns the raw recordKey
 *       verbatim, so {@code "__null__"} stays as the literal string. A {@code null} field is
 *       therefore emitted as the placeholder string. Values containing {@code ','} are
 *       rejected, because such a recordKey would no longer take Hudi's single-part path.
 *   <li><b>composite</b> ({@code composite == true}): Hudi parses {@code "__null__"} → {@code
 *       null} and {@code "__empty__"} → {@code ""} before hashing, so {@code null}/{@code ""}
 *       are emitted directly. Values containing {@code ','}, or literally equal to one of the
 *       two placeholders, are rejected because they would break Hudi's reverse parsing or
 *       collide with its reserved sentinels.
 * </ul>
 *
 * @param fieldName name of the bucket key field, used only in error messages
 * @param value the field value read from the row, may be {@code null}
 * @param composite {@code true} if the bucket key consists of more than one field
 * @return the element to hash; {@code null} only for a null value in composite mode
 * @throws IllegalArgumentException if the string form of {@code value} contains {@code ','}
 *     (either mode), or equals a reserved placeholder in composite mode
 */
private static String toHudiHashElement(String fieldName, Object value, boolean composite) {
    if (value == null) {
        // Composite mode: Hudi parses "__null__" back to null before hashing.
        // Single-field mode: Hudi keeps the recordKey verbatim, so we keep the placeholder.
        return composite ? null : NULL_RECORDKEY_PLACEHOLDER;
    }
    String stringValue =
            (value instanceof BinaryString) ? value.toString() : String.valueOf(value);
    if (composite && stringValue.isEmpty()) {
        // Composite mode: Hudi parses "__empty__" back to "" before hashing.
        return "";
    }

    // ',' is rejected in BOTH modes. Previously this check ran only in composite mode, so a
    // single-field value containing ',' was hashed verbatim even though it is documented as
    // rejected, silently risking a bucket id that differs from Hudi's.
    // NOTE(review): a single-field value containing ':' also fails the "no ',' AND no ':'"
    // single-part condition described in encodeKey; confirm against Hudi's parser whether
    // that case needs dedicated handling.
    rejectIfContainsRecordSeparator(fieldName, stringValue);

    // Guard against literal placeholder collisions: if a user literally stores
    // "__null__"/"__empty__" as the value, Hudi would round-trip it to null/"" in composite
    // mode and we'd silently disagree.
    if (composite
            && (NULL_RECORDKEY_PLACEHOLDER.equals(stringValue)
                    || "__empty__".equals(stringValue))) {
        throw new IllegalArgumentException(
                "Bucket key field '"
                        + fieldName
                        + "' has value "
                        + stringValue
                        + " which collides with Hudi's reserved record-key placeholder; "
                        + "this value cannot be safely used as a composite Hudi bucket key.");
    }
    return stringValue;
}

/**
 * Fails if {@code value} contains the record-part separator {@code ','}.
 *
 * @param fieldName name of the bucket key field, used in the error message
 * @param value string form of the bucket key value to inspect
 * @throws IllegalArgumentException if {@code value} contains the separator
 */
private static void rejectIfContainsRecordSeparator(String fieldName, String value) {
    final boolean separatorFree = value.indexOf(RECORD_KEY_PARTS_SEPARATOR) < 0;
    if (separatorFree) {
        return;
    }

    String message =
            "Bucket key field '"
                    + fieldName
                    + "' has value containing ',', which is Hudi's record-part "
                    + "separator and is not escaped by Hudi's record-key serialization. "
                    + "Such a value would produce a bucket id divergent from Hudi's "
                    + "BucketIdentifier and is rejected to keep the bucket layout in sync.";
    throw new IllegalArgumentException(message);
}
}
37 changes: 37 additions & 0 deletions fluss-lake/fluss-lake-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,43 @@


<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink${flink.major.version}-bundle</artifactId>
<version>${hudi.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-common</artifactId>
Expand Down
Loading
Loading