Skip to content

Commit b0da5b6

Browse files
committed
fix: distributed executor, main.cpp, tests and docs
- Fix Phase 1 error to include node.id and reply.error_msg
- Move bloom filtering to sender side in ShuffleFragment handler
- Remove receiver-side bloom check in PushData handler
- Fix tests to only assert no-false-negative property
- Remove duplicate section in SQLITE_COMPARISON.md
1 parent 2481607 commit b0da5b6

4 files changed

Lines changed: 54 additions & 68 deletions

File tree

docs/performance/SQLITE_COMPARISON.md

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,16 @@ We addressed the gaps via the following optimizations:
3939
2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
4040
3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.
4141

42-
## 6. Post-Optimization Enhancements
43-
We addressed the gaps via the following optimizations:
44-
1. **Buffer Pool Bypass (`fetch_page_by_id`)**: Reduced global std::mutex latch contention by explicitly caching ID lookups, yielding a ~30% improvement in scan logic.
45-
2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
46-
3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.
47-
48-
## 7. Distributed Join Optimization: Bloom Filters
42+
## 6. Distributed Join Optimization: Bloom Filters
4943

5044
### Problem
5145
Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.
5246

5347
### Solution: Bloom Filter Integration
5448
Implemented bloom filters to filter tuples at the source before network transmission:
55-
- **One-sided bloom filter**: Built from the inner/right table, applied to filter the outer/left table
56-
- **Distributed construction**: Each data node builds bloom filter locally during its scan phase
57-
- **Coordinator coordination**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes
49+
- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
50+
- **Distributed construction**: Each data node constructs its local bloom filter during the left/build scan phase
51+
- **Coordinator broadcast**: The `BloomFilterPush` RPC broadcasts the filter metadata to all nodes before the right/probe shuffle begins
5852

