Commit c72a39e

Merge pull request #30 from poyrazK/feature/bloom-filter-building-v2
feat: bloom filter building from left table data
2 parents 692f068 + 654d198 commit c72a39e

9 files changed

Lines changed: 1095 additions & 42 deletions

docs/performance/SQLITE_COMPARISON.md

Lines changed: 22 additions & 19 deletions
@@ -45,34 +45,37 @@ We addressed the gaps via the following optimizations:
 Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.

 ### Solution: Bloom Filter Integration
-Implemented bloom filters to filter tuples at the source before network transmission:
-- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
-- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase
-- **Coordinator coordination**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes before the right/probe shuffle
+Implemented bloom filters to filter tuples at the source before network transmission using a 3-phase approach:
+- **Phase 1 (Local Build)**: Each data node scans its local left/build table partition, extracts join key values, and builds a local bloom filter
+- **Phase 2 (Bit Aggregation)**: Coordinator sends `BloomFilterBits` RPC to each data node; each responds with local bloom bits; coordinator OR-aggregates all bits into a single filter
+- **Phase 3 (Sender-Side Filter)**: Coordinator broadcasts aggregated filter via `BloomFilterPush` RPC; before sending right/probe tuples, `ShuffleFragment` handler checks `might_contain()` and skips tuples that will definitely not match

 ### Architecture
 ```
-[Phase 1: Shuffle Left]          [Phase 2: Shuffle Right]
-         |                                  |
-         v                                  v
- Build local bloom                 Apply bloom filter
- from join keys                    before buffering
-         |                                  |
-         +---- BloomFilterPush ----->-------+
-              (filter metadata)             |
-                                            v
-                                 Filtered tuples buffered
+Phase 1: Scan Left         Phase 2: Aggregate Bits         Phase 3: Filter Right
+       |                          |                               |
+       v                          v                               v
+Build local bloom  <--->  BloomFilterBits RPC  <--------  Aggregate & Broadcast
+on each data node         (OR-aggregate bits)             via BloomFilterPush
+       |                          |                               |
+       |                          v                               v
+       +----------------->  BloomFilterPush               might_contain() check
+                            (metadata only)                       |  before PushData
+                                                                  |
+                                                                  v
+                                                        Filtered tuples buffered
 ```

 ### Key Components
 | Component | Location | Purpose |
 |-----------|----------|---------|
 | `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
-| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
-| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
-| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
-| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
-| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
+| `BloomFilterBitsArgs` RPC | `include/network/rpc_message.hpp` | Local bloom bits from data nodes |
+| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Aggregated filter broadcast |
+| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores local and aggregated bloom filters |
+| `BloomFilterBits` handler | `src/main.cpp` | Returns local bloom bits to coordinator |
+| `ShuffleFragment` handler | `src/main.cpp` | Builds local bloom during Phase 1 scan |
+| Coordinator | `src/distributed/distributed_executor.cpp` | Collects bits, aggregates, broadcasts filter |

 ### Test Coverage
 - 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic
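
Reviewer sketch of the three phases described in this file's diff: build a filter per node, OR the bit arrays together, then test keys against the merged filter. This is a minimal illustration under stated assumptions, not the project's code. `SketchBloom` and `or_aggregate` are hypothetical names, and the sketch substitutes `std::hash` with double hashing for the MurmurHash3 hashing that the real `BloomFilter` class is documented to use. OR-aggregation only works when every node uses the same bit count and the same hash functions.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical minimal filter (not the project's BloomFilter class).
struct SketchBloom {
    std::vector<uint8_t> bits;  // packed bit array, 8 bits per byte
    std::size_t num_hashes;

    SketchBloom(std::size_t num_bits, std::size_t hashes)
        : bits(num_bits == 0 ? 1 : (num_bits + 7) / 8, 0), num_hashes(hashes) {}

    std::size_t bit_count() const { return bits.size() * 8; }

    // i-th probe position via double hashing: h1 + i * h2.
    // Assumption: std::hash stands in for MurmurHash3 here.
    std::size_t position(const std::string& key, std::size_t i) const {
        const std::size_t h1 = std::hash<std::string>{}(key);
        const std::size_t h2 = std::hash<std::string>{}(key + "#") | 1U;
        return (h1 + i * h2) % bit_count();
    }

    // Phase 1: each node inserts the join keys of its local build partition.
    void insert(const std::string& key) {
        for (std::size_t i = 0; i < num_hashes; ++i) {
            const std::size_t p = position(key, i);
            bits[p / 8] |= static_cast<uint8_t>(1U << (p % 8));
        }
    }

    // Phase 3: false means "definitely absent"; true means "possibly present".
    bool might_contain(const std::string& key) const {
        for (std::size_t i = 0; i < num_hashes; ++i) {
            const std::size_t p = position(key, i);
            if ((bits[p / 8] & (1U << (p % 8))) == 0) return false;
        }
        return true;
    }
};

// Phase 2: OR the per-node bit arrays into a single filter.
std::vector<uint8_t> or_aggregate(const std::vector<std::vector<uint8_t>>& per_node) {
    if (per_node.empty()) return {};
    std::vector<uint8_t> out(per_node.front().size(), 0);
    for (const auto& node_bits : per_node) {
        if (node_bits.size() != out.size()) {
            throw std::invalid_argument("bloom bit arrays differ in size");
        }
        for (std::size_t i = 0; i < out.size(); ++i) out[i] |= node_bits[i];
    }
    return out;
}
```

Because a set bit is never cleared by the OR, any key inserted on any node still passes `might_contain()` on the merged filter, so sender-side filtering cannot drop a tuple that would have matched. The price is a higher false-positive rate than any single local filter has.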

docs/phases/PHASE_6_DISTRIBUTED_JOIN.md

Lines changed: 3 additions & 2 deletions
@@ -28,8 +28,9 @@ Seamlessly integrated shuffle buffers into the Volcano execution model.
 ### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
 Added probabilistic filtering to reduce network traffic in shuffle joins.
 - **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
-- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
-- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
+- **Distributed Construction**: Each data node builds a local bloom filter from its left/build table partition during Phase 1 scan.
+- **Bit Aggregation**: Coordinator collects local bloom bits from all data nodes via `BloomFilterBits` RPC and OR-aggregates them into a single filter.
+- **Sender-Side Filtering**: Aggregated filter is broadcast via `BloomFilterPush` before Phase 2; `ShuffleFragment` handler applies `might_contain()` before sending `PushData`, skipping tuples that will definitely not match.

 ## Lessons Learned
 - Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
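
Reviewer sketch of the "optimal bit count and hash function calculation" mentioned in the bloom filter bullet above. It uses the textbook sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions, for n expected elements and target false-positive rate p. Whether `bloom_filter.hpp` computes exactly these values is an assumption. `BloomSizing` and `optimal_sizing` are hypothetical names.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>

// Hypothetical result type for the sizing calculation.
struct BloomSizing {
    std::size_t num_bits;
    std::size_t num_hashes;
};

// Textbook bloom filter sizing for a target false-positive rate.
// fp_rate must lie strictly between 0 and 1; the 1% default mirrors the docs.
BloomSizing optimal_sizing(std::size_t expected_elements, double fp_rate = 0.01) {
    // Treat an empty input as one element to avoid dividing by zero.
    const double n = static_cast<double>(std::max<std::size_t>(expected_elements, 1));
    const double ln2 = std::log(2.0);
    const double m = std::ceil(-n * std::log(fp_rate) / (ln2 * ln2));
    const double k = std::round(m / n * ln2);
    return {static_cast<std::size_t>(m),
            std::max<std::size_t>(static_cast<std::size_t>(k), 1)};
}
```

For 1000 expected keys at a 1% rate this gives roughly 9.6 bits per key and 7 hash functions. Every data node has to derive the same pair of values, otherwise the local bit arrays cannot be OR-aggregated.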

include/common/cluster_manager.hpp

Lines changed: 127 additions & 0 deletions
@@ -279,12 +279,129 @@ class ClusterManager {
         return "";
     }

+    /**
+     * @brief Store local bloom filter bits from this node (called on data nodes)
+     */
+    void set_local_bloom_bits(const std::string& context_id, std::vector<uint8_t> bits,
+                              size_t expected_elements, size_t num_hashes) {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        local_bloom_bits_[context_id] = std::move(bits);
+        local_expected_elements_map_[context_id] = expected_elements;
+        local_num_hashes_map_[context_id] = num_hashes;
+    }
+
+    /**
+     * @brief Get stored local bloom filter bits for a context
+     */
+    [[nodiscard]] std::vector<uint8_t> get_local_bloom_bits(const std::string& context_id) const {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        auto it = local_bloom_bits_.find(context_id);
+        if (it != local_bloom_bits_.end()) {
+            return it->second;
+        }
+        return {};
+    }
+
+    /**
+     * @brief Get expected_elements for local bloom filter
+     */
+    [[nodiscard]] size_t get_local_expected_elements(const std::string& context_id) const {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        auto it = local_expected_elements_map_.find(context_id);
+        if (it != local_expected_elements_map_.end()) {
+            return it->second;
+        }
+        return 0;
+    }
+
+    /**
+     * @brief Get num_hashes for local bloom filter
+     */
+    [[nodiscard]] size_t get_local_num_hashes(const std::string& context_id) const {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        auto it = local_num_hashes_map_.find(context_id);
+        if (it != local_num_hashes_map_.end()) {
+            return it->second;
+        }
+        return 0;
+    }
+
     /**
      * @brief Clear bloom filter for a context
      */
     void clear_bloom_filter(const std::string& context_id) {
         const std::scoped_lock<std::mutex> lock(mutex_);
         bloom_filters_.erase(context_id);
+        local_bloom_bits_.erase(context_id);
+        local_expected_elements_map_.erase(context_id);
+        local_num_hashes_map_.erase(context_id);
+    }
+
+    /**
+     * @brief Store local right table rows for outer join processing
+     * Called during Phase 2 shuffle when sending right table rows to other nodes
+     */
+    void set_local_right_rows(const std::string& context_id, const std::string& table_name,
+                              std::vector<executor::Tuple> rows) {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        local_right_table_rows_[context_id][table_name] = std::move(rows);
+    }
+
+    /**
+     * @brief Get stored local right table rows
+     */
+    [[nodiscard]] std::vector<executor::Tuple> get_local_right_rows(
+        const std::string& context_id, const std::string& table_name) const {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        auto ctx_it = local_right_table_rows_.find(context_id);
+        if (ctx_it != local_right_table_rows_.end()) {
+            auto table_it = ctx_it->second.find(table_name);
+            if (table_it != ctx_it->second.end()) {
+                return table_it->second;
+            }
+        }
+        return {};
+    }
+
+    /**
+     * @brief Clear local right table rows for a context
+     */
+    void clear_local_right_rows(const std::string& context_id) {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        local_right_table_rows_.erase(context_id);
+    }
+
+    /**
+     * @brief Store unmatched rows for a context (used by outer join processing)
+     */
+    void set_unmatched_rows(const std::string& context_id, const std::string& table_name,
+                            std::vector<executor::Tuple> rows) {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        unmatched_rows_[context_id][table_name] = std::move(rows);
+    }
+
+    /**
+     * @brief Get stored unmatched rows for a context
+     */
+    [[nodiscard]] std::vector<executor::Tuple> get_unmatched_rows(
+        const std::string& context_id, const std::string& table_name) const {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        auto ctx_it = unmatched_rows_.find(context_id);
+        if (ctx_it != unmatched_rows_.end()) {
+            auto table_it = ctx_it->second.find(table_name);
+            if (table_it != ctx_it->second.end()) {
+                return table_it->second;
+            }
+        }
+        return {};
+    }
+
+    /**
+     * @brief Clear unmatched rows for a context
+     */
+    void clear_unmatched_rows(const std::string& context_id) {
+        const std::scoped_lock<std::mutex> lock(mutex_);
+        unmatched_rows_.erase(context_id);
     }

 private:
@@ -311,6 +428,16 @@ class ClusterManager {
         shuffle_buffers_;
     /* context_id -> bloom filter data */
     std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
+    /* context_id -> local bloom filter bits (for aggregation during distributed build) */
+    std::unordered_map<std::string, std::vector<uint8_t>> local_bloom_bits_;
+    std::unordered_map<std::string, size_t> local_expected_elements_map_;
+    std::unordered_map<std::string, size_t> local_num_hashes_map_;
+    /* context_id -> table_name -> local right table rows for outer join tracking */
+    std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
+        local_right_table_rows_;
+    /* context_id -> table_name -> unmatched rows for outer join NULL-padding */
+    std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
+        unmatched_rows_;
     mutable std::mutex mutex_;
 };
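
Reviewer sketch of the accessor pattern used in this file: every operation takes the mutex, and reads go through `find()` so that a lookup for an unknown context never inserts an empty entry. The grouping of bits, `expected_elements`, and `num_hashes` into one struct is a choice made for this sketch only; the commit stores them in three parallel maps. `LocalBloomEntry` and `LocalBloomStore` are hypothetical names, and the code assumes C++17.

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical: one record per context instead of three parallel maps.
struct LocalBloomEntry {
    std::vector<uint8_t> bits;
    std::size_t expected_elements = 0;
    std::size_t num_hashes = 0;
};

// Hypothetical stand-in for the bloom-related part of ClusterManager.
class LocalBloomStore {
public:
    void set(const std::string& context_id, LocalBloomEntry entry) {
        const std::scoped_lock<std::mutex> lock(mutex_);
        entries_[context_id] = std::move(entry);
    }

    // Returns a copy so the caller never holds a reference past the lock.
    std::optional<LocalBloomEntry> get(const std::string& context_id) const {
        const std::scoped_lock<std::mutex> lock(mutex_);
        const auto it = entries_.find(context_id);
        if (it == entries_.end()) return std::nullopt;
        return it->second;
    }

    void clear(const std::string& context_id) {
        const std::scoped_lock<std::mutex> lock(mutex_);
        entries_.erase(context_id);
    }

private:
    std::unordered_map<std::string, LocalBloomEntry> entries_;
    mutable std::mutex mutex_;  // mutable so const getters can lock
};
```

Returning `std::optional` lets a caller distinguish "no filter stored for this context" from "filter stored with zero hashes", which the `return 0;` fallbacks in the committed getters cannot express. A single record also means one lookup reads all three values under one lock acquisition.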

include/executor/operator.hpp

Lines changed: 12 additions & 0 deletions
@@ -358,6 +358,18 @@ class HashJoinOperator : public Operator {

     void set_memory_resource(std::pmr::memory_resource* mr) override;
     void set_params(const std::vector<common::Value>* params) override;
+
+    /**
+     * @brief Get unmatched right rows after join execution
+     * @return Vector of tuples - the right-side rows that had no match
+     */
+    [[nodiscard]] std::vector<Tuple> get_unmatched_right_rows() const;
+
+    /**
+     * @brief Get join key values of unmatched right rows
+     * @return Vector of strings - the join key values for unmatched right rows
+     */
+    [[nodiscard]] std::vector<std::string> get_unmatched_right_keys() const;
 };

 /**
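
Reviewer sketch of what the two new accessors report: right/probe rows that found no partner in the left/build hash table. The diff shows only the declarations, so the logic below is an assumption about the general technique and not `HashJoinOperator`'s implementation. `Row`, `JoinResult`, and `hash_join_track_unmatched` are hypothetical names, and rows are simplified to a (join key, payload) pair.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical simplified row: (join key, payload).
using Row = std::pair<std::string, std::string>;

struct JoinResult {
    std::vector<std::pair<Row, Row>> matched;  // (left row, right row)
    std::vector<Row> unmatched_right;          // probe rows with no build match
};

// Build a hash table on the left side, probe with the right side,
// and record every probe row that finds no partner.
JoinResult hash_join_track_unmatched(const std::vector<Row>& left,
                                     const std::vector<Row>& right) {
    std::unordered_multimap<std::string, const Row*> build;
    for (const auto& l : left) build.emplace(l.first, &l);

    JoinResult out;
    for (const auto& r : right) {
        const auto range = build.equal_range(r.first);
        if (range.first == range.second) {
            out.unmatched_right.push_back(r);  // candidate for NULL-padding
            continue;
        }
        for (auto it = range.first; it != range.second; ++it) {
            out.matched.emplace_back(*it->second, r);
        }
    }
    return out;
}
```

In an outer join the unmatched rows are the ones that get NULL-padded on the missing side, which matches the role the `unmatched_rows_` comment in `ClusterManager` describes.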