Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions include/common/cluster_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,73 @@ class ClusterManager {
unmatched_rows_.erase(context_id);
}

/**
* @brief Store local left table rows for outer join processing
* Called during Phase 2 shuffle when sending left table rows to other nodes
*/
void set_local_left_rows(const std::string& context_id, const std::string& table_name,
std::vector<executor::Tuple> rows) {
const std::scoped_lock<std::mutex> lock(mutex_);
local_left_table_rows_[context_id][table_name] = std::move(rows);
}

/**
* @brief Get stored local left table rows
*/
[[nodiscard]] std::vector<executor::Tuple> get_local_left_rows(
const std::string& context_id, const std::string& table_name) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto ctx_it = local_left_table_rows_.find(context_id);
if (ctx_it != local_left_table_rows_.end()) {
auto table_it = ctx_it->second.find(table_name);
if (table_it != ctx_it->second.end()) {
return table_it->second;
}
}
return {};
}

/**
* @brief Clear local left table rows for a context
*/
void clear_local_left_rows(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
local_left_table_rows_.erase(context_id);
}

/**
* @brief Store unmatched LEFT rows for a context (used by FULL join NULL-padding)
*/
void set_unmatched_left_rows(const std::string& context_id, const std::string& table_name,
std::vector<executor::Tuple> rows) {
const std::scoped_lock<std::mutex> lock(mutex_);
unmatched_left_rows_[context_id][table_name] = std::move(rows);
}

/**
* @brief Get stored unmatched LEFT rows for a context
*/
[[nodiscard]] std::vector<executor::Tuple> get_unmatched_left_rows(
const std::string& context_id, const std::string& table_name) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto ctx_it = unmatched_left_rows_.find(context_id);
if (ctx_it != unmatched_left_rows_.end()) {
auto table_it = ctx_it->second.find(table_name);
if (table_it != ctx_it->second.end()) {
return table_it->second;
}
}
return {};
}

/**
* @brief Clear unmatched LEFT rows for a context
*/
void clear_unmatched_left_rows(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
unmatched_left_rows_.erase(context_id);
}

private:
/**
* @brief Stored bloom filter data for a context
Expand Down Expand Up @@ -438,6 +505,12 @@ class ClusterManager {
/* context_id -> table_name -> unmatched rows for outer join NULL-padding */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
unmatched_rows_;
/* context_id -> table_name -> local left table rows for outer join tracking */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
local_left_table_rows_;
/* context_id -> table_name -> unmatched LEFT rows for FULL join NULL-padding */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
unmatched_left_rows_;
mutable std::mutex mutex_;
};

Expand Down
90 changes: 90 additions & 0 deletions include/network/rpc_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ enum class RpcType : uint8_t {
UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
// LEFT-side counterparts for FULL join
UnmatchedLeftRowsReport = 16, // Data node reports unmatched LEFT rows for FULL join
FetchUnmatchedLeftRows = 17, // Coordinator fetches stored unmatched LEFT rows
Error = 255
};

Expand Down Expand Up @@ -700,6 +703,93 @@ struct FetchUnmatchedRowsArgs {
}
};

/**
* @brief Arguments for UnmatchedLeftRowsReport RPC
* @note Data node reports unmatched LEFT row keys to coordinator after local FULL join
*/
struct UnmatchedLeftRowsReportArgs {
std::string context_id;
std::string left_table;
std::string join_key_col; // Which column was the join key
std::vector<std::string> unmatched_keys; // LEFT key values that had no match
uint32_t right_column_count = 0; // Number of right columns for NULL-padding

[[nodiscard]] std::vector<uint8_t> serialize() const {
std::vector<uint8_t> out;
Serializer::serialize_string(context_id, out);
Serializer::serialize_string(left_table, out);
Serializer::serialize_string(join_key_col, out);

// Serialize right column count
const uint32_t rc_count = right_column_count;
const size_t rc_off = out.size();
out.resize(rc_off + Serializer::VAL_SIZE_32);
std::memcpy(out.data() + rc_off, &rc_count, Serializer::VAL_SIZE_32);

// Serialize unmatched keys count
const uint32_t count = static_cast<uint32_t>(unmatched_keys.size());
const size_t off = out.size();
out.resize(off + Serializer::VAL_SIZE_32);
std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32);

// Serialize each key
for (const auto& key : unmatched_keys) {
Serializer::serialize_string(key, out);
}
return out;
}

static UnmatchedLeftRowsReportArgs deserialize(const std::vector<uint8_t>& in) {
UnmatchedLeftRowsReportArgs args;
size_t offset = 0;
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
args.left_table = Serializer::deserialize_string(in.data(), offset, in.size());
args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size());

// Deserialize right column count
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
std::memcpy(&args.right_column_count, in.data() + offset, Serializer::VAL_SIZE_32);
offset += Serializer::VAL_SIZE_32;
}

uint32_t count = 0;
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32);
offset += Serializer::VAL_SIZE_32;
}

