Skip to content

Commit 6f87a50

Browse files
committed
fix: add BloomFilterPush handler to ShuffleJoinOrchestration test
The bloom filter integration added a BloomFilterPush RPC call during distributed join execution, but the test mock server had no handler for it. This caused client.call() to hang indefinitely waiting for a response that never came. Root cause: distributed_executor.cpp sends BloomFilterPush after Phase 1 shuffle, but the test only set up handlers for ShuffleFragment, PushData, and ExecuteFragment. Fix: Add BloomFilterPush handler to the test mock servers.
1 parent 52fc398 commit 6f87a50

1 file changed

Lines changed: 5 additions & 0 deletions

File tree

tests/distributed_tests.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) {
303303
std::atomic<int> shuffle_calls{0};
304304
std::atomic<int> push_calls{0};
305305
std::atomic<int> fragment_calls{0};
306+
std::atomic<int> bloom_filter_calls{0};
306307

307308
auto handler = [&](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
308309
(void)p;
@@ -315,6 +316,8 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) {
315316
push_calls++;
316317
} else if (h.type == RpcType::ExecuteFragment) {
317318
fragment_calls++;
319+
} else if (h.type == RpcType::BloomFilterPush) {
320+
bloom_filter_calls++;
318321
}
319322

320323
auto resp_p = reply.serialize();
@@ -330,9 +333,11 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) {
330333
node1.set_handler(RpcType::ShuffleFragment, handler);
331334
node1.set_handler(RpcType::PushData, handler);
332335
node1.set_handler(RpcType::ExecuteFragment, handler);
336+
node1.set_handler(RpcType::BloomFilterPush, handler);
333337
node2.set_handler(RpcType::ShuffleFragment, handler);
334338
node2.set_handler(RpcType::PushData, handler);
335339
node2.set_handler(RpcType::ExecuteFragment, handler);
340+
node2.set_handler(RpcType::BloomFilterPush, handler);
336341

337342
ASSERT_TRUE(node1.start());
338343
ASSERT_TRUE(node2.start());

0 commit comments

Comments
 (0)