Skip to content

Commit b3e07d1

Browse files
authored
Merge pull request #41 from poyrazK/feature/full-join-option-c-infrastructure
feat: add LEFT-side infrastructure for Phase 3-5 FULL join support
2 parents 62cd75e + 6018443 commit b3e07d1

3 files changed

Lines changed: 277 additions & 0 deletions

File tree

include/common/cluster_manager.hpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,73 @@ class ClusterManager {
404404
unmatched_rows_.erase(context_id);
405405
}
406406

407+
/**
408+
* @brief Store local left table rows for outer join processing
409+
* Called during Phase 2 shuffle when sending left table rows to other nodes
410+
*/
411+
void set_local_left_rows(const std::string& context_id, const std::string& table_name,
412+
std::vector<executor::Tuple> rows) {
413+
const std::scoped_lock<std::mutex> lock(mutex_);
414+
local_left_table_rows_[context_id][table_name] = std::move(rows);
415+
}
416+
417+
/**
418+
* @brief Get stored local left table rows
419+
*/
420+
[[nodiscard]] std::vector<executor::Tuple> get_local_left_rows(
421+
const std::string& context_id, const std::string& table_name) const {
422+
const std::scoped_lock<std::mutex> lock(mutex_);
423+
auto ctx_it = local_left_table_rows_.find(context_id);
424+
if (ctx_it != local_left_table_rows_.end()) {
425+
auto table_it = ctx_it->second.find(table_name);
426+
if (table_it != ctx_it->second.end()) {
427+
return table_it->second;
428+
}
429+
}
430+
return {};
431+
}
432+
433+
/**
434+
* @brief Clear local left table rows for a context
435+
*/
436+
void clear_local_left_rows(const std::string& context_id) {
437+
const std::scoped_lock<std::mutex> lock(mutex_);
438+
local_left_table_rows_.erase(context_id);
439+
}
440+
441+
/**
442+
* @brief Store unmatched LEFT rows for a context (used by FULL join NULL-padding)
443+
*/
444+
void set_unmatched_left_rows(const std::string& context_id, const std::string& table_name,
445+
std::vector<executor::Tuple> rows) {
446+
const std::scoped_lock<std::mutex> lock(mutex_);
447+
unmatched_left_rows_[context_id][table_name] = std::move(rows);
448+
}
449+
450+
/**
451+
* @brief Get stored unmatched LEFT rows for a context
452+
*/
453+
[[nodiscard]] std::vector<executor::Tuple> get_unmatched_left_rows(
454+
const std::string& context_id, const std::string& table_name) const {
455+
const std::scoped_lock<std::mutex> lock(mutex_);
456+
auto ctx_it = unmatched_left_rows_.find(context_id);
457+
if (ctx_it != unmatched_left_rows_.end()) {
458+
auto table_it = ctx_it->second.find(table_name);
459+
if (table_it != ctx_it->second.end()) {
460+
return table_it->second;
461+
}
462+
}
463+
return {};
464+
}
465+
466+
/**
467+
* @brief Clear unmatched LEFT rows for a context
468+
*/
469+
void clear_unmatched_left_rows(const std::string& context_id) {
470+
const std::scoped_lock<std::mutex> lock(mutex_);
471+
unmatched_left_rows_.erase(context_id);
472+
}
473+
407474
private:
408475
/**
409476
* @brief Stored bloom filter data for a context
@@ -438,6 +505,12 @@ class ClusterManager {
438505
/* context_id -> table_name -> unmatched rows for outer join NULL-padding */
439506
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
440507
unmatched_rows_;
508+
/* context_id -> table_name -> local left table rows for outer join tracking */
509+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
510+
local_left_table_rows_;
511+
/* context_id -> table_name -> unmatched LEFT rows for FULL join NULL-padding */
512+
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
513+
unmatched_left_rows_;
441514
mutable std::mutex mutex_;
442515
};
443516

include/network/rpc_message.hpp

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ enum class RpcType : uint8_t {
3838
UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join
3939
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
4040
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
41+
// LEFT-side counterparts for FULL join
42+
UnmatchedLeftRowsReport = 16, // Data node reports unmatched LEFT rows for FULL join
43+
FetchUnmatchedLeftRows = 17, // Coordinator fetches stored unmatched LEFT rows
4144
Error = 255
4245
};
4346

@@ -700,6 +703,93 @@ struct FetchUnmatchedRowsArgs {
700703
}
701704
};
702705

