diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index 577d26fe007..48ab07604c7 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include #include #include @@ -345,9 +347,75 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const { PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_)); } +void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) { + const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp); + if (num_folds > 0) { + Fold(num_folds); + } +} + +uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const { + const uint32_t num_blocks = NumBlocks(); + if (num_blocks < 2) { + return 0; + } + DCHECK_EQ(num_blocks & (num_blocks - 1), 0); + + uint64_t total_set_bits = 0; + const auto* bitset32 = reinterpret_cast(data_->data()); + const uint32_t num_words = num_bytes_ / static_cast(sizeof(uint32_t)); + for (uint32_t i = 0; i < num_words; ++i) { + total_set_bits += static_cast(std::popcount(bitset32[i])); + } + + const double avg_fill = + static_cast(total_set_bits) / (static_cast(num_blocks) * 256.0); + const auto max_folds = static_cast(std::countr_zero(num_blocks)); + + if (avg_fill == 0.0) { + return max_folds; + } + + uint32_t num_folds = 0; + double one_minus_fk = 1.0 - avg_fill; + for (uint32_t i = 0; i < max_folds; ++i) { + one_minus_fk *= one_minus_fk; + const double fk = 1.0 - one_minus_fk; + const double estimated_fpp = std::pow(fk, kBitsSetPerBlock); + if (estimated_fpp > target_fpp) { + break; + } + ++num_folds; + } + return num_folds; +} + +void BlockSplitBloomFilter::Fold(uint32_t num_folds) { + DCHECK_GT(num_folds, 0); + + const uint32_t num_blocks = NumBlocks(); + const uint32_t group_size = UINT32_C(1) << num_folds; + DCHECK_LE(group_size, num_blocks); + + const uint32_t new_num_blocks = num_blocks / group_size; + auto* bitset32 = reinterpret_cast(data_->mutable_data()); + + for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) { + const uint32_t src_block = dst_block * group_size; + for (int word = 0; word < kBitsSetPerBlock; ++word) { + uint32_t merged = bitset32[src_block * kBitsSetPerBlock + word]; + for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) { + merged |= bitset32[(src_block + fold_block) * kBitsSetPerBlock + word]; + } + bitset32[dst_block * kBitsSetPerBlock + word] = merged; + } + } + + num_bytes_ = new_num_blocks * kBytesPerFilterBlock; +} + bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { - const uint32_t bucket_index = - static_cast(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); + const uint32_t bucket_index = static_cast(((hash >> 32) * NumBlocks()) >> 32); const uint32_t key = static_cast(hash); const uint32_t* bitset32 = reinterpret_cast(data_->data()); @@ -363,8 +431,7 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { } void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) { - const uint32_t bucket_index = - static_cast(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); + const uint32_t bucket_index = static_cast(((hash >> 32) * NumBlocks()) >> 32); const uint32_t key = static_cast(hash); uint32_t* bitset32 = reinterpret_cast(data_->mutable_data()); diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index cabcd5b4a5d..4511ff7eea4 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -230,7 +230,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param fpp The false positive probability. /// @return it always return a value between kMinimumBloomFilterBytes and /// kMaximumBloomFilterBytes, and the return value is always a power of 2 - static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) { + static uint32_t OptimalNumOfBytes(uint64_t ndv, double fpp) { uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp); ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits)); return optimal_num_of_bits >> 3; @@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param fpp The false positive probability. /// @return it always return a value between kMinimumBloomFilterBytes * 8 and /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 16 - static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) { + static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) { ARROW_DCHECK(fpp > 0.0 && fpp < 1.0); const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8)); uint32_t num_bits; @@ -276,6 +276,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { bool FindHash(uint64_t hash) const override; void InsertHash(uint64_t hash) override; void InsertHashes(const uint64_t* hashes, int num_values) override; + /// Fold the bloom filter down to the smallest size that still meets the target FPP + /// (False Positive Percentage). + void FoldToTargetFpp(double target_fpp); void WriteTo(ArrowOutputStream* sink) const override; uint32_t GetBitsetSize() const override { return num_bytes_; } @@ -350,6 +353,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { private: inline void InsertHashImpl(uint64_t hash); + uint32_t NumBlocks() const { return num_bytes_ / kBytesPerFilterBlock; } + uint32_t NumFoldsForTargetFpp(double target_fpp) const; + void Fold(uint32_t num_folds); // Bytes in a tiny Bloom filter block. static constexpr int kBytesPerFilterBlock = 32; diff --git a/cpp/src/parquet/bloom_filter_reader_writer_test.cc b/cpp/src/parquet/bloom_filter_reader_writer_test.cc index d646a2b8fc1..0f17fd5dace 100644 --- a/cpp/src/parquet/bloom_filter_reader_writer_test.cc +++ b/cpp/src/parquet/bloom_filter_reader_writer_test.cc @@ -93,7 +93,7 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { BloomFilterOptions bloom_filter_options{100, 0.05}; const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( - bloom_filter_options.ndv, bloom_filter_options.fpp); + bloom_filter_options.ndv.value(), bloom_filter_options.fpp); WriterProperties::Builder properties_builder; properties_builder.enable_bloom_filter("c1", bloom_filter_options); auto writer_properties = properties_builder.build(); @@ -150,6 +150,53 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { } } +TEST(BloomFilterBuilder, FoldsOverestimatedNdvBeforeWriting) { + SchemaDescriptor schema; + schema::NodePtr root = + schema::GroupNode::Make("schema", Repetition::REPEATED, {schema::ByteArray("c1")}); + schema.Init(root); + + BloomFilterOptions bloom_filter_options{.ndv = 1'000'000, .fpp = 0.05}; + const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( + bloom_filter_options.ndv.value(), bloom_filter_options.fpp); + WriterProperties::Builder properties_builder; + properties_builder.enable_bloom_filter("c1", bloom_filter_options); + auto writer_properties = properties_builder.build(); + auto bloom_filter_builder = BloomFilterBuilder::Make(&schema, writer_properties.get()); + + bloom_filter_builder->AppendRowGroup(); + auto bloom_filter = bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0); + ASSERT_NE(bloom_filter, nullptr); + ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize()); + + std::vector hashes; + hashes.reserve(1000); + for (int32_t i = 0; i < 1000; ++i) { + const auto hash = bloom_filter->Hash(i); + hashes.push_back(hash); + bloom_filter->InsertHash(hash); + } + + auto sink = CreateOutputStream(); + auto locations = bloom_filter_builder->WriteTo(sink.get()); + ASSERT_EQ(locations.size(), 1); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + const auto& location = locations.front().second; + ReaderProperties reader_properties; + ::arrow::io::BufferReader reader( + ::arrow::SliceBuffer(buffer, location.offset, location.length)); + auto folded_filter = + parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader); + + EXPECT_LT(folded_filter.GetBitsetSize(), initial_bitset_size); + EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(1000, bloom_filter_options.fpp), + folded_filter.GetBitsetSize()); + for (uint64_t hash : hashes) { + EXPECT_TRUE(folded_filter.FindHash(hash)); + } +} + TEST(BloomFilterBuilder, InvalidOperations) { SchemaDescriptor schema; schema::NodePtr root = schema::GroupNode::Make( diff --git a/cpp/src/parquet/bloom_filter_writer.cc b/cpp/src/parquet/bloom_filter_writer.cc index f06b866c30e..3230626ef45 100644 --- a/cpp/src/parquet/bloom_filter_writer.cc +++ b/cpp/src/parquet/bloom_filter_writer.cc @@ -185,8 +185,21 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { const WriterProperties* properties_; bool finished_ = false; - using RowGroupBloomFilters = - std::map>; + struct RowGroupBloomFilters { + RowGroupBloomFilters() = default; + RowGroupBloomFilters(RowGroupBloomFilters&&) noexcept = default; + RowGroupBloomFilters& operator=(RowGroupBloomFilters&&) noexcept = default; + RowGroupBloomFilters(const RowGroupBloomFilters&) = delete; + RowGroupBloomFilters& operator=(const RowGroupBloomFilters&) = delete; + + struct BloomFilterEntry { + std::unique_ptr filter; + double target_fpp; + }; + + std::map entries; + }; + std::vector bloom_filters_; // indexed by row group ordinal }; @@ -206,7 +219,7 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { CheckState(column_ordinal); - auto& curr_rg_bfs = *bloom_filters_.rbegin(); + auto& curr_rg_bfs = bloom_filters_.back().entries; if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) { std::stringstream ss; ss << "Bloom filter already exists for column: " << column_ordinal @@ -214,9 +227,14 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { throw ParquetException(ss.str()); } + ARROW_DCHECK(opts->ndv.has_value()); auto bf = std::make_unique(properties_->memory_pool()); - bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv, opts->fpp)); - return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get(); + bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(), opts->fpp)); + return curr_rg_bfs + .emplace(column_ordinal, + RowGroupBloomFilters::BloomFilterEntry{.filter = std::move(bf), + .target_fpp = opts->fpp}) + .first->second.filter.get(); } IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) { @@ -228,11 +246,12 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) IndexLocations locations; for (size_t i = 0; i != bloom_filters_.size(); ++i) { - auto& row_group_bloom_filters = bloom_filters_[i]; - for (const auto& [column_id, filter] : row_group_bloom_filters) { + auto& row_group_bloom_filters = bloom_filters_[i].entries; + for (auto& [column_id, entry] : row_group_bloom_filters) { // TODO(GH-43138): Determine the quality of bloom filter before writing it. PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell()); - filter->WriteTo(sink); + entry.filter->FoldToTargetFpp(entry.target_fpp); + entry.filter->WriteTo(sink); PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell()); if (pos - offset > std::numeric_limits::max()) { diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6634bac4f68..d5fa7ab12f4 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -174,11 +175,14 @@ struct PARQUET_EXPORT BloomFilterOptions { /// Expected number of distinct values (NDV) in the bloom filter. /// /// Bloom filters are most effective for high-cardinality columns. A good default - /// is to set ndv equal to the number of rows. Lower values reduce disk usage but - /// may not be worthwhile for very small NDVs. + /// is to set ndv equal to the number of rows. If unset, the writer resolves ndv + /// to the max row group row count. Lower values reduce disk usage but may not + /// be worthwhile for very small NDVs. /// - /// Increasing ndv (without increasing fpp) increases disk and memory usage. - int32_t ndv = 1 << 20; + /// Increasing ndv (without increasing fpp) increases memory usage. The writer + /// may fold the filter before serialization, but will not grow an undersized + /// filter. + std::optional ndv = std::nullopt; /// False-positive probability (FPP) of the bloom filter. /// @@ -256,6 +260,10 @@ class PARQUET_EXPORT ColumnProperties { "Bloom filter false positive probability must be in (0.0, 1.0), got " + std::to_string(bloom_filter_options.fpp)); } + if (bloom_filter_options.ndv.has_value() && bloom_filter_options.ndv.value() < 0) { + throw ParquetException("Bloom filter number of distinct values must be >= 0, got " + + std::to_string(bloom_filter_options.ndv.value())); + } bloom_filter_options_ = bloom_filter_options; } @@ -863,8 +871,16 @@ class PARQUET_EXPORT WriterProperties { get(item.first).set_statistics_enabled(item.second); for (const auto& item : page_index_enabled_) get(item.first).set_page_index_enabled(item.second); - for (const auto& item : bloom_filter_options_) - get(item.first).set_bloom_filter_options(item.second); + for (const auto& item : bloom_filter_options_) { + const auto& bloom_filter_options = item.second; + if (bloom_filter_options.ndv.has_value()) { + get(item.first).set_bloom_filter_options(bloom_filter_options); + } else { + auto resolved_options = bloom_filter_options; + resolved_options.ndv = max_row_group_length_; + get(item.first).set_bloom_filter_options(resolved_options); + } + } return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc index 0743b7ad4de..8278b734a13 100644 --- a/cpp/src/parquet/properties_test.cc +++ b/cpp/src/parquet/properties_test.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include #include +#include #include #include "arrow/buffer.h" @@ -24,6 +26,7 @@ #include "parquet/file_reader.h" #include "parquet/properties.h" +#include "parquet/test_util.h" namespace parquet { @@ -115,6 +118,48 @@ TEST(TestWriterProperties, SetCodecOptions) { ->window_bits); } +TEST(TestWriterProperties, BloomFilterNdvDefaults) { + BloomFilterOptions options; + ASSERT_FALSE(options.ndv.has_value()); + options.fpp = 0.05; + + auto props = WriterProperties::Builder() + .max_row_group_length(12345) + ->enable_bloom_filter("a", options) + ->build(); + + const auto resolved = props->bloom_filter_options(ColumnPath::FromDotString("a")); + ASSERT_TRUE(resolved.has_value()); + ASSERT_TRUE(resolved->ndv.has_value()); + ASSERT_EQ(12345, resolved->ndv.value()); + ASSERT_EQ(options.fpp, resolved->fpp); +} + +TEST(TestWriterProperties, BloomFilterExplicitNdv) { + BloomFilterOptions options{.ndv = 777, .fpp = 0.05}; + + auto props = WriterProperties::Builder() + .max_row_group_length(12345) + ->enable_bloom_filter("a", options) + ->build(); + + const auto resolved = props->bloom_filter_options(ColumnPath::FromDotString("a")); + ASSERT_TRUE(resolved.has_value()); + ASSERT_TRUE(resolved->ndv.has_value()); + ASSERT_EQ(777, resolved->ndv.value()); +} + +TEST(TestWriterProperties, BloomFilterRejectsNegativeNdv) { + BloomFilterOptions options{.ndv = -1, .fpp = 0.05}; + + EXPECT_THROW_THAT( + [&]() { WriterProperties::Builder().enable_bloom_filter("a", options)->build(); }, + ParquetException, + ::testing::Property( + &ParquetException::what, + ::testing::HasSubstr("Bloom filter number of distinct values must be >= 0"))); +} + TEST(TestWriterProperties, ContentDefinedChunkingSettings) { WriterProperties::Builder builder; std::shared_ptr props = builder.build(); @@ -218,6 +263,15 @@ TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) { column_properties.page_index_enabled()); ASSERT_EQ(round_tripped_col.statistics_enabled(), column_properties.statistics_enabled()); + const auto round_tripped_bloom_filter_options = + round_tripped_col.bloom_filter_options(); + const auto bloom_filter_options = column_properties.bloom_filter_options(); + ASSERT_EQ(round_tripped_bloom_filter_options.has_value(), + bloom_filter_options.has_value()); + if (bloom_filter_options.has_value()) { + ASSERT_EQ(round_tripped_bloom_filter_options->ndv, bloom_filter_options->ndv); + ASSERT_EQ(round_tripped_bloom_filter_options->fpp, bloom_filter_options->fpp); + } } } @@ -285,6 +339,13 @@ std::vector writer_properties_test_cases() { builder.enable_write_page_index(column_a); test_cases.emplace_back(builder.build(), "page_index_column_override"); } + { + WriterProperties::Builder builder; + builder.max_row_group_length(12345); + builder.enable_bloom_filter(column_a, + BloomFilterOptions{.ndv = std::nullopt, .fpp = 0.05}); + test_cases.emplace_back(builder.build(), "bloom_filter_column_override"); + } return test_cases; }