Skip to content

Commit 92ee3a1

Browse files
committed
feat: enable Phase 3-5 for FULL JOIN to collect unmatched LEFT rows
- Track outer_join_left_key in addition to outer_join_right_key
- Enable Phase 3-5 for FULL JOIN (not RIGHT JOIN; RIGHT JOIN works via the local executor)
- Add LEFT-side Phase 3: UnmatchedLeftRowsReport RPC to each data node
- Add LEFT-side Phase 4: FetchUnmatchedLeftRows RPC to collect unmatched rows
- Aggregate unmatched LEFT rows into final results

This implements Option C, symmetric handling for FULL JOIN:
- Unmatched RIGHT rows collected via Phase 3-4 (existing)
- Unmatched LEFT rows collected via LEFT-side Phase 3-4 (new)
1 parent 6bb047c commit 92ee3a1

1 file changed

Lines changed: 107 additions & 9 deletions

File tree

src/distributed/distributed_executor.cpp

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
184184
bool is_outer_join_join_query = false;
185185
std::string outer_join_left_table;
186186
std::string outer_join_right_table;
187+
std::string outer_join_left_key;
187188
std::string outer_join_right_key;
188189
parser::SelectStatement::JoinType outer_join_type = parser::SelectStatement::JoinType::Inner;
189190

@@ -234,6 +235,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
234235
is_outer_join_join_query = true;
235236
outer_join_left_table = left_table;
236237
outer_join_right_table = right_table;
238+
outer_join_left_key = left_key;
237239
outer_join_right_key = right_key;
238240
outer_join_type = join.type;
239241
}
@@ -611,15 +613,15 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
611613
}
612614
}
613615

614-
// Phase 3-5: Currently disabled for all outer joins due to issues with column indexing
615-
// when SELECT doesn't use SELECT * (causes duplicate rows instead of correct results).
616+
// Phase 3-5: For FULL JOIN, collect unmatched RIGHT and LEFT rows from data nodes.
617+
// LEFT rows are emitted during probe phase when no match found, but we need to
618+
// COLLECT them from all data nodes for the coordinator's final result.
616619
//
617-
// For RIGHT JOIN: Local executor on each data node correctly handles unmatched right rows.
618-
// For FULL JOIN: Unmatched LEFT rows are not collected (to be implemented in separate PR).
620+
// For RIGHT JOIN: Local executor on each data node correctly handles unmatched right rows
621+
// (no collection needed - each node emits them locally).
619622
//
620-
// TODO: Re-enable Phase 3-5 for FULL JOIN once column indexing is fixed to properly
621-
// identify which rows were unmatched during the distributed join.
622-
if (false && is_outer_join_join_query && all_success) {
623+
// This block is only enabled for FULL JOIN.
624+
if (outer_join_type == parser::SelectStatement::JoinType::Full && all_success) {
623625
// Extract matched right keys from aggregated results
624626
// The right key column is at a known position in the result schema
625627
std::vector<std::string> matched_keys;
@@ -643,7 +645,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
643645
}
644646
}
645647

646-
// Phase 3: Ask each node to scan local table and store unmatched rows
648+
// Phase 3: Ask each node to scan local right table and store unmatched rows
647649
// First, compute the left column count for NULL-padding
648650
uint32_t left_column_count = 0;
649651
if (!outer_join_left_table.empty()) {
@@ -713,7 +715,7 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
713715
}));
714716
}
715717

