From a8b702d954bab64c7dba37fe5bbb11448a65fb62 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Thu, 21 May 2026 18:06:24 +0800 Subject: [PATCH 1/3] add bloom filter folding to automatically size SBBF filters --- cpp/src/parquet/bloom_filter.cc | 75 ++++++++++++++++++- cpp/src/parquet/bloom_filter.h | 10 ++- .../bloom_filter_reader_writer_test.cc | 49 +++++++++++- cpp/src/parquet/bloom_filter_writer.cc | 21 ++++-- cpp/src/parquet/properties.h | 29 +++++-- cpp/src/parquet/properties_test.cc | 61 +++++++++++++++ 6 files changed, 226 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index 577d26fe0078..48ab07604c78 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -345,9 +347,75 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const { PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_)); } +void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) { + const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp); + if (num_folds > 0) { + Fold(num_folds); + } +} + +uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const { + const uint32_t num_blocks = NumBlocks(); + if (num_blocks < 2) { + return 0; + } + DCHECK_EQ(num_blocks & (num_blocks - 1), 0); + + uint64_t total_set_bits = 0; + const auto* bitset32 = reinterpret_cast(data_->data()); + const uint32_t num_words = num_bytes_ / static_cast(sizeof(uint32_t)); + for (uint32_t i = 0; i < num_words; ++i) { + total_set_bits += static_cast(std::popcount(bitset32[i])); + } + + const double avg_fill = + static_cast(total_set_bits) / (static_cast(num_blocks) * 256.0); + const auto max_folds = static_cast(std::countr_zero(num_blocks)); + + if (avg_fill == 0.0) { + return max_folds; + } + + uint32_t num_folds = 0; + double one_minus_fk = 1.0 - 
avg_fill; + for (uint32_t i = 0; i < max_folds; ++i) { + one_minus_fk *= one_minus_fk; + const double fk = 1.0 - one_minus_fk; + const double estimated_fpp = std::pow(fk, kBitsSetPerBlock); + if (estimated_fpp > target_fpp) { + break; + } + ++num_folds; + } + return num_folds; +} + +void BlockSplitBloomFilter::Fold(uint32_t num_folds) { + DCHECK_GT(num_folds, 0); + + const uint32_t num_blocks = NumBlocks(); + const uint32_t group_size = UINT32_C(1) << num_folds; + DCHECK_LE(group_size, num_blocks); + + const uint32_t new_num_blocks = num_blocks / group_size; + auto* bitset32 = reinterpret_cast(data_->mutable_data()); + + for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) { + const uint32_t src_block = dst_block * group_size; + for (int word = 0; word < kBitsSetPerBlock; ++word) { + uint32_t merged = bitset32[src_block * kBitsSetPerBlock + word]; + for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) { + merged |= bitset32[(src_block + fold_block) * kBitsSetPerBlock + word]; + } + bitset32[dst_block * kBitsSetPerBlock + word] = merged; + } + } + + num_bytes_ = new_num_blocks * kBytesPerFilterBlock; +} + bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { - const uint32_t bucket_index = - static_cast(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); + const uint32_t bucket_index = static_cast(((hash >> 32) * NumBlocks()) >> 32); const uint32_t key = static_cast(hash); const uint32_t* bitset32 = reinterpret_cast(data_->data()); @@ -363,8 +431,7 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { } void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) { - const uint32_t bucket_index = - static_cast(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); + const uint32_t bucket_index = static_cast(((hash >> 32) * NumBlocks()) >> 32); const uint32_t key = static_cast(hash); uint32_t* bitset32 = reinterpret_cast(data_->mutable_data()); diff --git a/cpp/src/parquet/bloom_filter.h 
b/cpp/src/parquet/bloom_filter.h index cabcd5b4a5d7..4511ff7eea4a 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -230,7 +230,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param fpp The false positive probability. /// @return it always return a value between kMinimumBloomFilterBytes and /// kMaximumBloomFilterBytes, and the return value is always a power of 2 - static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) { + static uint32_t OptimalNumOfBytes(uint64_t ndv, double fpp) { uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp); ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits)); return optimal_num_of_bits >> 3; @@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param fpp The false positive probability. /// @return it always return a value between kMinimumBloomFilterBytes * 8 and /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 16 - static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) { + static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) { ARROW_DCHECK(fpp > 0.0 && fpp < 1.0); const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8)); uint32_t num_bits; @@ -276,6 +276,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { bool FindHash(uint64_t hash) const override; void InsertHash(uint64_t hash) override; void InsertHashes(const uint64_t* hashes, int num_values) override; + /// Fold the bloom filter down to the smallest size that still meets the target FPP + /// (False Positive Probability). 
+ void FoldToTargetFpp(double target_fpp); void WriteTo(ArrowOutputStream* sink) const override; uint32_t GetBitsetSize() const override { return num_bytes_; } @@ -350,6 +353,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { private: inline void InsertHashImpl(uint64_t hash); + uint32_t NumBlocks() const { return num_bytes_ / kBytesPerFilterBlock; } + uint32_t NumFoldsForTargetFpp(double target_fpp) const; + void Fold(uint32_t num_folds); // Bytes in a tiny Bloom filter block. static constexpr int kBytesPerFilterBlock = 32; diff --git a/cpp/src/parquet/bloom_filter_reader_writer_test.cc b/cpp/src/parquet/bloom_filter_reader_writer_test.cc index d646a2b8fc1b..0f17fd5dacea 100644 --- a/cpp/src/parquet/bloom_filter_reader_writer_test.cc +++ b/cpp/src/parquet/bloom_filter_reader_writer_test.cc @@ -93,7 +93,7 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { BloomFilterOptions bloom_filter_options{100, 0.05}; const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( - bloom_filter_options.ndv, bloom_filter_options.fpp); + bloom_filter_options.ndv.value(), bloom_filter_options.fpp); WriterProperties::Builder properties_builder; properties_builder.enable_bloom_filter("c1", bloom_filter_options); auto writer_properties = properties_builder.build(); @@ -150,6 +150,53 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { } } +TEST(BloomFilterBuilder, FoldsOverestimatedNdvBeforeWriting) { + SchemaDescriptor schema; + schema::NodePtr root = + schema::GroupNode::Make("schema", Repetition::REPEATED, {schema::ByteArray("c1")}); + schema.Init(root); + + BloomFilterOptions bloom_filter_options{.ndv = 1'000'000, .fpp = 0.05}; + const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( + bloom_filter_options.ndv.value(), bloom_filter_options.fpp); + WriterProperties::Builder properties_builder; + properties_builder.enable_bloom_filter("c1", bloom_filter_options); + auto writer_properties = properties_builder.build(); + auto bloom_filter_builder = 
BloomFilterBuilder::Make(&schema, writer_properties.get()); + + bloom_filter_builder->AppendRowGroup(); + auto bloom_filter = bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0); + ASSERT_NE(bloom_filter, nullptr); + ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize()); + + std::vector hashes; + hashes.reserve(1000); + for (int32_t i = 0; i < 1000; ++i) { + const auto hash = bloom_filter->Hash(i); + hashes.push_back(hash); + bloom_filter->InsertHash(hash); + } + + auto sink = CreateOutputStream(); + auto locations = bloom_filter_builder->WriteTo(sink.get()); + ASSERT_EQ(locations.size(), 1); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + const auto& location = locations.front().second; + ReaderProperties reader_properties; + ::arrow::io::BufferReader reader( + ::arrow::SliceBuffer(buffer, location.offset, location.length)); + auto folded_filter = + parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader); + + EXPECT_LT(folded_filter.GetBitsetSize(), initial_bitset_size); + EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(1000, bloom_filter_options.fpp), + folded_filter.GetBitsetSize()); + for (uint64_t hash : hashes) { + EXPECT_TRUE(folded_filter.FindHash(hash)); + } +} + TEST(BloomFilterBuilder, InvalidOperations) { SchemaDescriptor schema; schema::NodePtr root = schema::GroupNode::Make( diff --git a/cpp/src/parquet/bloom_filter_writer.cc b/cpp/src/parquet/bloom_filter_writer.cc index f06b866c30e2..b4a9e3a7d8c5 100644 --- a/cpp/src/parquet/bloom_filter_writer.cc +++ b/cpp/src/parquet/bloom_filter_writer.cc @@ -185,8 +185,12 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { const WriterProperties* properties_; bool finished_ = false; - using RowGroupBloomFilters = - std::map>; + struct BloomFilterEntry { + std::unique_ptr filter; + double target_fpp; + }; + + using RowGroupBloomFilters = std::map; std::vector bloom_filters_; // indexed by row group ordinal }; @@ -214,9 +218,13 @@ BloomFilter* 
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { throw ParquetException(ss.str()); } + ARROW_DCHECK(opts->ndv.has_value()); auto bf = std::make_unique(properties_->memory_pool()); - bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv, opts->fpp)); - return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get(); + bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(), opts->fpp)); + return curr_rg_bfs + .emplace(column_ordinal, + BloomFilterEntry{.filter = std::move(bf), .target_fpp = opts->fpp}) + .first->second.filter.get(); } IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) { @@ -229,10 +237,11 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) for (size_t i = 0; i != bloom_filters_.size(); ++i) { auto& row_group_bloom_filters = bloom_filters_[i]; - for (const auto& [column_id, filter] : row_group_bloom_filters) { + for (auto& [column_id, entry] : row_group_bloom_filters) { // TODO(GH-43138): Determine the quality of bloom filter before writing it. PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell()); - filter->WriteTo(sink); + entry.filter->FoldToTargetFpp(entry.target_fpp); + entry.filter->WriteTo(sink); PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell()); if (pos - offset > std::numeric_limits::max()) { diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6634bac4f684..76d625adf51f 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -174,11 +175,14 @@ struct PARQUET_EXPORT BloomFilterOptions { /// Expected number of distinct values (NDV) in the bloom filter. /// /// Bloom filters are most effective for high-cardinality columns. A good default - /// is to set ndv equal to the number of rows. Lower values reduce disk usage but - /// may not be worthwhile for very small NDVs. 
+ /// is to set ndv equal to the number of rows. If unset, the writer resolves ndv + /// to the max row group row count. Lower values reduce disk usage but may not + /// be worthwhile for very small NDVs. /// - /// Increasing ndv (without increasing fpp) increases disk and memory usage. - int32_t ndv = 1 << 20; + /// Increasing ndv (without increasing fpp) increases memory usage. The writer + /// may fold the filter before serialization, but will not grow an undersized + /// filter. + std::optional ndv = std::nullopt; /// False-positive probability (FPP) of the bloom filter. /// @@ -256,6 +260,11 @@ class PARQUET_EXPORT ColumnProperties { "Bloom filter false positive probability must be in (0.0, 1.0), got " + std::to_string(bloom_filter_options.fpp)); } + if (bloom_filter_options.ndv.has_value() && bloom_filter_options.ndv.value() < 0) { + throw ParquetException( + "Bloom filter number of distinct values must be >= 0, got " + + std::to_string(bloom_filter_options.ndv.value())); + } bloom_filter_options_ = bloom_filter_options; } @@ -863,8 +872,16 @@ class PARQUET_EXPORT WriterProperties { get(item.first).set_statistics_enabled(item.second); for (const auto& item : page_index_enabled_) get(item.first).set_page_index_enabled(item.second); - for (const auto& item : bloom_filter_options_) - get(item.first).set_bloom_filter_options(item.second); + for (const auto& item : bloom_filter_options_) { + const auto& bloom_filter_options = item.second; + if (bloom_filter_options.ndv.has_value()) { + get(item.first).set_bloom_filter_options(bloom_filter_options); + } else { + auto resolved_options = bloom_filter_options; + resolved_options.ndv = max_row_group_length_; + get(item.first).set_bloom_filter_options(resolved_options); + } + } return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc index 0743b7ad4de9..8278b734a130 100644 
--- a/cpp/src/parquet/properties_test.cc +++ b/cpp/src/parquet/properties_test.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include #include +#include #include #include "arrow/buffer.h" @@ -24,6 +26,7 @@ #include "parquet/file_reader.h" #include "parquet/properties.h" +#include "parquet/test_util.h" namespace parquet { @@ -115,6 +118,48 @@ TEST(TestWriterProperties, SetCodecOptions) { ->window_bits); } +TEST(TestWriterProperties, BloomFilterNdvDefaults) { + BloomFilterOptions options; + ASSERT_FALSE(options.ndv.has_value()); + options.fpp = 0.05; + + auto props = WriterProperties::Builder() + .max_row_group_length(12345) + ->enable_bloom_filter("a", options) + ->build(); + + const auto resolved = props->bloom_filter_options(ColumnPath::FromDotString("a")); + ASSERT_TRUE(resolved.has_value()); + ASSERT_TRUE(resolved->ndv.has_value()); + ASSERT_EQ(12345, resolved->ndv.value()); + ASSERT_EQ(options.fpp, resolved->fpp); +} + +TEST(TestWriterProperties, BloomFilterExplicitNdv) { + BloomFilterOptions options{.ndv = 777, .fpp = 0.05}; + + auto props = WriterProperties::Builder() + .max_row_group_length(12345) + ->enable_bloom_filter("a", options) + ->build(); + + const auto resolved = props->bloom_filter_options(ColumnPath::FromDotString("a")); + ASSERT_TRUE(resolved.has_value()); + ASSERT_TRUE(resolved->ndv.has_value()); + ASSERT_EQ(777, resolved->ndv.value()); +} + +TEST(TestWriterProperties, BloomFilterRejectsNegativeNdv) { + BloomFilterOptions options{.ndv = -1, .fpp = 0.05}; + + EXPECT_THROW_THAT( + [&]() { WriterProperties::Builder().enable_bloom_filter("a", options)->build(); }, + ParquetException, + ::testing::Property( + &ParquetException::what, + ::testing::HasSubstr("Bloom filter number of distinct values must be >= 0"))); +} + TEST(TestWriterProperties, ContentDefinedChunkingSettings) { WriterProperties::Builder builder; std::shared_ptr props = builder.build(); @@ -218,6 +263,15 @@ 
TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) { column_properties.page_index_enabled()); ASSERT_EQ(round_tripped_col.statistics_enabled(), column_properties.statistics_enabled()); + const auto round_tripped_bloom_filter_options = + round_tripped_col.bloom_filter_options(); + const auto bloom_filter_options = column_properties.bloom_filter_options(); + ASSERT_EQ(round_tripped_bloom_filter_options.has_value(), + bloom_filter_options.has_value()); + if (bloom_filter_options.has_value()) { + ASSERT_EQ(round_tripped_bloom_filter_options->ndv, bloom_filter_options->ndv); + ASSERT_EQ(round_tripped_bloom_filter_options->fpp, bloom_filter_options->fpp); + } } } @@ -285,6 +339,13 @@ std::vector writer_properties_test_cases() { builder.enable_write_page_index(column_a); test_cases.emplace_back(builder.build(), "page_index_column_override"); } + { + WriterProperties::Builder builder; + builder.max_row_group_length(12345); + builder.enable_bloom_filter(column_a, + BloomFilterOptions{.ndv = std::nullopt, .fpp = 0.05}); + test_cases.emplace_back(builder.build(), "bloom_filter_column_override"); + } return test_cases; } From c7167bf958c2981238fd705917a9f747548e0ffe Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Thu, 21 May 2026 19:38:27 +0800 Subject: [PATCH 2/3] fix ci --- cpp/src/parquet/bloom_filter_writer.cc | 2 +- cpp/src/parquet/properties.h | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/bloom_filter_writer.cc b/cpp/src/parquet/bloom_filter_writer.cc index b4a9e3a7d8c5..dd0704af961e 100644 --- a/cpp/src/parquet/bloom_filter_writer.cc +++ b/cpp/src/parquet/bloom_filter_writer.cc @@ -210,7 +210,7 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { CheckState(column_ordinal); - auto& curr_rg_bfs = *bloom_filters_.rbegin(); + auto& curr_rg_bfs = bloom_filters_.back(); if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) { std::stringstream ss; ss << "Bloom filter already exists for column: " 
<< column_ordinal diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 76d625adf51f..d5fa7ab12f40 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -261,9 +261,8 @@ class PARQUET_EXPORT ColumnProperties { std::to_string(bloom_filter_options.fpp)); } if (bloom_filter_options.ndv.has_value() && bloom_filter_options.ndv.value() < 0) { - throw ParquetException( - "Bloom filter number of distinct values must be >= 0, got " + - std::to_string(bloom_filter_options.ndv.value())); + throw ParquetException("Bloom filter number of distinct values must be >= 0, got " + + std::to_string(bloom_filter_options.ndv.value())); } bloom_filter_options_ = bloom_filter_options; } From 4495c539e67d8a8e469711acc255d5756a1c7856 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Thu, 21 May 2026 20:32:47 +0800 Subject: [PATCH 3/3] fix windows ci --- cpp/src/parquet/bloom_filter_writer.cc | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/bloom_filter_writer.cc b/cpp/src/parquet/bloom_filter_writer.cc index dd0704af961e..3230626ef456 100644 --- a/cpp/src/parquet/bloom_filter_writer.cc +++ b/cpp/src/parquet/bloom_filter_writer.cc @@ -185,12 +185,21 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { const WriterProperties* properties_; bool finished_ = false; - struct BloomFilterEntry { - std::unique_ptr filter; - double target_fpp; + struct RowGroupBloomFilters { + RowGroupBloomFilters() = default; + RowGroupBloomFilters(RowGroupBloomFilters&&) noexcept = default; + RowGroupBloomFilters& operator=(RowGroupBloomFilters&&) noexcept = default; + RowGroupBloomFilters(const RowGroupBloomFilters&) = delete; + RowGroupBloomFilters& operator=(const RowGroupBloomFilters&) = delete; + + struct BloomFilterEntry { + std::unique_ptr filter; + double target_fpp; + }; + + std::map entries; }; - using RowGroupBloomFilters = std::map; std::vector bloom_filters_; // indexed by row group 
ordinal }; @@ -210,7 +219,7 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { CheckState(column_ordinal); - auto& curr_rg_bfs = bloom_filters_.back(); + auto& curr_rg_bfs = bloom_filters_.back().entries; if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) { std::stringstream ss; ss << "Bloom filter already exists for column: " << column_ordinal @@ -223,7 +232,8 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(), opts->fpp)); return curr_rg_bfs .emplace(column_ordinal, - BloomFilterEntry{.filter = std::move(bf), .target_fpp = opts->fpp}) + RowGroupBloomFilters::BloomFilterEntry{.filter = std::move(bf), + .target_fpp = opts->fpp}) .first->second.filter.get(); } @@ -236,7 +246,7 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) IndexLocations locations; for (size_t i = 0; i != bloom_filters_.size(); ++i) { - auto& row_group_bloom_filters = bloom_filters_[i]; + auto& row_group_bloom_filters = bloom_filters_[i].entries; for (auto& [column_id, entry] : row_group_bloom_filters) { // TODO(GH-43138): Determine the quality of bloom filter before writing it. PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());