Skip to content

Commit 3594088

Browse files
committed
feat: support RIGHT/FULL JOIN with bloom filter skip and Phase 3-5
- Skip bloom filter for RIGHT/FULL joins to prevent false negatives - Add Phase 3-5 orchestration to collect and redistribute unmatched rows - Add NULL-padding for unmatched right rows with left-side NULLs - Add integration tests for bloom filter skip and Phase 3-5 RPC calls - Allow LEFT joins in distributed shuffle join (regression from previous commit) The bloom filter can cause false negatives (rows filtered when they shouldn't be), which violates outer join semantics. For RIGHT/FULL joins, all right rows must be sent to ensure correct results. Phase 3-5 collects unmatched rows after the join and adds them back with proper NULL-padding.
1 parent a1f9c0d commit 3594088

7 files changed

Lines changed: 841 additions & 46 deletions

File tree

include/common/cluster_manager.hpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,73 @@ class ClusterManager {
337337
local_num_hashes_map_.erase(context_id);
338338
}
339339

340+
/**
341+
* @brief Store local right table rows for outer join processing
342+
* Called during Phase 2 shuffle when sending right table rows to other nodes
343+
*/
344+
void set_local_right_rows(const std::string& context_id, const std::string& table_name,
345+
std::vector<executor::Tuple> rows) {
346+
const std::scoped_lock<std::mutex> lock(mutex_);
347+
local_right_table_rows_[context_id][table_name] = std::move(rows);
348+
}
349+
350+
/**
351+
* @brief Get stored local right table rows
352+
*/
353+
[[nodiscard]] std::vector<executor::Tuple> get_local_right_rows(
354+
const std::string& context_id, const std::string& table_name) const {
355+
const std::scoped_lock<std::mutex> lock(mutex_);
356+
auto ctx_it = local_right_table_rows_.find(context_id);
357+
if (ctx_it != local_right_table_rows_.end()) {
358+
auto table_it = ctx_it->second.find(table_name);
359+
if (table_it != ctx_it->second.end()) {
360+
return table_it->second;
361+
}
362+
}
363+
return {};
364+
}
365+
366+
/**
367+
* @brief Clear local right table rows for a context
368+
*/
369+
void clear_local_right_rows(const std::string& context_id) {
370+
const std::scoped_lock<std::mutex> lock(mutex_);
371+
local_right_table_rows_.erase(context_id);
372+
}
373+
374+
/**
375+
* @brief Store unmatched rows for a context (used by outer join processing)
376+
*/
377+
void set_unmatched_rows(const std::string& context_id, const std::string& table_name,
378+
std::vector<executor::Tuple> rows) {
379+
const std::scoped_lock<std::mutex> lock(mutex_);
380+
unmatched_rows_[context_id][table_name] = std::move(rows);
381+
}
382+
383+
/**
384+
* @brief Get stored unmatched rows for a context
385+
*/
386+
[[nodiscard]] std::vector<executor::Tuple> get_unmatched_rows(
387+
const std::string& context_id, const std::string& table_name) const {
388+
const std::scoped_lock<std::mutex> lock(mutex_);
389+
auto ctx_it = unmatched_rows_.find(context_id);
390+
if (ctx_it != unmatched_rows_.end()) {
391+
auto table_it = ctx_it->second.find(table_name);
392+
if (table_it != ctx_it->second.end()) {
393+
return table_it->second;
394+
}
395+
}
396+
return {};
397+
}
398+
399+
/**
400+
* @brief Clear unmatched rows for a context
401+
*/
402+
void clear_unmatched_rows(const std::string& context_id) {
403+
const std::scoped_lock<std::mutex> lock(mutex_);
404+
unmatched_rows_.erase(context_id);
405+
}
406+
340407
private:
341408
/**
342409
* @brief Stored bloom filter data for a context
@@ -365,6 +432,12 @@ class ClusterManager {
365432
std::unordered_map<std::string, std::vector<uint8_t>> local_bloom_bits_;
366433
std::unordered_map<std::string, size_t> local_expected_elements_map_;
367434
std::unordered_map<std::string, size_t> local_num_hashes_map_;
435+
/* context_id -> table_name -> local right table rows for outer join tracking */
436+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
437+
local_right_table_rows_;
438+
/* context_id -> table_name -> unmatched rows for outer join NULL-padding */
439+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
440+
unmatched_rows_;
368441
mutable std::mutex mutex_;
369442
};
370443

include/executor/operator.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,18 @@ class HashJoinOperator : public Operator {
358358

359359
void set_memory_resource(std::pmr::memory_resource* mr) override;
360360
void set_params(const std::vector<common::Value>* params) override;
361+
362+
/**
363+
* @brief Get unmatched right rows after join execution
364+
* @return Vector of tuples - the right-side rows that had no match
365+
*/
366+
[[nodiscard]] std::vector<Tuple> get_unmatched_right_rows() const;
367+
368+
/**
369+
* @brief Get join key values of unmatched right rows
370+
* @return Vector of strings - the join key values for unmatched right rows
371+
*/
372+
[[nodiscard]] std::vector<std::string> get_unmatched_right_keys() const;
361373
};
362374

