
Commit ceaae2e

Merge pull request #25 from poyrazK/release/bloom-filter-v2
feat: bloom filter integration for distributed joins
2 parents 805194e + 0991945 commit ceaae2e

12 files changed: 917 additions & 3 deletions

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
```diff
@@ -72,6 +72,7 @@ set(CORE_SOURCES
     src/distributed/raft_group.cpp
     src/distributed/raft_manager.cpp
     src/distributed/distributed_executor.cpp
+    src/common/bloom_filter.cpp
     src/storage/columnar_table.cpp
 )

@@ -117,6 +118,7 @@ if(BUILD_TESTS)
     add_cloudsql_test(catalog_coverage_tests tests/catalog_coverage_tests.cpp)
     add_cloudsql_test(transaction_coverage_tests tests/transaction_coverage_tests.cpp)
     add_cloudsql_test(utils_coverage_tests tests/utils_coverage_tests.cpp)
+    add_cloudsql_test(bloom_filter_tests tests/bloom_filter_test.cpp)
     add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp)
     add_cloudsql_test(server_tests tests/server_tests.cpp)
     add_cloudsql_test(statement_tests tests/statement_tests.cpp)
```

docs/performance/SQLITE_COMPARISON.md

Lines changed: 41 additions & 1 deletion
```diff
@@ -39,8 +39,48 @@ We addressed the gaps via the following optimizations:
 2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
 3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.

-## 6. Future Roadmap
+## 6. Distributed Join Optimization: Bloom Filters
+
+### Problem
+Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.
+
+### Solution: Bloom Filter Integration
+Implemented bloom filters to prune tuples at the source before network transmission:
+- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
+- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase
+- **Coordinator broadcast**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes before the right/probe shuffle
+
+### Architecture
+```
+[Phase 1: Shuffle Left]          [Phase 2: Shuffle Right]
+          |                                 |
+          v                                 v
+  Build local bloom                 Apply bloom filter
+  from join keys                    before buffering
+          |                                 |
+          +---- BloomFilterPush ----->------+
+            (filter metadata)               |
+                                            v
+                                 Filtered tuples buffered
+```
+
+### Key Components
+| Component | Location | Purpose |
+|-----------|----------|---------|
+| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
+| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
+| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
+| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
+| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
+| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
+
+### Test Coverage
+- 10 unit tests covering: `BloomFilter` class, `BloomFilterArgs` serialization, `ClusterManager` storage, filter application logic
+- Tests located in `tests/bloom_filter_test.cpp`
+
+## 7. Future Roadmap
 With the scan gap closed, our focus shifts to higher-level analytical throughput:
 * **Stage 1: SIMD-Accelerated Filtering**: Utilize AVX-512/NEON instructions to filter multiple rows in a single CPU cycle.
 * **Stage 2: Vectorized Execution**: Move from row-at-a-time `TupleView` to batch-at-a-time `VectorBatch` processing.
 * **Stage 3: Columnar Storage**: Transition from row-oriented heap files to columnar persistence for extreme analytical scanning.
+* **Stage 4: Distributed Hash Join**: Enhance the single `HashJoinOperator` with parallel partitioned hash join for multi-node execution.
```
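
The 1% default false-positive rate used by the new `BloomFilter` implies a concrete size/hash-count trade-off. As a sketch (not code from this commit; the function names are illustrative), the standard bloom filter sizing formulas relate expected elements `n` and target rate `p` to bit count `m` and hash count `k`:

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>

// Standard bloom filter sizing formulas (illustrative, not the commit's code).
// Optimal bit count: m = -n * ln(p) / (ln 2)^2
std::size_t optimal_num_bits(std::size_t n, double p) {
    return static_cast<std::size_t>(std::ceil(
        -static_cast<double>(n) * std::log(p) / (std::log(2.0) * std::log(2.0))));
}

// Optimal hash function count: k = (m / n) * ln 2
std::size_t optimal_num_hashes(std::size_t n, std::size_t m) {
    return static_cast<std::size_t>(std::round(
        static_cast<double>(m) / static_cast<double>(n) * std::log(2.0)));
}
```

For 1,000 expected join keys at a 1% false-positive rate, these formulas give roughly 9.6 Kbits (about 1.2 KB of filter data on the wire) and 7 hash functions, which is the scale of trade-off the 0.01 default implies.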

docs/phases/PHASE_6_DISTRIBUTED_JOIN.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -14,6 +14,7 @@ Introduced isolated staging areas for inter-node data movement.
 Developed a dedicated binary protocol for efficient data redistribution.
 - **ShuffleFragment**: Metadata describing the fragment being pushed (target context, source node, schema).
 - **PushData**: High-speed binary payload containing the actual tuple data for the shuffle phase.
+- **BloomFilterPush**: Bloom filter metadata broadcast to enable tuple filtering before network transmission.

 ### 3. Two-Phase Join Orchestration (`distributed/distributed_executor.cpp`)
 Implemented the control logic for distributed shuffle joins.
@@ -24,9 +25,17 @@ Implemented the control logic for distributed shuffle joins.
 Seamlessly integrated shuffle buffers into the Volcano execution model.
 - **Vectorized Buffering**: Optimized the `BufferScanOperator` to handle large volumes of redistributed data with minimal overhead.

+### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
+Added probabilistic filtering to reduce network traffic in shuffle joins.
+- **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
+- **Filter Construction**: Built during the Phase 1 scan, stored in `ClusterManager` per context.
+- **Filter Application**: The `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
+
 ## Lessons Learned
 - Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
 - Fine-grained locking in the shuffle buffers is critical for maintaining high throughput during the redistribution phase.
+- Bloom filters provide significant network traffic reduction when join selectivity is low, at the cost of a small false positive rate (typically <1%).

 ## Status: 100% Test Pass
 Verified the end-to-end shuffle join flow, including multi-node data movement and final result merging, through automated integration tests.
+- 10 unit tests for bloom filter implementation and integration (`tests/bloom_filter_test.cpp`)
```
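
The "check before buffering" step described above can be sketched generically. In this hypothetical snippet the `Tuple` alias and `might_contain` predicate stand in for the engine's real types; only tuples whose join key might match are kept:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical sketch of bloom-filter application before buffering.
// `Tuple` and the predicate are stand-ins, not the engine's real types.
using Tuple = std::vector<std::string>;

std::vector<Tuple> filter_before_buffering(
    const std::vector<Tuple>& incoming, std::size_t key_col,
    const std::function<bool(const std::string&)>& might_contain) {
    std::vector<Tuple> kept;
    for (const auto& t : incoming) {
        if (key_col < t.size() && might_contain(t[key_col])) {
            kept.push_back(t);  // possibly matching: buffer it
        }
        // tuples that definitely cannot match are skipped entirely
    }
    return kept;
}
```

Because a bloom filter has no false negatives, every matching tuple survives this step; the only cost is that a small fraction of non-matching tuples (the false positives) still get buffered.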

docs/phases/README.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -41,6 +41,7 @@ This directory contains the technical documentation for the lifecycle of the clo
 - Context-aware Shuffle infrastructure in `ClusterManager`.
 - Implementation of `ShuffleFragment` and `PushData` RPC protocols.
 - Two-phase Shuffle Join orchestration in `DistributedExecutor`.
+- **Bloom Filter Optimization**: Probabilistic tuple filtering to reduce network traffic in shuffle joins.

 ### [Phase 7: Replication & High Availability](./PHASE_7_REPLICATION_HA.md)
 **Focus**: Fault Tolerance & Data Redundancy.
```

include/common/bloom_filter.hpp

Lines changed: 83 additions & 0 deletions
New file (83 additions):

```cpp
/**
 * @file bloom_filter.hpp
 * @brief Bloom filter implementation for distributed join optimization
 */

#ifndef SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
#define SQL_ENGINE_COMMON_BLOOM_FILTER_HPP

#include <cstdint>
#include <cstring>
#include <vector>

#include "value.hpp"

namespace cloudsql {
namespace common {

/**
 * @brief Bloom filter for probabilistic membership testing
 *
 * Used in distributed joins to filter tuples that cannot possibly
 * match before network transmission.
 */
class BloomFilter {
 public:
  /**
   * @brief Construct a bloom filter with expected elements and false positive rate
   * @param expected_elements Number of elements expected to be inserted
   * @param false_positive_rate Target false positive rate (default 0.01 = 1%)
   */
  explicit BloomFilter(size_t expected_elements, double false_positive_rate = 0.01);

  /**
   * @brief Construct from serialized data
   */
  BloomFilter(const uint8_t* data, size_t size);

  /**
   * @brief Insert a value into the bloom filter
   */
  void insert(const Value& key);

  /**
   * @brief Check if a value might be in the bloom filter
   * @return true if possibly present, false if definitely not present
   */
  [[nodiscard]] bool might_contain(const Value& key) const;

  /**
   * @brief Serialize the bloom filter for network transmission
   */
  [[nodiscard]] std::vector<uint8_t> serialize() const;

  /**
   * @brief Get the bit array size in bytes
   */
  [[nodiscard]] size_t bit_size() const { return (num_bits_ + 7) / 8; }

  /**
   * @brief Get number of hash functions used
   */
  [[nodiscard]] size_t num_hashes() const { return num_hashes_; }

  /**
   * @brief Get expected elements
   */
  [[nodiscard]] size_t expected_elements() const { return expected_elements_; }

 private:
  size_t num_bits_;
  size_t num_hashes_;
  size_t expected_elements_;
  std::vector<uint8_t> bits_;

  size_t get_bit_position(size_t hash, size_t i) const;
  size_t murmur3_hash(const Value& key) const;
  size_t murmur3_hash(const uint8_t* data, size_t len, size_t seed) const;
};

}  // namespace common
}  // namespace cloudsql

#endif  // SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
```
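
To make the header's insert/`might_contain` contract concrete, here is a minimal self-contained sketch of the bit-array logic. `std::hash` over `std::string` stands in for the MurmurHash3-over-`Value` hashing declared above, and the double-hashing scheme (`h1 + i * h2`) is an assumption about what `get_bit_position` might do, not code from the commit:

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Minimal stand-in sketch of bloom filter bit logic (not the commit's class).
// std::hash replaces MurmurHash3, and string keys replace cloudsql Values.
class MiniBloom {
public:
    MiniBloom(std::size_t num_bits, std::size_t num_hashes)
        : num_bits_(num_bits), num_hashes_(num_hashes),
          bits_((num_bits + 7) / 8, 0) {}

    void insert(const std::string& key) {
        for (std::size_t i = 0; i < num_hashes_; ++i) {
            const std::size_t pos = bit_position(key, i);
            bits_[pos / 8] |= static_cast<uint8_t>(1U << (pos % 8));
        }
    }

    bool might_contain(const std::string& key) const {
        for (std::size_t i = 0; i < num_hashes_; ++i) {
            const std::size_t pos = bit_position(key, i);
            if ((bits_[pos / 8] & (1U << (pos % 8))) == 0) {
                return false;  // definitely not present: a required bit is unset
            }
        }
        return true;  // all bits set: possibly present
    }

private:
    // Double hashing: derive the i-th probe as h1 + i*h2 (mod num_bits).
    std::size_t bit_position(const std::string& key, std::size_t i) const {
        const std::size_t h1 = std::hash<std::string>{}(key);
        const std::size_t h2 = std::hash<std::string>{}(key + "#");  // second seed
        return (h1 + i * h2) % num_bits_;
    }

    std::size_t num_bits_;
    std::size_t num_hashes_;
    std::vector<uint8_t> bits_;
};
```

Any inserted key is always reported as possibly present (no false negatives); absent keys are rejected unless all of their probe bits happen to collide with set bits, which is the false-positive case the docs bound at roughly 1%.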

include/common/cluster_manager.hpp

Lines changed: 91 additions & 0 deletions
```diff
@@ -13,6 +13,7 @@
 #include <unordered_map>
 #include <vector>

+#include "common/bloom_filter.hpp"
 #include "common/config.hpp"
 #include "executor/types.hpp"

@@ -210,7 +211,95 @@ class ClusterManager {
     return data;
   }

+  /**
+   * @brief Store a bloom filter for a shuffle context
+   */
+  void set_bloom_filter(const std::string& context_id, const std::string& build_table,
+                        const std::string& probe_table, const std::string& probe_key_col,
+                        std::vector<uint8_t> filter_data, size_t expected_elements,
+                        size_t num_hashes) {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    auto& entry = bloom_filters_[context_id];
+    entry.build_table = build_table;
+    entry.probe_table = probe_table;
+    entry.probe_key_col = probe_key_col;
+    entry.filter_data = std::move(filter_data);
+    entry.expected_elements = expected_elements;
+    entry.num_hashes = num_hashes;
+  }
+
+  /**
+   * @brief Check if a bloom filter exists for a context
+   * @note Returns false if filter_data is empty, so bloom filtering is skipped
+   */
+  [[nodiscard]] bool has_bloom_filter(const std::string& context_id) const {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    auto it = bloom_filters_.find(context_id);
+    if (it == bloom_filters_.end()) {
+      return false;
+    }
+    // Only consider bloom filter valid if it has actual filter data
+    return !it->second.filter_data.empty();
+  }
+
+  /**
+   * @brief Get bloom filter for a context (reconstructs BloomFilter object)
+   */
+  [[nodiscard]] common::BloomFilter get_bloom_filter(const std::string& context_id) const {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    auto it = bloom_filters_.find(context_id);
+    if (it != bloom_filters_.end() && !it->second.filter_data.empty()) {
+      return common::BloomFilter(it->second.filter_data.data(),
+                                 it->second.filter_data.size());
+    }
+    return common::BloomFilter(1);  // Empty filter
+  }
+
+  /**
+   * @brief Get probe table name for a context
+   */
+  [[nodiscard]] std::string get_probe_table(const std::string& context_id) const {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    auto it = bloom_filters_.find(context_id);
+    if (it != bloom_filters_.end()) {
+      return it->second.probe_table;
+    }
+    return "";
+  }
+
+  /**
+   * @brief Get probe key column for a context
+   */
+  [[nodiscard]] std::string get_probe_key_col(const std::string& context_id) const {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    auto it = bloom_filters_.find(context_id);
+    if (it != bloom_filters_.end()) {
+      return it->second.probe_key_col;
+    }
+    return "";
+  }
+
+  /**
+   * @brief Clear bloom filter for a context
+   */
+  void clear_bloom_filter(const std::string& context_id) {
+    const std::scoped_lock<std::mutex> lock(mutex_);
+    bloom_filters_.erase(context_id);
+  }
+
  private:
+  /**
+   * @brief Stored bloom filter data for a context
+   */
+  struct BloomFilterEntry {
+    std::string build_table;
+    std::string probe_table;
+    std::string probe_key_col;  // Join key column on probe side
+    std::vector<uint8_t> filter_data;
+    size_t expected_elements = 0;
+    size_t num_hashes = 0;
+  };
+
  const config::Config* config_;
  raft::RaftManager* raft_manager_;
  NodeInfo self_node_;
@@ -220,6 +309,8 @@ class ClusterManager {
   /* context_id -> table_name -> rows */
   std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
       shuffle_buffers_;
+  /* context_id -> bloom filter data */
+  std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
   mutable std::mutex mutex_;
 };
```
include/network/rpc_message.hpp

Lines changed: 68 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@ enum class RpcType : uint8_t {
   TxnAbort = 8,
   PushData = 9,
   ShuffleFragment = 10,
+  BloomFilterPush = 11,
   Error = 255
 };

@@ -439,6 +440,73 @@ struct ShuffleFragmentArgs {
   }
 };

+/**
+ * @brief Arguments for BloomFilterPush RPC
+ */
+struct BloomFilterArgs {
+  std::string context_id;
+  std::string build_table;
+  std::string probe_table;
+  std::string probe_key_col;  // Join key column on probe side for filtering
+  std::vector<uint8_t> filter_data;
+  size_t expected_elements = 0;
+  size_t num_hashes = 0;
+
+  [[nodiscard]] std::vector<uint8_t> serialize() const {
+    std::vector<uint8_t> out;
+    Serializer::serialize_string(context_id, out);
+    Serializer::serialize_string(build_table, out);
+    Serializer::serialize_string(probe_table, out);
+    Serializer::serialize_string(probe_key_col, out);
+
+    // Serialize filter data (blob)
+    const auto filter_len = static_cast<uint32_t>(filter_data.size());
+    const size_t off = out.size();
+    out.resize(off + Serializer::VAL_SIZE_32);
+    std::memcpy(out.data() + off, &filter_len, Serializer::VAL_SIZE_32);
+    out.insert(out.end(), filter_data.begin(), filter_data.end());
+
+    // Serialize metadata using fixed-width temporaries
+    uint64_t tmp_expected = static_cast<uint64_t>(expected_elements);
+    uint8_t tmp_hashes = static_cast<uint8_t>(num_hashes);
+    const size_t off2 = out.size();
+    out.resize(off2 + 9);  // 8 bytes for expected_elements + 1 for num_hashes
+    std::memcpy(out.data() + off2, &tmp_expected, 8);
+    out[off2 + 8] = tmp_hashes;
+    return out;
+  }
+
+  static BloomFilterArgs deserialize(const std::vector<uint8_t>& in) {
+    BloomFilterArgs args;
+    size_t offset = 0;
+    args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
+    args.build_table = Serializer::deserialize_string(in.data(), offset, in.size());
+    args.probe_table = Serializer::deserialize_string(in.data(), offset, in.size());
+    args.probe_key_col = Serializer::deserialize_string(in.data(), offset, in.size());
+
+    uint32_t filter_len = 0;
+    if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
+      std::memcpy(&filter_len, in.data() + offset, Serializer::VAL_SIZE_32);
+      offset += Serializer::VAL_SIZE_32;
+    }
+    if (offset + filter_len <= in.size()) {
+      args.filter_data.resize(filter_len);
+      std::memcpy(args.filter_data.data(), in.data() + offset, filter_len);
+      offset += filter_len;
+    }
+
+    // Deserialize metadata using fixed-width temporaries
+    if (offset + 9 <= in.size()) {
+      uint64_t tmp_expected = 0;
+      std::memcpy(&tmp_expected, in.data() + offset, 8);
+      args.expected_elements = static_cast<size_t>(tmp_expected);
+      offset += 8;
+      args.num_hashes = static_cast<size_t>(in[offset]);
+    }
+    return args;
+  }
+};
+
 /**
  * @brief Arguments for TxnPrepare/Commit/Abort RPC
  */
```
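
The `BloomFilterArgs` wire layout above ends with a length-prefixed blob followed by a u64 and a u8. A minimal round-trip sketch of just that tail (the `Serializer` string helpers are omitted; `FilterTail` is an invented name for illustration) shows the format in isolation. Note the `memcpy`-based encoding is host-endian, so like the original it assumes same-endianness nodes:

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative round trip of the tail of the BloomFilterArgs layout:
// [u32 blob length][blob bytes][u64 expected_elements][u8 num_hashes]
// (host byte order, matching the memcpy-based code above).
struct FilterTail {
    std::vector<uint8_t> filter_data;
    uint64_t expected_elements = 0;
    uint8_t num_hashes = 0;

    std::vector<uint8_t> serialize() const {
        std::vector<uint8_t> out;
        const auto len = static_cast<uint32_t>(filter_data.size());
        out.resize(4);
        std::memcpy(out.data(), &len, 4);                       // length prefix
        out.insert(out.end(), filter_data.begin(), filter_data.end());
        const size_t off = out.size();
        out.resize(off + 9);                                    // 8 + 1 bytes
        std::memcpy(out.data() + off, &expected_elements, 8);
        out[off + 8] = num_hashes;
        return out;
    }

    static FilterTail deserialize(const std::vector<uint8_t>& in) {
        FilterTail t;
        size_t offset = 0;
        uint32_t len = 0;
        if (offset + 4 <= in.size()) {                          // bounds-checked reads
            std::memcpy(&len, in.data() + offset, 4);
            offset += 4;
        }
        if (offset + len <= in.size()) {
            t.filter_data.assign(in.begin() + offset, in.begin() + offset + len);
            offset += len;
        }
        if (offset + 9 <= in.size()) {
            std::memcpy(&t.expected_elements, in.data() + offset, 8);
            offset += 8;
            t.num_hashes = in[offset];
        }
        return t;
    }
};
```

A 4-byte filter blob thus serializes to 4 + 4 + 9 = 17 bytes, and deserializing recovers the original fields; the bounds checks mirror the defensive parsing in `BloomFilterArgs::deserialize`.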
