Commit a24c986

test: add bloom filter tests and update documentation
- Add 10 unit tests in tests/bloom_filter_test.cpp
- Test BloomFilterArgs serialization round-trip
- Test ClusterManager bloom filter storage operations
- Test bloom filter application logic (PushData simulation)
- Update PHASE_6_DISTRIBUTED_JOIN.md with bloom filter docs
- Update docs/phases/README.md with bloom filter feature
- Update SQLITE_COMPARISON.md with Section 7: Bloom Filter Optimization
- Add bloom_filter.cpp and bloom_filter_tests to CMakeLists.txt
1 parent d9a1bd8 commit a24c986

5 files changed

Lines changed: 327 additions & 1 deletion

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
@@ -72,6 +72,7 @@ set(CORE_SOURCES
     src/distributed/raft_group.cpp
     src/distributed/raft_manager.cpp
     src/distributed/distributed_executor.cpp
+    src/common/bloom_filter.cpp
     src/storage/columnar_table.cpp
 )
 
@@ -117,6 +118,7 @@ if(BUILD_TESTS)
     add_cloudsql_test(catalog_coverage_tests tests/catalog_coverage_tests.cpp)
     add_cloudsql_test(transaction_coverage_tests tests/transaction_coverage_tests.cpp)
     add_cloudsql_test(utils_coverage_tests tests/utils_coverage_tests.cpp)
+    add_cloudsql_test(bloom_filter_tests tests/bloom_filter_test.cpp)
     add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp)
     add_cloudsql_test(server_tests tests/server_tests.cpp)
     add_cloudsql_test(statement_tests tests/statement_tests.cpp)

docs/performance/SQLITE_COMPARISON.md

Lines changed: 46 additions & 1 deletion
@@ -39,8 +39,53 @@ We addressed the gaps via the following optimizations:
 2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
 3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.
 
-## 6. Future Roadmap
+## 6. Post-Optimization Enhancements
+We addressed the gaps via the following optimizations:
+1. **Buffer Pool Bypass (`fetch_page_by_id`)**: Reduced global std::mutex latch contention by explicitly caching ID lookups, yielding a ~30% improvement in scan logic.
+2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
+3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.
+
+## 7. Distributed Join Optimization: Bloom Filters
+
+### Problem
+Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.
+
+### Solution: Bloom Filter Integration
+Implemented bloom filters to filter tuples at the source before network transmission:
+- **One-sided bloom filter**: Built from the inner/right table, applied to filter the outer/left table
+- **Distributed construction**: Each data node builds a bloom filter locally during its scan phase
+- **Coordinator broadcast**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes
+
+### Architecture
+```
+[Phase 1: Shuffle Left]        [Phase 2: Shuffle Right]
+         |                              |
+         v                              v
+  Build local bloom             Apply bloom filter
+  from join keys                before buffering
+         |                              |
+         +---- BloomFilterPush ----->---+
+              (filter metadata)         |
+                                        v
+                              Filtered tuples buffered
+```
+
+### Key Components
+| Component | Location | Purpose |
+|-----------|----------|---------|
+| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
+| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
+| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
+| `PushData` handler | `src/main.cpp` | Applies bloom filter before buffering |
+| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
+
+### Test Coverage
+- 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic
+- Tests located in `tests/bloom_filter_test.cpp`
+
+## 8. Future Roadmap
 With the scan gap closed, our focus shifts to higher-level analytical throughput:
 * **Stage 1: SIMD-Accelerated Filtering**: Utilize AVX-512/NEON instructions to filter multiple rows in a single CPU cycle.
 * **Stage 2: Vectorized Execution**: Move from row-at-a-time `TupleView` to batch-at-a-time `VectorBatch` processing.
 * **Stage 3: Columnar Storage**: Transition from row-oriented heap files to columnar persistence for extreme analytical scanning.
+* **Stage 4: Distributed Hash Join**: Enhance the single `HashJoinOperator` with parallel partitioned hash join for multi-node execution.
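
For readers unfamiliar with the data structure behind Section 7, the following is a minimal sketch of a bloom filter with the same `insert` / `might_contain` shape used in the tests. It is an illustration only: `TinyBloom`, its string keys, and the `std::hash`-based double hashing are assumptions made for this example and are not the project's MurmurHash3-based `BloomFilter` class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical minimal bloom filter (NOT the project's BloomFilter class).
class TinyBloom {
public:
    TinyBloom(std::size_t num_bits, std::size_t num_hashes)
        : bits_(num_bits, false), num_hashes_(num_hashes) {
        assert(num_bits > 0 && num_hashes > 0);
    }

    // Set one bit per hash function.
    void insert(const std::string& key) {
        for (std::size_t i = 0; i < num_hashes_; ++i) {
            bits_[index(key, i)] = true;
        }
    }

    // false: the key was definitely never inserted.
    // true:  the key was possibly inserted (false positives can occur).
    bool might_contain(const std::string& key) const {
        for (std::size_t i = 0; i < num_hashes_; ++i) {
            if (!bits_[index(key, i)]) return false;
        }
        return true;
    }

private:
    // Double hashing: the i-th probe position is (h1 + i * h2) mod m.
    // std::hash stands in for MurmurHash3 here (an assumption of this sketch).
    std::size_t index(const std::string& key, std::size_t i) const {
        const std::uint64_t h1 = std::hash<std::string>{}(key);
        const std::uint64_t h2 = std::hash<std::string>{}(key + "#salt") | 1U;
        return static_cast<std::size_t>((h1 + i * h2) % bits_.size());
    }

    std::vector<bool> bits_;
    std::size_t num_hashes_;
};
```

The property the shuffle join relies on is the one-sided error: a `false` answer is always correct, so dropping a tuple at the source can never remove a real join match, while a `true` answer only means the tuple must still be sent and checked by the join itself.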

docs/phases/PHASE_6_DISTRIBUTED_JOIN.md

Lines changed: 9 additions & 0 deletions
@@ -14,6 +14,7 @@ Introduced isolated staging areas for inter-node data movement.
 Developed a dedicated binary protocol for efficient data redistribution.
 - **ShuffleFragment**: Metadata describing the fragment being pushed (target context, source node, schema).
 - **PushData**: High-speed binary payload containing the actual tuple data for the shuffle phase.
+- **BloomFilterPush**: Bloom filter metadata broadcast to enable tuple filtering before network transmission.
 
 ### 3. Two-Phase Join Orchestration (`distributed/distributed_executor.cpp`)
 Implemented the control logic for distributed shuffle joins.
@@ -24,9 +25,17 @@ Implemented the control logic for distributed shuffle joins.
 Seamlessly integrated shuffle buffers into the Volcano execution model.
 - **Vectorized Buffering**: Optimized the `BufferScanOperator` to handle large volumes of redistributed data with minimal overhead.
 
+### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
+Added probabilistic filtering to reduce network traffic in shuffle joins.
+- **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
+- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
+- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
+
 ## Lessons Learned
 - Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
 - Fine-grained locking in the shuffle buffers is critical for maintaining high throughput during the redistribution phase.
+- Bloom filters provide significant network traffic reduction when join selectivity is low, at the cost of a small false positive rate (typically <1%).
 
 ## Status: 100% Test Pass
 Verified the end-to-end shuffle join flow, including multi-node data movement and final result merging, through automated integration tests.
+- 10 unit tests for bloom filter implementation and integration (`tests/bloom_filter_test.cpp`)
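
The "optimal bit count and hash function calculation" mentioned in the Phase 6 notes is not shown in this commit. A plausible sketch, assuming the implementation follows the standard bloom filter sizing formulas m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2, is given below. `BloomParams` and `optimal_bloom_params` are hypothetical names introduced for this example; the rounding choices (ceiling for bits, nearest integer for hashes) are also assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>

// Hypothetical result type for this sketch.
struct BloomParams {
    std::size_t num_bits;    // m: size of the bit array
    std::size_t num_hashes;  // k: number of hash functions
};

// Standard sizing for n expected elements and target false positive rate p:
//   m = -n * ln(p) / (ln 2)^2
//   k = (m / n) * ln 2
inline BloomParams optimal_bloom_params(std::size_t expected_elements,
                                        double target_fpr) {
    assert(expected_elements > 0);
    assert(target_fpr > 0.0 && target_fpr < 1.0);

    const double ln2 = std::log(2.0);
    const double n = static_cast<double>(expected_elements);
    const double m = std::ceil(-n * std::log(target_fpr) / (ln2 * ln2));
    const double k = std::max(1.0, std::round(m / n * ln2));

    return {static_cast<std::size_t>(m), static_cast<std::size_t>(k)};
}
```

Under these formulas, 1000 expected elements at a 1% target rate need 9586 bits (about 1.2 KB) and 7 hash functions, which is why broadcasting the filter is cheap relative to shuffling the tuples it eliminates.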

docs/phases/README.md

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ This directory contains the technical documentation for the lifecycle of the clo
 - Context-aware Shuffle infrastructure in `ClusterManager`.
 - Implementation of `ShuffleFragment` and `PushData` RPC protocols.
 - Two-phase Shuffle Join orchestration in `DistributedExecutor`.
+- **Bloom Filter Optimization**: Probabilistic tuple filtering to reduce network traffic in shuffle joins.
 
 ### [Phase 7: Replication & High Availability](./PHASE_7_REPLICATION_HA.md)
 **Focus**: Fault Tolerance & Data Redundancy.

tests/bloom_filter_test.cpp

Lines changed: 269 additions & 0 deletions
@@ -0,0 +1,269 @@
+/**
+ * @file bloom_filter_test.cpp
+ * @brief Unit tests for BloomFilter implementation
+ */
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "common/bloom_filter.hpp"
+#include "common/cluster_manager.hpp"
+#include "common/value.hpp"
+#include "executor/types.hpp"
+#include "network/rpc_message.hpp"
+
+using namespace cloudsql::common;
+using namespace cloudsql::network;
+using namespace cloudsql::cluster;
+
+namespace {
+
+/**
+ * @brief Tests basic bloom filter insertion and membership.
+ */
+TEST(BloomFilterTests, BasicInsertAndQuery) {
+    BloomFilter bf(100);  // Expect 100 elements
+
+    Value v1 = Value::make_int64(42);
+    Value v2 = Value::make_int64(100);
+    Value v3 = Value::make_text("hello");
+
+    bf.insert(v1);
+    bf.insert(v2);
+    bf.insert(v3);
+
+    // All inserted values should be found
+    EXPECT_TRUE(bf.might_contain(v1));
+    EXPECT_TRUE(bf.might_contain(v2));
+    EXPECT_TRUE(bf.might_contain(v3));
+
+    // Non-inserted values might or might not be found (false positive possible),
+    // but with only 3 of the 100 expected elements inserted, the probability is low
+}
+
+/**
+ * @brief Tests that values not inserted return false.
+ */
+TEST(BloomFilterTests, NonInsertedValues) {
+    BloomFilter bf(1000);  // Nothing is inserted, so every bit stays zero
+
+    Value v1 = Value::make_int64(999);
+    Value v2 = Value::make_text("nonexistent");
+
+    // An empty filter has no set bits, so lookups must return false
+    EXPECT_FALSE(bf.might_contain(v1));
+    EXPECT_FALSE(bf.might_contain(v2));
+}
+
+/**
+ * @brief Tests serialization and deserialization.
+ */
+TEST(BloomFilterTests, SerializationRoundTrip) {
+    BloomFilter bf(50);
+
+    // Insert some values
+    for (int i = 0; i < 25; ++i) {
+        bf.insert(Value::make_int64(i));
+    }
+    for (int i = 100; i < 125; ++i) {
+        bf.insert(Value::make_text("text_" + std::to_string(i)));
+    }
+
+    // Serialize
+    std::vector<uint8_t> data = bf.serialize();
+    EXPECT_FALSE(data.empty());
+
+    // Deserialize
+    BloomFilter bf2(data.data(), data.size());
+
+    // Check metadata
+    EXPECT_EQ(bf.num_hashes(), bf2.num_hashes());
+
+    // Check inserted values are found
+    for (int i = 0; i < 25; ++i) {
+        EXPECT_TRUE(bf2.might_contain(Value::make_int64(i)));
+    }
+    for (int i = 100; i < 125; ++i) {
+        EXPECT_TRUE(bf2.might_contain(Value::make_text("text_" + std::to_string(i))));
+    }
+}
+
+/**
+ * @brief Tests false positive rate with many insertions.
+ */
+TEST(BloomFilterTests, FalsePositiveRate) {
+    BloomFilter bf(1000);  // 1000 expected elements
+
+    // Insert 500 values
+    for (int i = 0; i < 500; ++i) {
+        bf.insert(Value::make_int64(i));
+    }
+
+    // Check 1000 non-inserted values and count false positives
+    int false_positives = 0;
+    for (int i = 500; i < 1500; ++i) {
+        if (bf.might_contain(Value::make_int64(i))) {
+            ++false_positives;
+        }
+    }
+
+    // A 1% target FPR at full capacity means roughly 10 false positives per 1000 probes;
+    // the filter is only half full here, so 5% (50) is a generous upper bound
+    EXPECT_LT(false_positives, 50);
+}
+
+/**
+ * @brief Tests empty bloom filter.
+ */
+TEST(BloomFilterTests, EmptyFilter) {
+    BloomFilter bf(1);  // Minimal filter
+
+    // Nothing inserted, nothing should be found
+    EXPECT_FALSE(bf.might_contain(Value::make_int64(1)));
+    EXPECT_FALSE(bf.might_contain(Value::make_text("test")));
+}
+
+/**
+ * @brief Tests that duplicate insertions don't cause issues.
+ */
+TEST(BloomFilterTests, DuplicateInsertions) {
+    BloomFilter bf(100);
+
+    Value v = Value::make_int64(42);
+
+    bf.insert(v);
+    bf.insert(v);
+    bf.insert(v);
+
+    // Should still be found
+    EXPECT_TRUE(bf.might_contain(v));
+}
+
+/**
+ * @brief Tests different value types.
+ */
+TEST(BloomFilterTests, DifferentValueTypes) {
+    BloomFilter bf(100);
+
+    bf.insert(Value::make_int64(1));
+    bf.insert(Value::make_int64(2));
+    bf.insert(Value::make_float64(3.14));
+    bf.insert(Value::make_text("string"));
+    bf.insert(Value::make_bool(true));
+
+    EXPECT_TRUE(bf.might_contain(Value::make_int64(1)));
+    EXPECT_TRUE(bf.might_contain(Value::make_int64(2)));
+    EXPECT_TRUE(bf.might_contain(Value::make_float64(3.14)));
+    EXPECT_TRUE(bf.might_contain(Value::make_text("string")));
+    EXPECT_TRUE(bf.might_contain(Value::make_bool(true)));
+
+    // Non-inserted (a false positive is possible in principle, but unlikely at this load)
+    EXPECT_FALSE(bf.might_contain(Value::make_int64(999)));
+    EXPECT_FALSE(bf.might_contain(Value::make_text("not inserted")));
+}
+
+/**
+ * @brief Tests BloomFilterArgs serialization round-trip.
+ */
+TEST(BloomFilterTests, BloomFilterArgsSerialization) {
+    BloomFilterArgs args;
+    args.context_id = "ctx_123";
+    args.build_table = "users";
+    args.probe_table = "orders";
+    args.probe_key_col = "user_id";
+    args.filter_data = {0x01, 0x02, 0x03};
+    args.expected_elements = 1000;
+    args.num_hashes = 4;
+
+    auto serialized = args.serialize();
+    auto deserialized = BloomFilterArgs::deserialize(serialized);
+
+    EXPECT_EQ(args.context_id, deserialized.context_id);
+    EXPECT_EQ(args.build_table, deserialized.build_table);
+    EXPECT_EQ(args.probe_table, deserialized.probe_table);
+    EXPECT_EQ(args.probe_key_col, deserialized.probe_key_col);
+    EXPECT_EQ(args.expected_elements, deserialized.expected_elements);
+    EXPECT_EQ(args.num_hashes, deserialized.num_hashes);
+    ASSERT_EQ(args.filter_data.size(), deserialized.filter_data.size());
+    EXPECT_EQ(args.filter_data, deserialized.filter_data);
+}
+
+/**
+ * @brief Tests ClusterManager bloom filter storage operations.
+ */
+TEST(BloomFilterTests, ClusterManagerBloomFilterStorage) {
+    ClusterManager cm(nullptr);
+
+    // Create a real bloom filter and serialize it
+    BloomFilter original(100);
+    original.insert(Value::make_int64(10));
+    original.insert(Value::make_int64(20));
+    auto filter_data = original.serialize();
+
+    // Test set_bloom_filter and has_bloom_filter
+    cm.set_bloom_filter("ctx1", "table_build", "table_probe", "key_col",
+                        filter_data, original.expected_elements(), original.num_hashes());
+    EXPECT_TRUE(cm.has_bloom_filter("ctx1"));
+
+    // Test get_bloom_filter reconstructs correctly
+    auto bf = cm.get_bloom_filter("ctx1");
+    EXPECT_EQ(bf.expected_elements(), original.expected_elements());
+    EXPECT_EQ(bf.num_hashes(), original.num_hashes());
+
+    // Test that inserted values are found in reconstructed filter
+    EXPECT_TRUE(bf.might_contain(Value::make_int64(10)));
+    EXPECT_TRUE(bf.might_contain(Value::make_int64(20)));
+
+    // Test non-existent context
+    EXPECT_FALSE(cm.has_bloom_filter("nonexistent"));
+
+    // Test get_probe_table and get_probe_key_col
+    cm.set_bloom_filter("ctx2", "build_t", "probe_t", "col_x", filter_data, 500, 3);
+    EXPECT_EQ(cm.get_probe_table("ctx2"), "probe_t");
+    EXPECT_EQ(cm.get_probe_key_col("ctx2"), "col_x");
+
+    // Test clear_bloom_filter
+    cm.clear_bloom_filter("ctx1");
+    EXPECT_FALSE(cm.has_bloom_filter("ctx1"));
+}
+
+/**
+ * @brief Tests bloom filter application logic (simulates PushData handler behavior).
+ */
+TEST(BloomFilterTests, BloomFilterApplicationLogic) {
+    // Build bloom filter with known keys
+    BloomFilter bf(100);
+    bf.insert(Value::make_int64(10));
+    bf.insert(Value::make_int64(20));
+    bf.insert(Value::make_int64(30));
+
+    // Simulate tuple filtering (as done in PushData handler)
+    std::vector<cloudsql::executor::Tuple> tuples;
+    tuples.push_back(cloudsql::executor::Tuple(std::initializer_list<Value>{Value::make_int64(10)}));  // match
+    tuples.push_back(cloudsql::executor::Tuple(std::initializer_list<Value>{Value::make_int64(15)}));  // no match
+    tuples.push_back(cloudsql::executor::Tuple(std::initializer_list<Value>{Value::make_int64(20)}));  // match
+    tuples.push_back(cloudsql::executor::Tuple(std::initializer_list<Value>{Value::make_int64(99)}));  // no match
+
+    std::vector<cloudsql::executor::Tuple> filtered;
+    for (auto& row : tuples) {
+        if (bf.might_contain(row.get(0))) {
+            filtered.push_back(std::move(row));
+        }
+    }
+
+    // Expect exactly 10 and 20 to pass (assumes 15 and 99 are not false positives)
+    EXPECT_EQ(filtered.size(), 2U);
+
+    // Verify the filtered values (the loop above preserves input order)
+    bool found_10 = false;
+    bool found_20 = false;
+    for (auto& row : filtered) {
+        if (row.get(0) == Value::make_int64(10)) found_10 = true;
+        if (row.get(0) == Value::make_int64(20)) found_20 = true;
+    }
+    EXPECT_TRUE(found_10);
+    EXPECT_TRUE(found_20);
+}
+
+}  // namespace
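
The exact-count assertion in `BloomFilterApplicationLogic` holds only as long as none of the non-matching keys happens to be a false positive. A more tolerant way to check source-side filtering is to assert the two properties a bloom filter actually guarantees: every real match survives, and input order is preserved. The sketch below illustrates this with a deterministic stand-in; `StubFilter` and `filter_keys` are hypothetical names for this example and do not exist in the project.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for a bloom filter: exact membership plus one
// deliberately forced false positive, so the behavior is deterministic.
struct StubFilter {
    std::unordered_set<std::int64_t> keys;
    std::int64_t forced_false_positive;

    bool might_contain(std::int64_t key) const {
        return key == forced_false_positive || keys.count(key) > 0;
    }
};

// Keeps every key the filter cannot rule out, preserving input order.
// Works with any type exposing might_contain(std::int64_t) const.
template <typename Filter>
std::vector<std::int64_t> filter_keys(const std::vector<std::int64_t>& keys,
                                      const Filter& filter) {
    std::vector<std::int64_t> kept;
    for (std::int64_t key : keys) {
        if (filter.might_contain(key)) {
            kept.push_back(key);
        }
    }
    return kept;
}
```

With build keys {10, 20, 30} and 15 forced through as a false positive, probing {10, 15, 20, 99} keeps 10, 15, and 20: the result is a superset of the true matches, so a test should assert `filtered.size() >= 2` and the presence of 10 and 20 rather than an exact size.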
