Skip to content

Commit e35159e

Browse files
committed
feat: support RIGHT/FULL JOIN with bloom filter skip
- Skip bloom filter for RIGHT/FULL joins to prevent false negatives - Add Phase 3-5 orchestration for FULL JOIN unmatched LEFT rows - Add NULL-padding for unmatched right rows with left-side NULLs - Add integration tests for bloom filter skip and Phase 3-5 - Allow LEFT joins in distributed shuffle join The bloom filter can cause false negatives (rows filtered when they shouldn't be), which violates outer join semantics. For RIGHT/FULL joins, all right rows must be sent to ensure correct results. NOTE: Phase 3-5 is disabled for RIGHT JOIN because the local executor on each data node already correctly handles unmatched right rows via the hash table scan. Phase 3-5 is only needed for FULL JOIN to collect unmatched LEFT rows.
1 parent a1f9c0d commit e35159e

7 files changed

Lines changed: 826 additions & 46 deletions

File tree

include/common/cluster_manager.hpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,73 @@ class ClusterManager {
337337
local_num_hashes_map_.erase(context_id);
338338
}
339339

340+
/**
341+
* @brief Store local right table rows for outer join processing
342+
* Called during Phase 2 shuffle when sending right table rows to other nodes
343+
*/
344+
void set_local_right_rows(const std::string& context_id, const std::string& table_name,
345+
std::vector<executor::Tuple> rows) {
346+
const std::scoped_lock<std::mutex> lock(mutex_);
347+
local_right_table_rows_[context_id][table_name] = std::move(rows);
348+
}
349+
350+
/**
351+
* @brief Get stored local right table rows
352+
*/
353+
[[nodiscard]] std::vector<executor::Tuple> get_local_right_rows(
354+
const std::string& context_id, const std::string& table_name) const {
355+
const std::scoped_lock<std::mutex> lock(mutex_);
356+
auto ctx_it = local_right_table_rows_.find(context_id);
357+
if (ctx_it != local_right_table_rows_.end()) {
358+
auto table_it = ctx_it->second.find(table_name);
359+
if (table_it != ctx_it->second.end()) {
360+
return table_it->second;
361+
}
362+
}
363+
return {};
364+
}
365+
366+
/**
367+
* @brief Clear local right table rows for a context
368+
*/
369+
void clear_local_right_rows(const std::string& context_id) {
370+
const std::scoped_lock<std::mutex> lock(mutex_);
371+
local_right_table_rows_.erase(context_id);
372+
}
373+
374+
/**
375+
* @brief Store unmatched rows for a context (used by outer join processing)
376+
*/
377+
void set_unmatched_rows(const std::string& context_id, const std::string& table_name,
378+
std::vector<executor::Tuple> rows) {
379+
const std::scoped_lock<std::mutex> lock(mutex_);
380+
unmatched_rows_[context_id][table_name] = std::move(rows);
381+
}
382+
383+
/**
384+
* @brief Get stored unmatched rows for a context
385+
*/
386+
[[nodiscard]] std::vector<executor::Tuple> get_unmatched_rows(
387+
const std::string& context_id, const std::string& table_name) const {
388+
const std::scoped_lock<std::mutex> lock(mutex_);
389+
auto ctx_it = unmatched_rows_.find(context_id);
390+
if (ctx_it != unmatched_rows_.end()) {
391+
auto table_it = ctx_it->second.find(table_name);
392+
if (table_it != ctx_it->second.end()) {
393+
return table_it->second;
394+
}
395+
}
396+
return {};
397+
}
398+
399+
/**
400+
* @brief Clear unmatched rows for a context
401+
*/
402+
void clear_unmatched_rows(const std::string& context_id) {
403+
const std::scoped_lock<std::mutex> lock(mutex_);
404+
unmatched_rows_.erase(context_id);
405+
}
406+
340407
private:
341408
/**
342409
* @brief Stored bloom filter data for a context
@@ -365,6 +432,12 @@ class ClusterManager {
365432
std::unordered_map<std::string, std::vector<uint8_t>> local_bloom_bits_;
366433
std::unordered_map<std::string, size_t> local_expected_elements_map_;
367434
std::unordered_map<std::string, size_t> local_num_hashes_map_;
435+
/* context_id -> table_name -> local right table rows for outer join tracking */
436+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
437+
local_right_table_rows_;
438+
/* context_id -> table_name -> unmatched rows for outer join NULL-padding */
439+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
440+
unmatched_rows_;
368441
mutable std::mutex mutex_;
369442
};
370443

include/executor/operator.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,18 @@ class HashJoinOperator : public Operator {
358358

359359
void set_memory_resource(std::pmr::memory_resource* mr) override;
360360
void set_params(const std::vector<common::Value>* params) override;
361+
362+
/**
363+
* @brief Get unmatched right rows after join execution
364+
* @return Vector of tuples - the right-side rows that had no match
365+
*/
366+
[[nodiscard]] std::vector<Tuple> get_unmatched_right_rows() const;
367+
368+
/**
369+
* @brief Get join key values of unmatched right rows
370+
* @return Vector of strings - the join key values for unmatched right rows
371+
*/
372+
[[nodiscard]] std::vector<std::string> get_unmatched_right_keys() const;
361373
};
362374

