Skip to content

Commit 654d198

Browse files
poyrazKgithub-actions[bot]
authored andcommitted
style: automated clang-format fixes
1 parent c171913 commit 654d198

5 files changed

Lines changed: 88 additions & 74 deletions

File tree

include/common/cluster_manager.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ class ClusterManager {
375375
* @brief Store unmatched rows for a context (used by outer join processing)
376376
*/
377377
void set_unmatched_rows(const std::string& context_id, const std::string& table_name,
378-
std::vector<executor::Tuple> rows) {
378+
std::vector<executor::Tuple> rows) {
379379
const std::scoped_lock<std::mutex> lock(mutex_);
380380
unmatched_rows_[context_id][table_name] = std::move(rows);
381381
}

include/network/rpc_message.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ enum class RpcType : uint8_t {
3636
BloomFilterPush = 11,
3737
BloomFilterBits = 12,
3838
UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join
39-
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
40-
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
39+
UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding
40+
FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node
4141
Error = 255
4242
};
4343

@@ -576,9 +576,9 @@ struct BloomFilterBitsArgs {
576576
struct UnmatchedRowsReportArgs {
577577
std::string context_id;
578578
std::string right_table;
579-
std::string join_key_col; // Which column was the join key
579+
std::string join_key_col; // Which column was the join key
580580
std::vector<std::string> unmatched_keys; // Key values that had no match
581-
uint32_t left_column_count = 0; // Number of left table columns for NULL-padding
581+
uint32_t left_column_count = 0; // Number of left table columns for NULL-padding
582582

583583
[[nodiscard]] std::vector<uint8_t> serialize() const {
584584
std::vector<uint8_t> out;
@@ -669,7 +669,8 @@ struct UnmatchedRowsPushArgs {
669669
}
670670

671671
for (uint32_t i = 0; i < count; ++i) {
672-
args.unmatched_rows.push_back(Serializer::deserialize_tuple(in.data(), offset, in.size()));
672+
args.unmatched_rows.push_back(
673+
Serializer::deserialize_tuple(in.data(), offset, in.size()));
673674
}
674675
return args;
675676
}

src/distributed/distributed_executor.cpp

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
202202
join.type != parser::SelectStatement::JoinType::Right &&
203203
join.type != parser::SelectStatement::JoinType::Full) {
204204
QueryResult res;
205-
res.set_error("Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL joins");
205+
res.set_error(
206+
"Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL "
207+
"joins");
206208
return res;
207209
}
208210

@@ -654,33 +656,31 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
654656
std::vector<std::future<std::pair<bool, network::UnmatchedRowsReportArgs>>> report_futures;
655657

656658
for (const auto& node : data_nodes) {
657-
report_futures.push_back(std::async(std::launch::async, [node, context_id,
658-
outer_join_right_table,
659-
outer_join_right_key,
660-
matched_keys,
661-
left_column_count]() {
662-
network::RpcClient client(node.address, node.cluster_port);
663-
network::UnmatchedRowsReportArgs reply;
664-
reply.context_id = context_id;
665-
if (client.connect()) {
666-
network::UnmatchedRowsReportArgs report_args;
667-
report_args.context_id = context_id;
668-
report_args.right_table = outer_join_right_table;
669-
report_args.join_key_col = outer_join_right_key;
670-
// Attach matched keys so node knows what was matched
671-
report_args.unmatched_keys = matched_keys;
672-
// Attach left column count for NULL-padding
673-
report_args.left_column_count = left_column_count;
659+
report_futures.push_back(std::async(
660+
std::launch::async, [node, context_id, outer_join_right_table, outer_join_right_key,
661+
matched_keys, left_column_count]() {
662+
network::RpcClient client(node.address, node.cluster_port);
663+
network::UnmatchedRowsReportArgs reply;
664+
reply.context_id = context_id;
665+
if (client.connect()) {
666+
network::UnmatchedRowsReportArgs report_args;
667+
report_args.context_id = context_id;
668+
report_args.right_table = outer_join_right_table;
669+
report_args.join_key_col = outer_join_right_key;
670+
// Attach matched keys so node knows what was matched
671+
report_args.unmatched_keys = matched_keys;
672+
// Attach left column count for NULL-padding
673+
report_args.left_column_count = left_column_count;
674674

675-
std::vector<uint8_t> resp;
676-
if (client.call(network::RpcType::UnmatchedRowsReport,
677-
report_args.serialize(), resp)) {
678-
reply = network::UnmatchedRowsReportArgs::deserialize(resp);
679-
return std::make_pair(true, reply);
675+
std::vector<uint8_t> resp;
676+
if (client.call(network::RpcType::UnmatchedRowsReport,
677+
report_args.serialize(), resp)) {
678+
reply = network::UnmatchedRowsReportArgs::deserialize(resp);
679+
return std::make_pair(true, reply);
680+
}
680681
}
681-
}
682-
return std::make_pair(false, reply);
683-
}));
682+
return std::make_pair(false, reply);
683+
}));
684684
}
685685

686686
// Wait for all report futures to complete
@@ -692,25 +692,25 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
692692
std::vector<std::future<std::pair<bool, std::vector<executor::Tuple>>>> fetch_futures;
693693

694694
for (const auto& node : data_nodes) {
695-
fetch_futures.push_back(std::async(std::launch::async, [node, context_id,
696-
outer_join_right_table]() {
697-
network::RpcClient client(node.address, node.cluster_port);
698-
std::vector<executor::Tuple> rows;
699-
if (client.connect()) {
700-
network::FetchUnmatchedRowsArgs fetch_args;
701-
fetch_args.context_id = context_id;
702-
fetch_args.table_name = outer_join_right_table;
695+
fetch_futures.push_back(
696+
std::async(std::launch::async, [node, context_id, outer_join_right_table]() {
697+
network::RpcClient client(node.address, node.cluster_port);
698+
std::vector<executor::Tuple> rows;
699+
if (client.connect()) {
700+
network::FetchUnmatchedRowsArgs fetch_args;
701+
fetch_args.context_id = context_id;
702+
fetch_args.table_name = outer_join_right_table;
703703

704-
std::vector<uint8_t> resp;
705-
if (client.call(network::RpcType::FetchUnmatchedRows,
706-
fetch_args.serialize(), resp)) {
707-
auto reply = network::UnmatchedRowsPushArgs::deserialize(resp);
708-
rows = std::move(reply.unmatched_rows);
709-
return std::make_pair(true, std::move(rows));
704+
std::vector<uint8_t> resp;
705+
if (client.call(network::RpcType::FetchUnmatchedRows,
706+
fetch_args.serialize(), resp)) {
707+
auto reply = network::UnmatchedRowsPushArgs::deserialize(resp);
708+
rows = std::move(reply.unmatched_rows);
709+
return std::make_pair(true, std::move(rows));
710+
}
710711
}
711-
}
712-
return std::make_pair(false, std::move(rows));
713-
}));
712+
return std::make_pair(false, std::move(rows));
713+
}));
714714
}
715715

716716
// Aggregate all unmatched rows from all nodes

src/main.cpp

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -745,28 +745,35 @@ int main(int argc, char* argv[]) {
745745
const auto& key_val = t_meta.tuple.get(key_idx);
746746
std::string key_str = key_val.to_string();
747747
// Only include if NOT in matched keys
748-
if (matched_keys_set.find(key_str) == matched_keys_set.end()) {
748+
if (matched_keys_set.find(key_str) ==
749+
matched_keys_set.end()) {
749750
reply.unmatched_keys.push_back(key_str);
750-
// Pad with NULLs for left columns and append right row
751+
// Pad with NULLs for left columns and append right
752+
// row
751753
std::vector<cloudsql::common::Value> padded_values;
752-
padded_values.reserve(args.left_column_count + t_meta.tuple.size());
754+
padded_values.reserve(args.left_column_count +
755+
t_meta.tuple.size());
753756
// Prepend NULLs for left table columns
754-
for (uint32_t i = 0; i < args.left_column_count; ++i) {
755-
padded_values.push_back(cloudsql::common::Value::make_null());
757+
for (uint32_t i = 0; i < args.left_column_count;
758+
++i) {
759+
padded_values.push_back(
760+
cloudsql::common::Value::make_null());
756761
}
757762
// Append right table column values
758763
for (size_t j = 0; j < t_meta.tuple.size(); ++j) {
759764
padded_values.push_back(t_meta.tuple.get(j));
760765
}
761-
unmatched_tuples.emplace_back(std::move(padded_values));
766+
unmatched_tuples.emplace_back(
767+
std::move(padded_values));
762768
}
763769
}
764770
}
765-
// Store properly padded tuples in ClusterManager for coordinator to collect
771+
// Store properly padded tuples in ClusterManager for
772+
// coordinator to collect
766773
if (cluster_manager != nullptr && !unmatched_tuples.empty()) {
767-
cluster_manager->set_unmatched_rows(args.context_id,
768-
args.right_table,
769-
std::move(unmatched_tuples));
774+
cluster_manager->set_unmatched_rows(
775+
args.context_id, args.right_table,
776+
std::move(unmatched_tuples));
770777
}
771778
}
772779
}
@@ -786,7 +793,8 @@ int main(int argc, char* argv[]) {
786793
});
787794

788795
// Handler for receiving unmatched rows from coordinator for NULL-padding emission
789-
// Coordinator broadcasts unmatched right rows to all nodes for final result assembly
796+
// Coordinator broadcasts unmatched right rows to all nodes for final result
797+
// assembly
790798
rpc_server->set_handler(
791799
cloudsql::network::RpcType::UnmatchedRowsPush,
792800
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
@@ -796,8 +804,8 @@ int main(int argc, char* argv[]) {
796804

797805
if (cluster_manager != nullptr && !args.unmatched_rows.empty()) {
798806
cluster_manager->buffer_shuffle_data(args.context_id,
799-
"_unmatched_right_rows",
800-
std::move(args.unmatched_rows));
807+
"_unmatched_right_rows",
808+
std::move(args.unmatched_rows));
801809
}
802810

803811
cloudsql::network::QueryResultsReply reply;

tests/distributed_tests.cpp

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -577,11 +577,12 @@ TEST(DistributedExecutorTests, BloomFilterSkipForOuterJoin) {
577577
DistributedExecutor exec(*catalog, cm);
578578

579579
// Execute RIGHT join - bloom filter should NOT be sent
580-
auto lexer = std::make_unique<Lexer>(
581-
"SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
580+
auto lexer =
581+
std::make_unique<Lexer>("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
582582
Parser parser(std::move(lexer));
583583
auto stmt = parser.parse_statement();
584-
auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
584+
auto res =
585+
exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
585586

586587
// For RIGHT join, bloom filter is skipped (should be 0)
587588
// Even though we returned valid bloom bits, the coordinator should not push for outer joins
@@ -615,15 +616,18 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) {
615616
Schema schema;
616617
schema.add_column("id", common::ValueType::TYPE_INT64);
617618
schema.add_column("val", common::ValueType::TYPE_INT64);
618-
schema.add_column("id", common::ValueType::TYPE_INT64); // table2.id (ambiguous but matches)
619+
schema.add_column("id",
620+
common::ValueType::TYPE_INT64); // table2.id (ambiguous but matches)
619621
schema.add_column("val", common::ValueType::TYPE_INT64); // table2.val
620622
reply.schema = schema;
621623

622624
// Return some matched rows (e.g., rows where table2.id = 1 and 2 matched)
623625
// Format: {table1.id, table1.val, table2.id, table2.val}
624626
reply.rows = {
625-
Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1), Value::make_int64(100)},
626-
Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2), Value::make_int64(200)},
627+
Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1),
628+
Value::make_int64(100)},
629+
Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2),
630+
Value::make_int64(200)},
627631
};
628632