706+
/**
707+
* @brief Arguments for UnmatchedLeftRowsReport RPC
708+
* @note Data node reports unmatched LEFT row keys to coordinator after local FULL join
709+
*/
710+
struct UnmatchedLeftRowsReportArgs {
711+
std::string context_id;
712+
std::string left_table;
713+
std::string join_key_col; // Which column was the join key
714+
std::vector<std::string> unmatched_keys; // LEFT key values that had no match
715+
uint32_t right_column_count = 0; // Number of right columns for NULL-padding
716+
717+
[[nodiscard]] std::vector<uint8_t> serialize() const {
718+
std::vector<uint8_t> out;
719+
Serializer::serialize_string(context_id, out);
720+
Serializer::serialize_string(left_table, out);
721+
Serializer::serialize_string(join_key_col, out);
722+
723+
// Serialize right column count
724+
const uint32_t rc_count = right_column_count;
725+
const size_t rc_off = out.size();
726+
out.resize(rc_off + Serializer::VAL_SIZE_32);
727+
std::memcpy(out.data() + rc_off, &rc_count, Serializer::VAL_SIZE_32);
728+
729+
// Serialize unmatched keys count
730+
const uint32_t count = static_cast<uint32_t>(unmatched_keys.size());
731+
const size_t off = out.size();
732+
out.resize(off + Serializer::VAL_SIZE_32);
733+
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);
734+
735+
// Serialize each key
736+
for (const auto& key : unmatched_keys) {
737+
Serializer::serialize_string(key, out);
738+
}
739+
return out;
740+
}
741+
742+
static UnmatchedLeftRowsReportArgs deserialize(const std::vector<uint8_t>& in) {
743+
UnmatchedLeftRowsReportArgs args;
744+
size_t offset = 0;
745+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
746+
args.left_table = Serializer::deserialize_string(in.data(), offset, in.size());
747+
args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size());
748+
749+
// Deserialize right column count
750+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
751+
std::memcpy(&args.right_column_count, in.data() + offset, Serializer::VAL_SIZE_32);
752+
offset += Serializer::VAL_SIZE_32;
753+
}
754+
755+
uint32_t count = 0;
756+
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
757+
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
758+
offset += Serializer::VAL_SIZE_32;
759+
}
760+
761+
for (uint32_t i = 0; i < count; ++i) {
762+
args.unmatched_keys.push_back(
763+
Serializer::deserialize_string(in.data(), offset, in.size()));
764+
}
765+
return args;
766+
}
767+
};
768+
769+
/**
770+
* @brief Arguments for FetchUnmatchedLeftRows RPC
771+
* @note Coordinator fetches stored unmatched LEFT rows from a data node
772+
*/
773+
struct FetchUnmatchedLeftRowsArgs {
774+
std::string context_id;
775+
std::string table_name;
776+
777+
[[nodiscard]] std::vector<uint8_t> serialize() const {
778+
std::vector<uint8_t> out;
779+
Serializer::serialize_string(context_id, out);
780+
Serializer::serialize_string(table_name, out);
781+
return out;
782+
}
783+
784+
static FetchUnmatchedLeftRowsArgs deserialize(const std::vector<uint8_t>& in) {
785+
FetchUnmatchedLeftRowsArgs args;
786+
size_t offset = 0;
787+
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
788+
args.table_name = Serializer::deserialize_string(in.data(), offset, in.size());
789+
return args;
790+
}
791+
};
792+
703793
/**
704794
* @brief Arguments for TxnPrepare/Commit/Abort RPC
705795
*/

src/main.cpp

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,120 @@ int main(int argc, char* argv[]) {
837837
args.context_id, args.table_name);
838838
}
839839