363375
/**

include/network/rpc_message.hpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ enum class RpcType : uint8_t {
3535
ShuffleFragment = 10,
3636
BloomFilterPush = 11,
3737
BloomFilterBits = 12,
38+
UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join
39+
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
40+
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
3841
Error = 255
3942
};
4043

@@ -566,6 +569,136 @@ struct BloomFilterBitsArgs {
566569
}
567570
};
568571

572+
/**
573+
* @brief Arguments for UnmatchedRowsReport RPC
574+
* @note Data node reports unmatched right row keys to coordinator after local join
575+
*/
576+
struct UnmatchedRowsReportArgs {
577+
std::string context_id;
578+
std::string right_table;
579+
std::string join_key_col; // Which column was the join key
580+
std::vector<std::string> unmatched_keys; // Key values that had no match
581+
uint32_t left_column_count = 0; // Number of left table columns for NULL-padding
582+
583+
[[nodiscard]] std::vector<uint8_t> serialize() const {
584+
std::vector<uint8_t> out;
585+
Serializer::serialize_string(context_id, out);
586+
Serializer::serialize_string(right_table, out);
587+
Serializer::serialize_string(join_key_col, out);
588+
589+
// Serialize left column count
590+
const uint32_t left_count = left_column_count;
591+
const size_t lc_off = out.size();
592+
out.resize(lc_off + Serializer::VAL_SIZE_32);
593+
std::memcpy(out.data() + lc_off, &left_count, Serializer::VAL_SIZE_32);
594+
595+
// Serialize unmatched keys count
596+
const uint32_t count = static_cast<uint32_t>(unmatched_keys.size());
597+
const size_t off = out.size();
598+
out.resize(off + Serializer::VAL_SIZE_32);
599+
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);
600+
601+
// Serialize each key
602+
for (const auto& key : unmatched_keys) {
603+
Serializer::serialize_string(key, out);
604+
}
605+
return out;
606+
}
607+
608+
static UnmatchedRowsReportArgs deserialize(const std::vector<uint8_t>& in) {
609+
UnmatchedRowsReportArgs args;
610+
size_t offset = 0;
611+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
612+
args.right_table = Serializer::deserialize_string(in.data(), offset, in.size());
613+
args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size());
614+
615+
// Deserialize left column count
616+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
617+
std::memcpy(&args.left_column_count, in.data() + offset, Serializer::VAL_SIZE_32);
618+
offset += Serializer::VAL_SIZE_32;
619+
}
620+
621+
uint32_t count = 0;
622+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
623+
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
624+
offset += Serializer::VAL_SIZE_32;
625+
}
626+
627+
for (uint32_t i = 0; i < count; ++i) {
628+
args.unmatched_keys.push_back(
629+
Serializer::deserialize_string(in.data(), offset, in.size()));
630+
}
631+
return args;
632+
}
633+
};
634+
635+
/**
636+
* @brief Arguments for UnmatchedRowsPush RPC
637+
* @note Coordinator sends unmatched rows to data nodes for NULL-padding
638+
*/
639+
struct UnmatchedRowsPushArgs {
640+
std::string context_id;
641+
std::vector<executor::Tuple> unmatched_rows; // Right rows needing NULL padding
642+
643+
[[nodiscard]] std::vector<uint8_t> serialize() const {
644+
std::vector<uint8_t> out;
645+
Serializer::serialize_string(context_id, out);
646+
647+
// Serialize row count
648+
const uint32_t count = static_cast<uint32_t>(unmatched_rows.size());
649+
const size_t off = out.size();
650+
out.resize(off + Serializer::VAL_SIZE_32);
651+
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);
652+
653+
// Serialize each row
654+
for (const auto& row : unmatched_rows) {
655+
Serializer::serialize_tuple(row, out);
656+
}
657+
return out;
658+
}
659+
660+
static UnmatchedRowsPushArgs deserialize(const std::vector<uint8_t>& in) {
661+
UnmatchedRowsPushArgs args;
662+
size_t offset = 0;
663+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
664+
665+
uint32_t count = 0;
666+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
667+
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
668+
offset += Serializer::VAL_SIZE_32;
669+
}
670+
671+
for (uint32_t i = 0; i < count; ++i) {
672+
args.unmatched_rows.push_back(Serializer::deserialize_tuple(in.data(), offset, in.size()));
673+
}
674+
return args;
675+
}
676+
};
677+
678+
/**
679+
* @brief Arguments for FetchUnmatchedRows RPC
680+
* @note Coordinator fetches stored unmatched rows from a data node
681+
*/
682+
struct FetchUnmatchedRowsArgs {
683+
std::string context_id;
684+
std::string table_name;
685+
686+
[[nodiscard]] std::vector<uint8_t> serialize() const {
687+
std::vector<uint8_t> out;
688+
Serializer::serialize_string(context_id, out);
689+
Serializer::serialize_string(table_name, out);
690+
return out;
691+
}
692+
693+
static FetchUnmatchedRowsArgs deserialize(const std::vector<uint8_t>& in) {
694+
FetchUnmatchedRowsArgs args;
695+
size_t offset = 0;
696+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
697+
args.table_name = Serializer::deserialize_string(in.data(), offset, in.size());
698+
return args;
699+
}
700+
};
701+
569702
/**
570703
* @brief Arguments for TxnPrepare/Commit/Abort RPC
571704
*/

0 commit comments

Comments
 (0)