feat: bloom filter building from left table data #30
Conversation
- Build local bloom filter during Phase 1 left table scan
- Collect and OR-aggregate bits via BloomFilterBits RPC
- Broadcast aggregated filter via BloomFilterPush before Phase 2
- Apply sender-side filtering before PushData in Phase 2
📝 Walkthrough

This change introduces a distributed bloom filter collection mechanism. New ClusterManager APIs enable per-context storage and retrieval of local bloom filter bits. A new RPC message type (BloomFilterBits) carries filter state across nodes. The distributed executor now aggregates per-node bloom filters via bitwise OR during Phase 2. Local bloom filter construction occurs during shuffle operations and is persisted via ClusterManager.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Coordinator
    participant DataNode1
    participant DataNode2
    participant ClusterMgr as Cluster Manager
    Coordinator->>DataNode1: Phase 1: Execute shuffle (RPC)
    DataNode1->>DataNode1: Scan tuples & build local bloom filter
    DataNode1->>ClusterMgr: set_local_bloom_bits(context_id, bits, expected, hashes)
    ClusterMgr->>ClusterMgr: Store filter bits & metadata
    Coordinator->>DataNode2: Phase 1: Execute shuffle (RPC)
    DataNode2->>DataNode2: Scan tuples & build local bloom filter
    DataNode2->>ClusterMgr: set_local_bloom_bits(context_id, bits, expected, hashes)
    ClusterMgr->>ClusterMgr: Store filter bits & metadata
    Note over Coordinator: Phase 2: Aggregate
    Coordinator->>DataNode1: Retrieve bloom bits (RpcType::BloomFilterBits)
    DataNode1->>ClusterMgr: get_local_bloom_bits(context_id)
    ClusterMgr-->>DataNode1: Return stored bits
    DataNode1-->>Coordinator: BloomFilterBitsArgs response
    Coordinator->>DataNode2: Retrieve bloom bits (RpcType::BloomFilterBits)
    DataNode2->>ClusterMgr: get_local_bloom_bits(context_id)
    ClusterMgr-->>DataNode2: Return stored bits
    DataNode2-->>Coordinator: BloomFilterBitsArgs response
    Coordinator->>Coordinator: Aggregate: OR all bits, sum expected, max hashes
    Coordinator->>DataNode1: Broadcast aggregated filter (RpcType::BloomFilterPush)
    Coordinator->>DataNode2: Broadcast aggregated filter (RpcType::BloomFilterPush)
```
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
include/common/cluster_manager.hpp (1)
321-327: ⚠️ Potential issue | 🟡 Minor: Local bloom filter entries are never cleaned up.
`clear_bloom_filter` cleans up `bloom_filters_` but doesn't clean up `local_bloom_bits_`. Over time, entries will accumulate for completed contexts.

♻️ Proposed fix

```diff
 void clear_bloom_filter(const std::string& context_id) {
   const std::scoped_lock<std::mutex> lock(mutex_);
   bloom_filters_.erase(context_id);
+  local_bloom_bits_.erase(context_id);
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/common/cluster_manager.hpp` around lines 321 - 327, clear_bloom_filter currently erases from bloom_filters_ but neglects to remove entries from local_bloom_bits_, causing leaked per-context state; update clear_bloom_filter to also erase local_bloom_bits_. Use the same std::scoped_lock<std::mutex> lock(mutex_) already present to ensure thread safety and call local_bloom_bits_.erase(context_id) (or equivalent) alongside bloom_filters_.erase(context_id) inside the function.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/common/cluster_manager.hpp`:
- Around line 285-291: The method set_local_bloom_bits stores bits per
context_id but writes global metadata fields (local_expected_elements_ and
local_num_hashes_), causing races; change the metadata to be stored per-context
(e.g., replace local_expected_elements_ and local_num_hashes_ with maps keyed by
context_id, such as local_expected_elements_map_ and local_num_hashes_map_) and
assign into those maps inside set_local_bloom_bits (holding mutex_), and update
any corresponding getters/accessors and the analogous code mentioned for lines
353-356 to read metadata from the per-context maps instead of the old globals so
each context's bits/metadata remain matched and thread-safe.
In `@src/main.cpp`:
- Around line 519-544: The BloomFilterBits RPC handler currently uses global
metadata getters get_local_expected_elements() and get_local_num_hashes(),
causing mismatched metadata for multiple contexts; after ClusterManager is
updated to store per-context values, change those calls in the BloomFilterBits
handler to call the per-context variants (pass args.context_id) so
reply_args.expected_elements =
cluster_manager->get_local_expected_elements(args.context_id) and
reply_args.num_hashes = cluster_manager->get_local_num_hashes(args.context_id),
keeping reply_args.context_id and filter_data assignment via
get_local_bloom_bits(args.context_id) as-is.
📒 Files selected for processing (5)

- include/common/cluster_manager.hpp
- include/network/rpc_message.hpp
- src/distributed/distributed_executor.cpp
- src/main.cpp
- tests/distributed_tests.cpp
- Store expected_elements and num_hashes in per-context maps instead of globals
- Update set_local_bloom_bits to write to per-context maps
- Update getters to take context_id parameter
- clear_bloom_filter now also erases local_bloom_bits and metadata maps
- Update BloomFilterBits handler to use per-context getters
The distributed shuffle join algorithm only supports INNER joins. LEFT, RIGHT, and FULL outer joins require different handling (e.g., broadcasting the outer table, or double-shuffle with side tables) that is not yet implemented. Instead of producing incorrect results, we now return a clear error message. Also add unit tests RightJoinRejection and FullJoinRejection to verify this behavior.
The distributed shuffle join can correctly handle LEFT joins in the current implementation because each node executes the query locally and LEFT join only requires preserving unmatched left-table rows (which are already local to each node). RIGHT and FULL joins require tracking unmatched rows across partitions which is not yet implemented. Update error message to say "INNER and LEFT" instead of just "INNER".
Force-pushed from c92194f to e35159e
- Skip bloom filter for RIGHT/FULL joins to prevent false negatives
- Add integration tests for bloom filter skip
- Allow LEFT joins in distributed shuffle join

The bloom filter can cause false negatives (rows filtered when they shouldn't be), which violates outer join semantics. For RIGHT/FULL joins, all right rows must be sent to ensure correct results.

NOTE: Phase 3-5 for collecting unmatched outer rows is temporarily disabled due to column indexing issues with non-SELECT * queries. The local executor on each data node handles unmatched right rows correctly for RIGHT JOIN. FULL JOIN unmatched left rows will be implemented separately.
Force-pushed from ef30b18 to c171913
Summary
Test plan
Summary by CodeRabbit
New Features
Tests