for (uint32_t i = 0; i < count; ++i) {
args.unmatched_keys.push_back(
Serializer::deserialize_string(in.data(), offset, in.size()));
}
return args;
}
};

/**
* @brief Arguments for FetchUnmatchedLeftRows RPC
* @note Coordinator fetches stored unmatched LEFT rows from a data node
*/
struct FetchUnmatchedLeftRowsArgs {
std::string context_id;
std::string table_name;

[[nodiscard]] std::vector<uint8_t> serialize() const {
std::vector<uint8_t> out;
Serializer::serialize_string(context_id, out);
Serializer::serialize_string(table_name, out);
return out;
}

static FetchUnmatchedLeftRowsArgs deserialize(const std::vector<uint8_t>& in) {
FetchUnmatchedLeftRowsArgs args;
size_t offset = 0;
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
args.table_name = Serializer::deserialize_string(in.data(), offset, in.size());
return args;
}
};

/**
* @brief Arguments for TxnPrepare/Commit/Abort RPC
*/
Expand Down
114 changes: 114 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,120 @@ int main(int argc, char* argv[]) {
args.context_id, args.table_name);
}

auto resp_p = reply.serialize();
cloudsql::network::RpcHeader resp_h;
resp_h.type = cloudsql::network::RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
static_cast<void>(
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
});

// Handler for reporting unmatched LEFT rows after join execution
// For FULL outer joins, each node identifies rows from its local left table
// partition that had no matching right row during the distributed join
rpc_server->set_handler(
cloudsql::network::RpcType::UnmatchedLeftRowsReport,
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
int fd) {
(void)h;
auto args = cloudsql::network::UnmatchedLeftRowsReportArgs::deserialize(p);
cloudsql::network::UnmatchedLeftRowsReportArgs reply;
reply.context_id = args.context_id;
reply.left_table = args.left_table;
reply.join_key_col = args.join_key_col;

// args.unmatched_keys contains MATCHED keys from coordinator
// We need to return rows that are NOT in this set
std::unordered_set<std::string> matched_keys_set(
args.unmatched_keys.begin(), args.unmatched_keys.end());

try {
// Scan local left table and collect rows that were NOT matched
auto table_meta_opt = catalog->get_table_by_name(args.left_table);
if (table_meta_opt.has_value()) {
const auto* table_meta = table_meta_opt.value();
cloudsql::executor::Schema schema;
for (const auto& col : table_meta->columns) {
schema.add_column(col.name, col.type);
}
cloudsql::storage::HeapTable table(args.left_table, *bpm, schema);

const size_t key_idx = schema.find_column(args.join_key_col);
if (key_idx != static_cast<size_t>(-1)) {
std::vector<cloudsql::executor::Tuple> unmatched_tuples;
auto iter = table.scan();
cloudsql::storage::HeapTable::TupleMeta t_meta;
while (iter.next_meta(t_meta)) {
if (t_meta.xmax == 0) {
const auto& key_val = t_meta.tuple.get(key_idx);
std::string key_str = key_val.to_string();
// Only include if NOT in matched keys
if (matched_keys_set.find(key_str) ==
matched_keys_set.end()) {
reply.unmatched_keys.push_back(key_str);
// Pad with NULLs for right columns and append left
// row
std::vector<cloudsql::common::Value> padded_values;
padded_values.reserve(t_meta.tuple.size() +
args.right_column_count);
// Append left table column values
for (size_t j = 0; j < t_meta.tuple.size(); ++j) {
padded_values.push_back(t_meta.tuple.get(j));
}
// Append NULLs for right table columns
for (uint32_t i = 0; i < args.right_column_count;
++i) {
padded_values.push_back(
cloudsql::common::Value::make_null());
}
unmatched_tuples.emplace_back(
std::move(padded_values));
}
}
}
// Store properly padded tuples in ClusterManager for
// coordinator to collect
if (cluster_manager != nullptr && !unmatched_tuples.empty()) {
cluster_manager->set_unmatched_left_rows(
args.context_id, args.left_table,
std::move(unmatched_tuples));
}
}
}
} catch (const std::exception& /*e*/) {
// Return empty on error
}

auto resp_p = reply.serialize();
cloudsql::network::RpcHeader resp_h;
resp_h.type = cloudsql::network::RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
static_cast<void>(
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
});

// Handler for fetching stored unmatched LEFT rows from a data node
// Coordinator calls this after UnmatchedLeftRowsReport to get full unmatched tuples
rpc_server->set_handler(
cloudsql::network::RpcType::FetchUnmatchedLeftRows,
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
int fd) {
(void)h;
auto args = cloudsql::network::FetchUnmatchedLeftRowsArgs::deserialize(p);
cloudsql::network::UnmatchedRowsPushArgs reply;
reply.context_id = args.context_id;

if (cluster_manager != nullptr) {
reply.unmatched_rows = cluster_manager->get_unmatched_left_rows(
args.context_id, args.table_name);
}

auto resp_p = reply.serialize();
cloudsql::network::RpcHeader resp_h;
resp_h.type = cloudsql::network::RpcType::QueryResults;
Expand Down