840+
auto resp_p = reply.serialize();
841+
cloudsql::network::RpcHeader resp_h;
842+
resp_h.type = cloudsql::network::RpcType::QueryResults;
843+
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
844+
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
845+
resp_h.encode(h_buf);
846+
static_cast<void>(
847+
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
848+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
849+
});
850+
851+
// Handler for reporting unmatched LEFT rows after join execution
852+
// For FULL outer joins, each node identifies rows from its local left table
853+
// partition that had no matching right row during the distributed join
854+
rpc_server->set_handler(
855+
cloudsql::network::RpcType::UnmatchedLeftRowsReport,
856+
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
857+
int fd) {
858+
(void)h;
859+
auto args = cloudsql::network::UnmatchedLeftRowsReportArgs::deserialize(p);
860+
cloudsql::network::UnmatchedLeftRowsReportArgs reply;
861+
reply.context_id = args.context_id;
862+
reply.left_table = args.left_table;
863+
reply.join_key_col = args.join_key_col;
864+
865+
// args.unmatched_keys contains MATCHED keys from coordinator
866+
// We need to return rows that are NOT in this set
867+
std::unordered_set<std::string> matched_keys_set(
868+
args.unmatched_keys.begin(), args.unmatched_keys.end());
869+
870+
try {
871+
// Scan local left table and collect rows that were NOT matched
872+
auto table_meta_opt = catalog->get_table_by_name(args.left_table);
873+
if (table_meta_opt.has_value()) {
874+
const auto* table_meta = table_meta_opt.value();
875+
cloudsql::executor::Schema schema;
876+
for (const auto& col : table_meta->columns) {
877+
schema.add_column(col.name, col.type);
878+
}
879+
cloudsql::storage::HeapTable table(args.left_table, *bpm, schema);
880+
881+
const size_t key_idx = schema.find_column(args.join_key_col);
882+
if (key_idx != static_cast<size_t>(-1)) {
883+
std::vector<cloudsql::executor::Tuple> unmatched_tuples;
884+
auto iter = table.scan();
885+
cloudsql::storage::HeapTable::TupleMeta t_meta;
886+
while (iter.next_meta(t_meta)) {
887+
if (t_meta.xmax == 0) {
888+
const auto& key_val = t_meta.tuple.get(key_idx);
889+
std::string key_str = key_val.to_string();
890+
// Only include if NOT in matched keys
891+
if (matched_keys_set.find(key_str) ==
892+
matched_keys_set.end()) {
893+
reply.unmatched_keys.push_back(key_str);
894+
// Pad with NULLs for right columns and append left
895+
// row
896+
std::vector<cloudsql::common::Value> padded_values;
897+
padded_values.reserve(t_meta.tuple.size() +
898+
args.right_column_count);
899+
// Append left table column values
900+
for (size_t j = 0; j < t_meta.tuple.size(); ++j) {
901+
padded_values.push_back(t_meta.tuple.get(j));
902+
}
903+
// Append NULLs for right table columns
904+
for (uint32_t i = 0; i < args.right_column_count;
905+
++i) {
906+
padded_values.push_back(
907+
cloudsql::common::Value::make_null());
908+
}
909+
unmatched_tuples.emplace_back(
910+
std::move(padded_values));
911+
}
912+
}
913+
}
914+
// Store properly padded tuples in ClusterManager for
915+
// coordinator to collect
916+
if (cluster_manager != nullptr && !unmatched_tuples.empty()) {
917+
cluster_manager->set_unmatched_left_rows(
918+
args.context_id, args.left_table,
919+
std::move(unmatched_tuples));
920+
}
921+
}
922+
}
923+
} catch (const std::exception& /*e*/) {
924+
// Return empty on error
925+
}
926+
927+
auto resp_p = reply.serialize();
928+
cloudsql::network::RpcHeader resp_h;
929+
resp_h.type = cloudsql::network::RpcType::QueryResults;
930+
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
931+
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
932+
resp_h.encode(h_buf);
933+
static_cast<void>(
934+
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
935+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
936+
});
937+
938+
// Handler for fetching stored unmatched LEFT rows from a data node
939+
// Coordinator calls this after UnmatchedLeftRowsReport to get full unmatched tuples
940+
rpc_server->set_handler(
941+
cloudsql::network::RpcType::FetchUnmatchedLeftRows,
942+
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
943+
int fd) {
944+
(void)h;
945+
auto args = cloudsql::network::FetchUnmatchedLeftRowsArgs::deserialize(p);
946+
cloudsql::network::UnmatchedRowsPushArgs reply;
947+
reply.context_id = args.context_id;
948+
949+
if (cluster_manager != nullptr) {
950+
reply.unmatched_rows = cluster_manager->get_unmatched_left_rows(
951+
args.context_id, args.table_name);
952+
}
953+
840954
auto resp_p = reply.serialize();
841955
cloudsql::network::RpcHeader resp_h;
842956
resp_h.type = cloudsql::network::RpcType::QueryResults;

0 commit comments

Comments
 (0)