@@ -202,7 +202,9 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
202202 join.type != parser::SelectStatement::JoinType::Right &&
203203 join.type != parser::SelectStatement::JoinType::Full) {
204204 QueryResult res;
205- res.set_error (" Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL joins" );
205+ res.set_error (
206+ " Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL "
207+ " joins" );
206208 return res;
207209 }
208210
@@ -653,33 +655,31 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
653655 std::vector<std::future<std::pair<bool , network::UnmatchedRowsReportArgs>>> report_futures;
654656
655657 for (const auto & node : data_nodes) {
656- report_futures.push_back (std::async (std::launch::async, [node, context_id,
657- outer_join_right_table,
658- outer_join_right_key,
659- matched_keys,
660- left_column_count]() {
661- network::RpcClient client (node.address , node.cluster_port );
662- network::UnmatchedRowsReportArgs reply;
663- reply.context_id = context_id;
664- if (client.connect ()) {
665- network::UnmatchedRowsReportArgs report_args;
666- report_args.context_id = context_id;
667- report_args.right_table = outer_join_right_table;
668- report_args.join_key_col = outer_join_right_key;
669- // Attach matched keys so node knows what was matched
670- report_args.unmatched_keys = matched_keys;
671- // Attach left column count for NULL-padding
672- report_args.left_column_count = left_column_count;
658+ report_futures.push_back (std::async (
659+ std::launch::async, [node, context_id, outer_join_right_table, outer_join_right_key,
660+ matched_keys, left_column_count]() {
661+ network::RpcClient client (node.address , node.cluster_port );
662+ network::UnmatchedRowsReportArgs reply;
663+ reply.context_id = context_id;
664+ if (client.connect ()) {
665+ network::UnmatchedRowsReportArgs report_args;
666+ report_args.context_id = context_id;
667+ report_args.right_table = outer_join_right_table;
668+ report_args.join_key_col = outer_join_right_key;
669+ // Attach matched keys so node knows what was matched
670+ report_args.unmatched_keys = matched_keys;
671+ // Attach left column count for NULL-padding
672+ report_args.left_column_count = left_column_count;
673673
674- std::vector<uint8_t > resp;
675- if (client.call (network::RpcType::UnmatchedRowsReport,
676- report_args.serialize (), resp)) {
677- reply = network::UnmatchedRowsReportArgs::deserialize (resp);
678- return std::make_pair (true , reply);
674+ std::vector<uint8_t > resp;
675+ if (client.call (network::RpcType::UnmatchedRowsReport,
676+ report_args.serialize (), resp)) {
677+ reply = network::UnmatchedRowsReportArgs::deserialize (resp);
678+ return std::make_pair (true , reply);
679+ }
679680 }
680- }
681- return std::make_pair (false , reply);
682- }));
681+ return std::make_pair (false , reply);
682+ }));
683683 }
684684
685685 // Wait for all report futures to complete
@@ -691,25 +691,25 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
691691 std::vector<std::future<std::pair<bool , std::vector<executor::Tuple>>>> fetch_futures;
692692
693693 for (const auto & node : data_nodes) {
694- fetch_futures.push_back (std::async (std::launch::async, [node, context_id,
695- outer_join_right_table]() {
696- network::RpcClient client (node.address , node.cluster_port );
697- std::vector<executor::Tuple> rows;
698- if (client.connect ()) {
699- network::FetchUnmatchedRowsArgs fetch_args;
700- fetch_args.context_id = context_id;
701- fetch_args.table_name = outer_join_right_table;
694+ fetch_futures.push_back (
695+ std::async (std::launch::async, [node, context_id, outer_join_right_table]() {
696+ network::RpcClient client (node.address , node.cluster_port );
697+ std::vector<executor::Tuple> rows;
698+ if (client.connect ()) {
699+ network::FetchUnmatchedRowsArgs fetch_args;
700+ fetch_args.context_id = context_id;
701+ fetch_args.table_name = outer_join_right_table;
702702
703- std::vector<uint8_t > resp;
704- if (client.call (network::RpcType::FetchUnmatchedRows,
705- fetch_args.serialize (), resp)) {
706- auto reply = network::UnmatchedRowsPushArgs::deserialize (resp);
707- rows = std::move (reply.unmatched_rows );
708- return std::make_pair (true , std::move (rows));
703+ std::vector<uint8_t > resp;
704+ if (client.call (network::RpcType::FetchUnmatchedRows,
705+ fetch_args.serialize (), resp)) {
706+ auto reply = network::UnmatchedRowsPushArgs::deserialize (resp);
707+ rows = std::move (reply.unmatched_rows );
708+ return std::make_pair (true , std::move (rows));
709+ }
709710 }
710- }
711- return std::make_pair (false , std::move (rows));
712- }));
711+ return std::make_pair (false , std::move (rows));
712+ }));
713713 }
714714
715715 // Aggregate all unmatched rows from all nodes
0 commit comments