716-
// Aggregate all unmatched rows from all nodes
718+
// Aggregate all unmatched RIGHT rows from all nodes
717719
for (auto& f : fetch_futures) {
718720
auto result = f.get();
719721
if (result.first) {
@@ -722,6 +724,102 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
722724
}
723725
}
724726
}
727+
728+
// === LEFT-side Phase 3-4 for FULL JOIN ===
729+
// Extract matched LEFT keys from aggregated results
730+
std::vector<std::string> matched_left_keys;
731+
size_t left_key_idx = static_cast<size_t>(-1);
732+
for (size_t i = 0; i < result_schema.columns().size(); ++i) {
733+
const auto& col = result_schema.columns()[i];
734+
if (col.name() == outer_join_left_key) {
735+
left_key_idx = i;
736+
break;
737+
}
738+
}
739+
if (left_key_idx != static_cast<size_t>(-1)) {
740+
for (const auto& row : aggregated_rows) {
741+
if (row.size() > left_key_idx) {
742+
matched_left_keys.push_back(row.get(left_key_idx).to_string());
743+
}
744+
}
745+
}
746+
747+
// LEFT-side Phase 3: Ask each node to scan local left table and store unmatched rows
748+
uint32_t right_column_count = 0;
749+
if (!outer_join_right_table.empty()) {
750+
auto right_table_info = catalog_.get_table_by_name(outer_join_right_table);
751+
if (right_table_info.has_value()) {
752+
right_column_count = static_cast<uint32_t>((*right_table_info)->columns.size());
753+
}
754+
}
755+
756+
std::vector<std::future<std::pair<bool, network::UnmatchedLeftRowsReportArgs>>>
757+
left_report_futures;
758+
759+
for (const auto& node : data_nodes) {
760+
left_report_futures.push_back(std::async(
761+
std::launch::async, [node, context_id, outer_join_left_table, outer_join_left_key,
762+
matched_left_keys, right_column_count]() {
763+
network::RpcClient client(node.address, node.cluster_port);
764+
network::UnmatchedLeftRowsReportArgs reply;
765+
if (client.connect()) {
766+
network::UnmatchedLeftRowsReportArgs report_args;
767+
report_args.context_id = context_id;
768+
report_args.left_table = outer_join_left_table;
769+
report_args.join_key_col = outer_join_left_key;
770+
report_args.unmatched_keys = matched_left_keys;
771+
report_args.right_column_count = right_column_count;
772+
773+
std::vector<uint8_t> resp;
774+
if (client.call(network::RpcType::UnmatchedLeftRowsReport,
775+
report_args.serialize(), resp)) {
776+
reply = network::UnmatchedLeftRowsReportArgs::deserialize(resp);
777+
return std::make_pair(true, reply);
778+
}
779+
}
780+
return std::make_pair(false, reply);
781+
}));
782+
}
783+
784+
// Wait for all LEFT report futures to complete
785+
for (auto& f : left_report_futures) {
786+
f.get();
787+
}
788+
789+
// LEFT-side Phase 4: Fetch stored unmatched LEFT rows from each node
790+
std::vector<std::future<std::pair<bool, std::vector<executor::Tuple>>>> left_fetch_futures;
791+
792+
for (const auto& node : data_nodes) {
793+
left_fetch_futures.push_back(std::async(
794+
std::launch::async, [node, context_id, outer_join_left_table]() {
795+
network::RpcClient client(node.address, node.cluster_port);
796+
std::vector<executor::Tuple> rows;
797+
if (client.connect()) {
798+
network::FetchUnmatchedLeftRowsArgs fetch_args;
799+
fetch_args.context_id = context_id;
800+
fetch_args.table_name = outer_join_left_table;
801+
802+
std::vector<uint8_t> resp;
803+
if (client.call(network::RpcType::FetchUnmatchedLeftRows,
804+
fetch_args.serialize(), resp)) {
805+
auto reply = network::UnmatchedRowsPushArgs::deserialize(resp);
806+
rows = std::move(reply.unmatched_rows);
807+
return std::make_pair(true, std::move(rows));
808+
}
809+
}
810+
return std::make_pair(false, std::move(rows));
811+
}));
812+
}
813+
814+
// Aggregate all unmatched LEFT rows from all nodes
815+
for (auto& f : left_fetch_futures) {
816+
auto result = f.get();
817+
if (result.first) {
818+
for (auto& row : result.second) {
819+
aggregated_rows.push_back(std::move(row));
820+
}
821+
}
822+
}
725823
}
726824

727825
if (all_success) {

0 commit comments

Comments
 (0)