629633
auto resp_p = reply.serialize();
@@ -665,8 +669,8 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) {
665669

666670
UnmatchedRowsPushArgs reply;
667671
reply.context_id = args.context_id;
668-
reply.unmatched_rows = {
669-
Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3), Value::make_int64(30)}};
672+
reply.unmatched_rows = {Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3),
673+
Value::make_int64(30)}};
670674

671675
auto resp_p = reply.serialize();
672676
RpcHeader resp_h;
@@ -743,16 +747,17 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) {
743747
DistributedExecutor exec(*catalog, cm);
744748

745749
// Execute RIGHT join
746-
auto lexer = std::make_unique<Lexer>(
747-
"SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
750+
auto lexer =
751+
std::make_unique<Lexer>("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
748752
Parser parser(std::move(lexer));
749753
auto stmt = parser.parse_statement();
750-
auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
754+
auto res =
755+
exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
751756

752757
// Verify Phase 3-4 RPCs were NOT called for RIGHT JOIN
753758
// (local executor handles unmatched right rows correctly)
754759
EXPECT_EQ(unmatched_report_calls.load(), 0); // NOT called for RIGHT JOIN
755-
EXPECT_EQ(fetch_unmatched_calls.load(), 0); // NOT called for RIGHT JOIN
760+
EXPECT_EQ(fetch_unmatched_calls.load(), 0); // NOT called for RIGHT JOIN
756761
EXPECT_TRUE(res.success());
757762

758763
node1.stop();

0 commit comments

Comments
 (0)