363375
/**

include/network/rpc_message.hpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ enum class RpcType : uint8_t {
3535
ShuffleFragment = 10,
3636
BloomFilterPush = 11,
3737
BloomFilterBits = 12,
38+
UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join
39+
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
40+
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
3841
Error = 255
3942
};
4043

@@ -566,6 +569,136 @@ struct BloomFilterBitsArgs {
566569
}
567570
};
568571

572+
/**
573+
* @brief Arguments for UnmatchedRowsReport RPC
574+
* @note Data node reports unmatched right row keys to coordinator after local join
575+
*/
576+
struct UnmatchedRowsReportArgs {
577+
std::string context_id;
578+
std::string right_table;
579+
std::string join_key_col; // Which column was the join key
580+
std::vector<std::string> unmatched_keys; // Key values that had no match
581+
uint32_t left_column_count = 0; // Number of left table columns for NULL-padding
582+
583+
[[nodiscard]] std::vector<uint8_t> serialize() const {
584+
std::vector<uint8_t> out;
585+
Serializer::serialize_string(context_id, out);
586+
Serializer::serialize_string(right_table, out);
587+
Serializer::serialize_string(join_key_col, out);
588+
589+
// Serialize left column count
590+
const uint32_t left_count = left_column_count;
591+
const size_t lc_off = out.size();
592+
out.resize(lc_off + Serializer::VAL_SIZE_32);
593+
std::memcpy(out.data() + lc_off, &left_count, Serializer::VAL_SIZE_32);
594+
595+
// Serialize unmatched keys count
596+
const uint32_t count = static_cast<uint32_t>(unmatched_keys.size());
597+
const size_t off = out.size();
598+
out.resize(off + Serializer::VAL_SIZE_32);
599+
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);
600+
601+
// Serialize each key
602+
for (const auto& key : unmatched_keys) {
603+
Serializer::serialize_string(key, out);
604+
}
605+
return out;
606+
}
607+
608+
static UnmatchedRowsReportArgs deserialize(const std::vector<uint8_t>& in) {
609+
UnmatchedRowsReportArgs args;
610+
size_t offset = 0;
611+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
612+
args.right_table = Serializer::deserialize_string(in.data(), offset, in.size());
613+
args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size());
614+
615+
// Deserialize left column count
616+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
617+
std::memcpy(&args.left_column_count, in.data() + offset, Serializer::VAL_SIZE_32);
618+
offset += Serializer::VAL_SIZE_32;
619+
}
620+
621+
uint32_t count = 0;
622+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
623+
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
624+
offset += Serializer::VAL_SIZE_32;
625+
}
626+
627+
for (uint32_t i = 0; i < count; ++i) {
628+
args.unmatched_keys.push_back(
629+
Serializer::deserialize_string(in.data(), offset, in.size()));
630+
}
631+
return args;
632+
}
633+
};
634+
635+
/**
636+
* @brief Arguments for UnmatchedRowsPush RPC
637+
* @note Coordinator sends unmatched rows to data nodes for NULL-padding
638+
*/
639+
struct UnmatchedRowsPushArgs {
640+
std::string context_id;
641+
std::vector<executor::Tuple> unmatched_rows; // Right rows needing NULL padding
642+
643+
[[nodiscard]] std::vector<uint8_t> serialize() const {
644+
std::vector<uint8_t> out;
645+
Serializer::serialize_string(context_id, out);
646+
647+
// Serialize row count
648+
const uint32_t count = static_cast<uint32_t>(unmatched_rows.size());
649+
const size_t off = out.size();
650+
out.resize(off + Serializer::VAL_SIZE_32);
651+
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);
652+
653+
// Serialize each row
654+
for (const auto& row : unmatched_rows) {
655+
Serializer::serialize_tuple(row, out);
656+
}
657+
return out;
658+
}
659+
660+
static UnmatchedRowsPushArgs deserialize(const std::vector<uint8_t>& in) {
661+
UnmatchedRowsPushArgs args;
662+
size_t offset = 0;
663+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
664+
665+
uint32_t count = 0;
666+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
667+
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
668+
offset += Serializer::VAL_SIZE_32;
669+
}
670+
671+
for (uint32_t i = 0; i < count; ++i) {
672+
args.unmatched_rows.push_back(Serializer::deserialize_tuple(in.data(), offset, in.size()));
673+
}
674+
return args;
675+
}
676+
};
677+
678+
/**
679+
* @brief Arguments for FetchUnmatchedRows RPC
680+
* @note Coordinator fetches stored unmatched rows from a data node
681+
*/
682+
struct FetchUnmatchedRowsArgs {
683+
std::string context_id;
684+
std::string table_name;
685+
686+
[[nodiscard]] std::vector<uint8_t> serialize() const {
687+
std::vector<uint8_t> out;
688+
Serializer::serialize_string(context_id, out);
689+
Serializer::serialize_string(table_name, out);
690+
return out;
691+
}
692+
693+
static FetchUnmatchedRowsArgs deserialize(const std::vector<uint8_t>& in) {
694+
FetchUnmatchedRowsArgs args;
695+
size_t offset = 0;
696+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
697+
args.table_name = Serializer::deserialize_string(in.data(), offset, in.size());
698+
return args;
699+
}
700+
};
701+
569702
/**
570703
* @brief Arguments for TxnPrepare/Commit/Abort RPC
571704
*/

0 commit comments

Comments
 (0)