From 3530dbd413f7bdc7f54d0b136fd8904dcc902b8d Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Thu, 14 May 2026 15:54:23 +0000 Subject: [PATCH 1/4] [FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction --- fluss-flink/fluss-flink-common/pom.xml | 7 ++ .../bitmap/AbstractRbAggFunction.java | 77 ++++++++++++ .../flink/functions/bitmap/BitmapUtils.java | 70 +++++++++++ .../bitmap/RoaringBitmapSerializer.java | 113 ++++++++++++++++++ .../bitmap/RoaringBitmapTypeInfo.java | 95 +++++++++++++++ .../functions/bitmap/BitmapUtilsTest.java | 91 ++++++++++++++ 6 files changed, 453 insertions(+) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java diff --git a/fluss-flink/fluss-flink-common/pom.xml b/fluss-flink/fluss-flink-common/pom.xml index dab880b58b..7ed46425ed 100644 --- a/fluss-flink/fluss-flink-common/pom.xml +++ b/fluss-flink/fluss-flink-common/pom.xml @@ -95,6 +95,13 @@ provided + + + org.roaringbitmap + RoaringBitmap + ${roaringbitmap.version} + + org.apache.fluss diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java new file mode 100644 index 0000000000..6a390fa132 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.AggregateFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * Shared base for bitmap aggregate UDFs that use {@link RoaringBitmap} as the accumulator. + * + *

The {@code @FunctionHint} annotation with {@code accumulator = @DataTypeHint("RAW")} tells + * Flink's Table planner to skip reflection-based POJO extraction and instead use the {@link + * TypeInformation} returned by {@link #getAccumulatorType()}, which provides the custom {@link + * RoaringBitmapSerializer}. Without this annotation, Flink attempts POJO field extraction on + * RoaringBitmap and fails. + */ +@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class)) +abstract class AbstractRbAggFunction extends AggregateFunction { + + @Override + public RoaringBitmap createAccumulator() { + return new RoaringBitmap(); + } + + /** Merges multiple accumulators — required for session window aggregation. */ + public void merge(RoaringBitmap acc, Iterable it) { + for (RoaringBitmap other : it) { + if (other != null) { + acc.or(other); + } + } + } + + public void resetAccumulator(RoaringBitmap acc) { + acc.clear(); + } + + @Override + @Nullable + public byte[] getValue(RoaringBitmap acc) { + if (acc == null || acc.isEmpty()) { + return null; + } + try { + return BitmapUtils.toBytes(acc); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize bitmap accumulator.", e); + } + } + + @Override + public TypeInformation getAccumulatorType() { + return RoaringBitmapTypeInfo.INSTANCE; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java new file mode 100644 index 0000000000..48c249d64e --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Utility methods for serializing and deserializing {@link RoaringBitmap}. + * + *

Uses the ByteBuffer-based serialization approach, which is the preferred method recommended by + * the RoaringBitmap library. This format is compatible with the server-side {@code + * RoaringBitmapUtils.serializeRoaringBitmap32} used by {@code FieldRoaringBitmap32Agg}. + */ +public final class BitmapUtils { + + private BitmapUtils() {} + + /** + * Serializes a {@link RoaringBitmap} to a byte array. + * + * @param bitmap the bitmap to serialize; null returns null + * @return serialized byte array, or null if input is null + */ + @Nullable + public static byte[] toBytes(@Nullable RoaringBitmap bitmap) throws IOException { + if (bitmap == null) { + return null; + } + bitmap.runOptimize(); + ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes()); + bitmap.serialize(buffer); + return buffer.array(); + } + + /** + * Deserializes a {@link RoaringBitmap} from a byte array. + * + * @param bytes the serialized bitmap bytes; null returns null + * @return deserialized RoaringBitmap, or null if input is null + */ + @Nullable + public static RoaringBitmap fromBytes(@Nullable byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.deserialize(ByteBuffer.wrap(bytes)); + return bitmap; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java new file mode 100644 index 0000000000..e72f6ad590 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; + +/** + * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link RoaringBitmap}. + * + *

Used as the accumulator serializer for bitmap aggregate functions to ensure correct + * checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is + * sensitive to internal class layout changes across RoaringBitmap library versions. + */ +public final class RoaringBitmapSerializer extends TypeSerializerSingleton { + + public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer(); + + private static final long serialVersionUID = 1L; + + private RoaringBitmapSerializer() {} + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public RoaringBitmap createInstance() { + return new RoaringBitmap(); + } + + @Override + public RoaringBitmap copy(RoaringBitmap from) { + return from.clone(); + } + + @Override + public RoaringBitmap copy(RoaringBitmap from, RoaringBitmap reuse) { + return from.clone(); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(RoaringBitmap record, DataOutputView target) throws IOException { + record.runOptimize(); + int size = record.serializedSizeInBytes(); + target.writeInt(size); + byte[] bytes = BitmapUtils.toBytes(record); + target.write(bytes); + } + + @Override + public RoaringBitmap deserialize(DataInputView source) throws IOException { + int size = source.readInt(); + byte[] bytes = new byte[size]; + source.readFully(bytes); + return BitmapUtils.fromBytes(bytes); + } + + @Override + public RoaringBitmap deserialize(RoaringBitmap reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + byte[] buffer = new byte[size]; + source.readFully(buffer); + target.write(buffer); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new RoaringBitmapSerializerSnapshot(); + } + + /** Snapshot for {@link RoaringBitmapSerializer}. */ + public static final class RoaringBitmapSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public RoaringBitmapSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java new file mode 100644 index 0000000000..cfb7771b6d --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.roaringbitmap.RoaringBitmap; + +import java.util.Objects; + +/** + * {@link TypeInformation} for {@link RoaringBitmap}. + * + *

Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct + * checkpoint and savepoint behavior for bitmap aggregate function accumulators. + */ +public final class RoaringBitmapTypeInfo extends TypeInformation { + + public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo(); + + private static final long serialVersionUID = 1L; + + private RoaringBitmapTypeInfo() {} + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return RoaringBitmap.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + return RoaringBitmapSerializer.INSTANCE; + } + + @Override + public String toString() { + return "RoaringBitmapTypeInfo"; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof RoaringBitmapTypeInfo; + } + + @Override + public int hashCode() { + return Objects.hash(getTypeClass()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof RoaringBitmapTypeInfo; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java new file mode 100644 index 0000000000..590191c774 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link BitmapUtils}. */ +class BitmapUtilsTest { + + @Test + void testNullInputToBytes() throws IOException { + assertThat(BitmapUtils.toBytes(null)).isNull(); + } + + @Test + void testNullInputFromBytes() throws IOException { + assertThat(BitmapUtils.fromBytes(null)).isNull(); + } + + @Test + void testEmptyBitmapRoundTrip() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + byte[] bytes = BitmapUtils.toBytes(bitmap); + assertThat(bytes).isNotNull(); + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result).isNotNull(); + assertThat(result.isEmpty()).isTrue(); + } + + @Test + void testKnownValuesRoundTrip() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.add(1); + bitmap.add(100); + bitmap.add(1000); + bitmap.add(Integer.MAX_VALUE); + + byte[] bytes = BitmapUtils.toBytes(bitmap); + assertThat(bytes).isNotNull(); + + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result).isNotNull(); + assertThat(result.getLongCardinality()).isEqualTo(4L); + assertThat(result.contains(1)).isTrue(); + assertThat(result.contains(100)).isTrue(); + assertThat(result.contains(1000)).isTrue(); + assertThat(result.contains(Integer.MAX_VALUE)).isTrue(); + assertThat(result.contains(2)).isFalse(); + } + + @Test + void testLargeCardinality() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + for (int i = 0; i < 100_000; i++) { + bitmap.add(i); + } + byte[] bytes = BitmapUtils.toBytes(bitmap); + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result.getLongCardinality()).isEqualTo(100_000L); + } + + @Test + void testFormatCompatibleWithServerSerialization() throws IOException { + // This test verifies that our ByteBuffer-based serialization produces bytes + // that can be deserialized back correctly — same guarantee the server relies on. + RoaringBitmap original = RoaringBitmap.bitmapOf(42, 100, 200, 300); + byte[] bytes = BitmapUtils.toBytes(original); + RoaringBitmap restored = BitmapUtils.fromBytes(bytes); + assertThat(restored).isEqualTo(original); + } +} From 28d260f05e3beaa77b45e3dd6ae3884e448f0859 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Fri, 15 May 2026 06:10:40 +0000 Subject: [PATCH 2/4] Add RoaringBitmapSerializer tests and fix jacoco coverage --- .../bitmap/RoaringBitmapSerializerTest.java | 150 ++++++++++++++++++ fluss-test-coverage/pom.xml | 3 + 2 files changed, 153 insertions(+) create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java new file mode 100644 index 0000000000..af9514d325 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link RoaringBitmapSerializer} and {@link RoaringBitmapTypeInfo}. */ +class RoaringBitmapSerializerTest { + + private final RoaringBitmapSerializer serializer = RoaringBitmapSerializer.INSTANCE; + + @Test + void testCreateInstance() { + RoaringBitmap instance = serializer.createInstance(); + assertThat(instance).isNotNull(); + assertThat(instance.isEmpty()).isTrue(); + } + + @Test + void testIsNotImmutable() { + assertThat(serializer.isImmutableType()).isFalse(); + } + + @Test + void testGetLengthIsMinusOne() { + assertThat(serializer.getLength()).isEqualTo(-1); + } + + @Test + void testSerializeDeserializeRoundTrip() throws Exception { + RoaringBitmap original = new RoaringBitmap(); + original.add(1); + original.add(100); + original.add(100_000); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap restored = serializer.deserialize(in); + + assertThat(restored).isEqualTo(original); + assertThat(restored.getLongCardinality()).isEqualTo(3L); + } + + @Test + void testDeserializeWithReuse() throws Exception { + RoaringBitmap original = RoaringBitmap.bitmapOf(42, 99, 1000); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap reuse = new RoaringBitmap(); + RoaringBitmap restored = serializer.deserialize(reuse, in); + + assertThat(restored).isEqualTo(original); + } + + @Test + void testCopy() { + RoaringBitmap original = RoaringBitmap.bitmapOf(1, 2, 3); + RoaringBitmap copy = serializer.copy(original); + + assertThat(copy).isEqualTo(original); + // Verify it is a deep copy + copy.add(999); + assertThat(original.contains(999)).isFalse(); + } + + @Test + void testCopyWithReuse() { + RoaringBitmap original = RoaringBitmap.bitmapOf(10, 20, 30); + RoaringBitmap reuse = new RoaringBitmap(); + RoaringBitmap copy = serializer.copy(original, reuse); + + assertThat(copy).isEqualTo(original); + } + + @Test + void testEmptyBitmapRoundTrip() throws Exception { + RoaringBitmap empty = new RoaringBitmap(); + + DataOutputSerializer out = new DataOutputSerializer(64); + serializer.serialize(empty, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap restored = serializer.deserialize(in); + + assertThat(restored.isEmpty()).isTrue(); + } + + @Test + void testSnapshotConfiguration() { + assertThat(serializer.snapshotConfiguration()).isNotNull(); + } + + // RoaringBitmapTypeInfo tests + + @Test + void testTypeInfoGetTypeClass() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.getTypeClass()).isEqualTo(RoaringBitmap.class); + } + + @Test + void testTypeInfoCreateSerializer() { + TypeSerializer s = + RoaringBitmapTypeInfo.INSTANCE.createSerializer(new ExecutionConfig()); + assertThat(s).isInstanceOf(RoaringBitmapSerializer.class); + } + + @Test + void testTypeInfoEquality() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.equals(RoaringBitmapTypeInfo.INSTANCE)).isTrue(); + assertThat(RoaringBitmapTypeInfo.INSTANCE.equals("other")).isFalse(); + } + + @Test + void testTypeInfoIsNotKeyType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isKeyType()).isFalse(); + } + + @Test + void testTypeInfoArity() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.getArity()).isEqualTo(1); + assertThat(RoaringBitmapTypeInfo.INSTANCE.getTotalFields()).isEqualTo(1); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 1df67094f1..9b52ffa321 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -484,6 +484,9 @@ org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint org.apache.fluss.flink.tiering.FlussLakeTiering + + org.apache.fluss.flink.functions.bitmap.AbstractRbAggFunction + org.apache.flink.table.catalog.* From b8cd74a125226a3dcda8c7c6b61846d4f0520198 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Fri, 15 May 2026 17:01:27 +0000 Subject: [PATCH 3/4] Add missing RoaringBitmapTypeInfo coverage tests --- .../bitmap/RoaringBitmapSerializerTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java index af9514d325..68cbcafd73 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java @@ -147,4 +147,32 @@ void testTypeInfoArity() { assertThat(RoaringBitmapTypeInfo.INSTANCE.getArity()).isEqualTo(1); assertThat(RoaringBitmapTypeInfo.INSTANCE.getTotalFields()).isEqualTo(1); } + + @Test + void testTypeInfoIsNotBasicType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isBasicType()).isFalse(); + } + + @Test + void testTypeInfoIsNotTupleType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isTupleType()).isFalse(); + } + + @Test + void testTypeInfoToString() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.toString()).isEqualTo("RoaringBitmapTypeInfo"); + } + + @Test + void testTypeInfoHashCode() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.hashCode()) + .isEqualTo(RoaringBitmapTypeInfo.INSTANCE.hashCode()); + } + + @Test + void testTypeInfoCanEqual() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual(RoaringBitmapTypeInfo.INSTANCE)) + .isTrue(); + assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual("other")).isFalse(); + } } From fbe648456317ad0ef3fc377823638fb81986cb11 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Mon, 18 May 2026 16:28:53 +0000 Subject: [PATCH 4/4] Address review comments for bitmap infrastructure --- .../bitmap/AbstractRbAggFunction.java | 6 +- .../bitmap/RoaringBitmapSerializer.java | 4 +- .../bitmap/RoaringBitmapTypeInfo.java | 5 +- .../bitmap/RoaringBitmapSerializerTest.java | 79 +++++++++++++++++-- fluss-test-coverage/pom.xml | 3 - 5 files changed, 85 insertions(+), 12 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java index 6a390fa132..1690a757f6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java @@ -17,6 +17,8 @@ package org.apache.fluss.flink.functions.bitmap; +import org.apache.fluss.exception.FlussRuntimeException; + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; @@ -44,7 +46,7 @@ public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } - /** Merges multiple accumulators — required for session window aggregation. */ + /** Merges partial accumulators, required for two-phase aggregation in the Flink Table API. */ public void merge(RoaringBitmap acc, Iterable it) { for (RoaringBitmap other : it) { if (other != null) { @@ -66,7 +68,7 @@ public byte[] getValue(RoaringBitmap acc) { try { return BitmapUtils.toBytes(acc); } catch (IOException e) { - throw new RuntimeException("Failed to serialize bitmap accumulator.", e); + throw new FlussRuntimeException("Failed to serialize bitmap accumulator.", e); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java index e72f6ad590..b2279b9fcd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java @@ -24,6 +24,8 @@ import org.apache.flink.core.memory.DataOutputView; import org.roaringbitmap.RoaringBitmap; +import javax.annotation.concurrent.ThreadSafe; + import java.io.IOException; /** @@ -33,6 +35,7 @@ * checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is * sensitive to internal class layout changes across RoaringBitmap library versions. */ +@ThreadSafe public final class RoaringBitmapSerializer extends TypeSerializerSingleton { public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer(); @@ -68,7 +71,6 @@ public int getLength() { @Override public void serialize(RoaringBitmap record, DataOutputView target) throws IOException { - record.runOptimize(); int size = record.serializedSizeInBytes(); target.writeInt(size); byte[] bytes = BitmapUtils.toBytes(record); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java index cfb7771b6d..fe0905523a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.roaringbitmap.RoaringBitmap; +import javax.annotation.concurrent.ThreadSafe; + import java.util.Objects; /** @@ -30,6 +32,7 @@ *

Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct * checkpoint and savepoint behavior for bitmap aggregate function accumulators. */ +@ThreadSafe public final class RoaringBitmapTypeInfo extends TypeInformation { public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo(); @@ -80,7 +83,7 @@ public String toString() { @Override public boolean equals(Object obj) { - return obj instanceof RoaringBitmapTypeInfo; + return obj instanceof RoaringBitmapTypeInfo && ((RoaringBitmapTypeInfo) obj).canEqual(this); } @Override diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java index 68cbcafd73..b6b6d19ca0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test; import org.roaringbitmap.RoaringBitmap; +import java.util.Collections; + import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link RoaringBitmapSerializer} and {@link RoaringBitmapTypeInfo}. */ @@ -31,6 +33,18 @@ class RoaringBitmapSerializerTest { private final RoaringBitmapSerializer serializer = RoaringBitmapSerializer.INSTANCE; + /** Minimal concrete implementation used only for testing AbstractRbAggFunction. */ + private static final class TestRbAggFunction extends AbstractRbAggFunction { + + public void accumulate(RoaringBitmap acc, Integer value) { + if (value != null) { + acc.add(value); + } + } + } + + private final TestRbAggFunction aggFunction = new TestRbAggFunction(); + @Test void testCreateInstance() { RoaringBitmap instance = serializer.createInstance(); @@ -85,7 +99,6 @@ void testCopy() { RoaringBitmap copy = serializer.copy(original); assertThat(copy).isEqualTo(original); - // Verify it is a deep copy copy.add(999); assertThat(original.contains(999)).isFalse(); } @@ -117,8 +130,6 @@ void testSnapshotConfiguration() { assertThat(serializer.snapshotConfiguration()).isNotNull(); } - // RoaringBitmapTypeInfo tests - @Test void testTypeInfoGetTypeClass() { assertThat(RoaringBitmapTypeInfo.INSTANCE.getTypeClass()).isEqualTo(RoaringBitmap.class); @@ -165,8 +176,7 @@ void testTypeInfoToString() { @Test void testTypeInfoHashCode() { - assertThat(RoaringBitmapTypeInfo.INSTANCE.hashCode()) - .isEqualTo(RoaringBitmapTypeInfo.INSTANCE.hashCode()); + assertThat(RoaringBitmapTypeInfo.INSTANCE.hashCode()).isNotZero(); } @Test @@ -175,4 +185,63 @@ void testTypeInfoCanEqual() { .isTrue(); assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual("other")).isFalse(); } + + @Test + void testAggCreateAccumulator() { + RoaringBitmap acc = aggFunction.createAccumulator(); + assertThat(acc).isNotNull(); + assertThat(acc.isEmpty()).isTrue(); + } + + @Test + void testAggGetValue() throws Exception { + RoaringBitmap acc = aggFunction.createAccumulator(); + aggFunction.accumulate(acc, 1); + aggFunction.accumulate(acc, 2); + + byte[] result = aggFunction.getValue(acc); + + assertThat(result).isNotNull(); + RoaringBitmap restored = BitmapUtils.fromBytes(result); + assertThat(restored).isNotNull(); + assertThat(restored.getLongCardinality()).isEqualTo(2L); + assertThat(restored.contains(1)).isTrue(); + assertThat(restored.contains(2)).isTrue(); + } + + @Test + void testAggGetValueNullOnEmpty() { + RoaringBitmap acc = aggFunction.createAccumulator(); + assertThat(aggFunction.getValue(acc)).isNull(); + } + + @Test + void testAggMerge() { + RoaringBitmap acc1 = aggFunction.createAccumulator(); + aggFunction.accumulate(acc1, 1); + aggFunction.accumulate(acc1, 2); + + RoaringBitmap acc2 = aggFunction.createAccumulator(); + aggFunction.accumulate(acc2, 3); + + aggFunction.merge(acc1, Collections.singletonList(acc2)); + assertThat(acc1.getLongCardinality()).isEqualTo(3L); + assertThat(acc1.contains(3)).isTrue(); + } + + @Test + void testAggResetAccumulator() { + RoaringBitmap acc = aggFunction.createAccumulator(); + acc.add(1); + acc.add(2); + + aggFunction.resetAccumulator(acc); + + assertThat(acc.isEmpty()).isTrue(); + } + + @Test + void testAggGetAccumulatorType() { + assertThat(aggFunction.getAccumulatorType()).isInstanceOf(RoaringBitmapTypeInfo.class); + } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 9b52ffa321..1df67094f1 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -484,9 +484,6 @@ org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint org.apache.fluss.flink.tiering.FlussLakeTiering - - org.apache.fluss.flink.functions.bitmap.AbstractRbAggFunction - org.apache.flink.table.catalog.*