@@ -202,7 +202,9 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
202202 join.type != parser::SelectStatement::JoinType::Right &&
203203 join.type != parser::SelectStatement::JoinType::Full) {
204204 QueryResult res;
205- res.set_error (" Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL joins" );
205+ res.set_error (
206+ " Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL "
207+ " joins" );
206208 return res;
207209 }
208210
@@ -648,33 +650,31 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
648650 std::vector<std::future<std::pair<bool , network::UnmatchedRowsReportArgs>>> report_futures;
649651
650652 for (const auto & node : data_nodes) {
651- report_futures.push_back (std::async (std::launch::async, [node, context_id,
652- outer_join_right_table,
653- outer_join_right_key,
654- matched_keys,
655- left_column_count]() {
656- network::RpcClient client (node.address , node.cluster_port );
657- network::UnmatchedRowsReportArgs reply;
658- reply.context_id = context_id;
659- if (client.connect ()) {
660- network::UnmatchedRowsReportArgs report_args;
661- report_args.context_id = context_id;
662- report_args.right_table = outer_join_right_table;
663- report_args.join_key_col = outer_join_right_key;
664- // Attach matched keys so node knows what was matched
665- report_args.unmatched_keys = matched_keys;
666- // Attach left column count for NULL-padding
667- report_args.left_column_count = left_column_count;
653+ report_futures.push_back (std::async (
654+ std::launch::async, [node, context_id, outer_join_right_table, outer_join_right_key,
655+ matched_keys, left_column_count]() {
656+ network::RpcClient client (node.address , node.cluster_port );
657+ network::UnmatchedRowsReportArgs reply;
658+ reply.context_id = context_id;
659+ if (client.connect ()) {
660+ network::UnmatchedRowsReportArgs report_args;
661+ report_args.context_id = context_id;
662+ report_args.right_table = outer_join_right_table;
663+ report_args.join_key_col = outer_join_right_key;
664+ // Attach matched keys so node knows what was matched
665+ report_args.unmatched_keys = matched_keys;
666+ // Attach left column count for NULL-padding
667+ report_args.left_column_count = left_column_count;
668668
669- std::vector<uint8_t > resp;
670- if (client.call (network::RpcType::UnmatchedRowsReport,
671- report_args.serialize (), resp)) {
672- reply = network::UnmatchedRowsReportArgs::deserialize (resp);
673- return std::make_pair (true , reply);
669+ std::vector<uint8_t > resp;
670+ if (client.call (network::RpcType::UnmatchedRowsReport,
671+ report_args.serialize (), resp)) {
672+ reply = network::UnmatchedRowsReportArgs::deserialize (resp);
673+ return std::make_pair (true , reply);
674+ }
674675 }
675- }
676- return std::make_pair (false , reply);
677- }));
676+ return std::make_pair (false , reply);
677+ }));
678678 }
679679
680680 // Wait for all report futures to complete
@@ -686,25 +686,25 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt,
686686 std::vector<std::future<std::pair<bool , std::vector<executor::Tuple>>>> fetch_futures;
687687
688688 for (const auto & node : data_nodes) {
689- fetch_futures.push_back (std::async (std::launch::async, [node, context_id,
690- outer_join_right_table]() {
691- network::RpcClient client (node.address , node.cluster_port );
692- std::vector<executor::Tuple> rows;
693- if (client.connect ()) {
694- network::FetchUnmatchedRowsArgs fetch_args;
695- fetch_args.context_id = context_id;
696- fetch_args.table_name = outer_join_right_table;
689+ fetch_futures.push_back (
690+ std::async (std::launch::async, [node, context_id, outer_join_right_table]() {
691+ network::RpcClient client (node.address , node.cluster_port );
692+ std::vector<executor::Tuple> rows;
693+ if (client.connect ()) {
694+ network::FetchUnmatchedRowsArgs fetch_args;
695+ fetch_args.context_id = context_id;
696+ fetch_args.table_name = outer_join_right_table;
697697
698- std::vector<uint8_t > resp;
699- if (client.call (network::RpcType::FetchUnmatchedRows,
700- fetch_args.serialize (), resp)) {
701- auto reply = network::UnmatchedRowsPushArgs::deserialize (resp);
702- rows = std::move (reply.unmatched_rows );
703- return std::make_pair (true , std::move (rows));
698+ std::vector<uint8_t > resp;
699+ if (client.call (network::RpcType::FetchUnmatchedRows,
700+ fetch_args.serialize (), resp)) {
701+ auto reply = network::UnmatchedRowsPushArgs::deserialize (resp);
702+ rows = std::move (reply.unmatched_rows );
703+ return std::make_pair (true , std::move (rows));
704+ }
704705 }
705- }
706- return std::make_pair (false , std::move (rows));
707- }));
706+ return std::make_pair (false , std::move (rows));
707+ }));
708708 }
709709
710710 // Aggregate all unmatched rows from all nodes
0 commit comments