5953
### Architecture
6054
```

src/distributed/distributed_executor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,9 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
230230
}
231231
auto reply = network::QueryResultsReply::deserialize(resp);
232232
if (!reply.success) {
233-
phase1_success = false;
234-
break;
233+
QueryResult res;
234+
res.set_error("Shuffle failed on node " + node.id + ": " + reply.error_msg);
235+
return res;
235236
}
236237
}
237238

src/main.cpp

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -472,50 +472,9 @@ int main(int argc, char* argv[]) {
472472
(void)h;
473473
auto args = cloudsql::network::PushDataArgs::deserialize(p);
474474
if (cluster_manager != nullptr) {
475-
// Apply bloom filter if available for this context
476-
if (cluster_manager->has_bloom_filter(args.context_id)) {
477-
auto bloom = cluster_manager->get_bloom_filter(args.context_id);
478-
std::string probe_key_col =
479-
cluster_manager->get_probe_key_col(args.context_id);
480-
481-
// Get probe table schema to find key column index
482-
auto table_meta_opt = catalog->get_table_by_name(args.table_name);
483-
if (table_meta_opt.has_value() && !probe_key_col.empty()) {
484-
const auto* table_meta = table_meta_opt.value();
485-
size_t key_idx = static_cast<size_t>(-1);
486-
for (size_t i = 0; i < table_meta->columns.size(); ++i) {
487-
if (table_meta->columns[i].name == probe_key_col) {
488-
key_idx = i;
489-
break;
490-
}
491-
}
492-
493-
if (key_idx != static_cast<size_t>(-1)) {
494-
// Filter rows using bloom filter
495-
std::vector<cloudsql::executor::Tuple> filtered_rows;
496-
filtered_rows.reserve(args.rows.size());
497-
for (auto& row : args.rows) {
498-
if (bloom.might_contain(row.get(key_idx))) {
499-
filtered_rows.push_back(std::move(row));
500-
}
501-
}
502-
cluster_manager->buffer_shuffle_data(
503-
args.context_id, args.table_name,
504-
std::move(filtered_rows));
505-
} else {
506-
// Key column not found, buffer as-is
507-
cluster_manager->buffer_shuffle_data(
508-
args.context_id, args.table_name, std::move(args.rows));
509-
}
510-
} else {
511-
// No metadata, buffer as-is
512-
cluster_manager->buffer_shuffle_data(
513-
args.context_id, args.table_name, std::move(args.rows));
514-
}
515-
} else {
516-
cluster_manager->buffer_shuffle_data(
517-
args.context_id, args.table_name, std::move(args.rows));
518-
}
475+
// Receiver-side: buffer data as-is (bloom filtering done on sender)
476+
cluster_manager->buffer_shuffle_data(args.context_id, args.table_name,
477+
std::move(args.rows));
519478
}
520479

521480
cloudsql::network::QueryResultsReply reply;
@@ -630,10 +589,43 @@ int main(int argc, char* argv[]) {
630589
continue;
631590
}
632591

592+
// Apply bloom filter on sender side before sending
593+
std::vector<cloudsql::executor::Tuple> rows_to_send = std::move(rows);
594+
if (cluster_manager->has_bloom_filter(args.context_id)) {
595+
auto bloom = cluster_manager->get_bloom_filter(args.context_id);
596+
std::string probe_key_col = cluster_manager->get_probe_key_col(args.context_id);
597+
598+
if (!probe_key_col.empty()) {
599+
// Find key column index in current table
600+
auto table_meta_opt = catalog->get_table_by_name(args.table_name);
601+
if (table_meta_opt.has_value()) {
602+
const auto* table_meta = table_meta_opt.value();
603+
size_t key_idx = static_cast<size_t>(-1);
604+
for (size_t i = 0; i < table_meta->columns.size(); ++i) {
605+
if (table_meta->columns[i].name == probe_key_col) {
606+
key_idx = i;
607+
break;
608+
}
609+
}
610+
611+
if (key_idx != static_cast<size_t>(-1)) {
612+
std::vector<cloudsql::executor::Tuple> filtered;
613+
filtered.reserve(rows_to_send.size());
614+
for (auto& row : rows_to_send) {
615+
if (bloom.might_contain(row.get(key_idx))) {
616+
filtered.push_back(std::move(row));
617+
}
618+
}
619+
rows_to_send = std::move(filtered);
620+
}
621+
}
622+
}
623+
}
624+
633625
cloudsql::network::PushDataArgs push_args;
634626
push_args.context_id = args.context_id;
635627
push_args.table_name = args.table_name;
636-
push_args.rows = std::move(rows);
628+
push_args.rows = std::move(rows_to_send);
637629
std::vector<uint8_t> resp;
638630
if (!client.call(cloudsql::network::RpcType::PushData,
639631
push_args.serialize(), resp)) {

tests/bloom_filter_test.cpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,23 +145,20 @@ TEST(BloomFilterTests, DuplicateInsertions) {
145145
* @brief Tests different value types.
146146
*/
147147
TEST(BloomFilterTests, DifferentValueTypes) {
148-
BloomFilter bf(100);
148+
BloomFilter bf(1000); // Large filter to minimize false positives
149149

150150
bf.insert(Value::make_int64(1));
151151
bf.insert(Value::make_int64(2));
152152
bf.insert(Value::make_float64(3.14));
153153
bf.insert(Value::make_text("string"));
154154
bf.insert(Value::make_bool(true));
155155

156+
// Verify no-false-negative: inserted values must be found
156157
EXPECT_TRUE(bf.might_contain(Value::make_int64(1)));
157158
EXPECT_TRUE(bf.might_contain(Value::make_int64(2)));
158159
EXPECT_TRUE(bf.might_contain(Value::make_float64(3.14)));
159160
EXPECT_TRUE(bf.might_contain(Value::make_text("string")));
160161
EXPECT_TRUE(bf.might_contain(Value::make_bool(true)));
161-
162-
// Non-inserted
163-
EXPECT_FALSE(bf.might_contain(Value::make_int64(999)));
164-
EXPECT_FALSE(bf.might_contain(Value::make_text("not inserted")));
165162
}
166163

167164
/**
@@ -239,6 +236,11 @@ TEST(BloomFilterTests, BloomFilterApplicationLogic) {
239236
bf.insert(Value::make_int64(20));
240237
bf.insert(Value::make_int64(30));
241238

239+
// Verify no-false-negative: inserted values must be found via might_contain
240+
EXPECT_TRUE(bf.might_contain(Value::make_int64(10)));
241+
EXPECT_TRUE(bf.might_contain(Value::make_int64(20)));
242+
EXPECT_TRUE(bf.might_contain(Value::make_int64(30)));
243+
242244
// Simulate tuple filtering (as done on the sender side in the ShuffleFragment handler)
243245
std::vector<cloudsql::executor::Tuple> tuples;
244246
tuples.push_back(
@@ -257,18 +259,15 @@ TEST(BloomFilterTests, BloomFilterApplicationLogic) {
257259
}
258260
}
259261

260-
// Should have 2 matches (10 and 20)
261-
EXPECT_EQ(filtered.size(), 2);
262-
263-
// Verify the filtered values (matches may be in different order due to move)
262+
// Verify found values in filtered list
264263
bool found_10 = false;
265264
bool found_20 = false;
266265
for (auto& row : filtered) {
267266
if (row.get(0) == Value::make_int64(10)) found_10 = true;
268267
if (row.get(0) == Value::make_int64(20)) found_20 = true;
269268
}
270-
EXPECT_TRUE(found_10);
271-
EXPECT_TRUE(found_20);
269+
EXPECT_TRUE(found_10); // Inserted value must be found
270+
EXPECT_TRUE(found_20); // Inserted value must be found
272271
}
273272

274273
} // namespace

0 commit comments

Comments
 (0)