Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/executor/operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ class HashJoinOperator : public Operator {
/* Final phase for RIGHT/FULL joins */
std::optional<std::unordered_multimap<std::string, BuildTuple>::iterator> right_idx_iter_;

/* Storage for unmatched LEFT tuples (for FULL JOIN distributed collection) */
std::vector<Tuple> unmatched_left_rows_;
std::vector<std::string> unmatched_left_keys_;

public:
HashJoinOperator(std::unique_ptr<Operator> left, std::unique_ptr<Operator> right,
std::unique_ptr<parser::Expression> left_key,
Expand Down Expand Up @@ -370,6 +374,18 @@ class HashJoinOperator : public Operator {
* @return Vector of strings - the join key values for unmatched right rows
*/
[[nodiscard]] std::vector<std::string> get_unmatched_right_keys() const;

/**
* @brief Get unmatched left rows after join execution
* @return Vector of tuples - the left-side rows that had no match
*/
[[nodiscard]] std::vector<Tuple> get_unmatched_left_rows() const;

/**
* @brief Get join key values of unmatched left rows
* @return Vector of strings - the join key values for unmatched left rows
*/
[[nodiscard]] std::vector<std::string> get_unmatched_left_keys() const;
};

/**
Expand Down
26 changes: 24 additions & 2 deletions src/executor/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ bool HashJoinOperator::open() {
bool HashJoinOperator::next(Tuple& out_tuple) {
auto left_schema = left_->output_schema();
auto right_schema = right_->output_schema();
std::string current_left_key_str;

while (true) {
if (match_iter_.has_value()) {
Expand All @@ -784,6 +785,10 @@ bool HashJoinOperator::next(Tuple& out_tuple) {
match_iter_ = std::nullopt;
if ((join_type_ == JoinType::Left || join_type_ == JoinType::Full) &&
!left_had_match_) {
/* Store unmatched left tuple and key for Phase 3-5 collection (FULL JOIN) */
unmatched_left_keys_.push_back(current_left_key_str);
unmatched_left_rows_.push_back(Tuple(*left_tuple_));

std::pmr::vector<common::Value> joined_values(left_tuple_->values().begin(),
left_tuple_->values().end(),
get_memory_resource());
Expand All @@ -802,15 +807,22 @@ bool HashJoinOperator::next(Tuple& out_tuple) {
if (left_->next(next_left)) {
left_tuple_ = std::move(next_left);
left_had_match_ = false;
}

if (left_tuple_.has_value()) {
const common::Value key =
left_key_->evaluate(&(left_tuple_.value()), &left_schema, get_params());
current_left_key_str = key.to_string();
auto range = hash_table_.equal_range(current_left_key_str);

/* Look up in hash table */
auto range = hash_table_.equal_range(key.to_string());
if (range.first != range.second) {
match_iter_ = {range.first, range.second};
} else if (join_type_ == JoinType::Left || join_type_ == JoinType::Full) {
/* No match found immediately, emit NULLs if Left/Full join */
/* Store unmatched left tuple and key for Phase 3-5 collection (FULL JOIN) */
unmatched_left_keys_.push_back(current_left_key_str);
unmatched_left_rows_.push_back(Tuple(*left_tuple_));

std::pmr::vector<common::Value> joined_values(left_tuple_->values().begin(),
left_tuple_->values().end(),
get_memory_resource());
Expand Down Expand Up @@ -862,6 +874,8 @@ void HashJoinOperator::close() {
hash_table_.clear();
match_iter_ = std::nullopt;
left_tuple_ = std::nullopt;
unmatched_left_rows_.clear();
unmatched_left_keys_.clear();
set_state(ExecState::Done);
}

Expand Down Expand Up @@ -912,6 +926,14 @@ std::vector<std::string> HashJoinOperator::get_unmatched_right_keys() const {
return keys;
}

std::vector<Tuple> HashJoinOperator::get_unmatched_left_rows() const {
return unmatched_left_rows_;
}

std::vector<std::string> HashJoinOperator::get_unmatched_left_keys() const {
return unmatched_left_keys_;
}

/* --- LimitOperator --- */

LimitOperator::LimitOperator(std::unique_ptr<Operator> child, int64_t limit, int64_t offset)
Expand Down
87 changes: 87 additions & 0 deletions tests/operator_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,93 @@ TEST_F(OperatorTests, HashJoinLeft) {
join->close();
}

TEST_F(OperatorTests, HashJoinLeftUnmatchedCollection) {
// Test that get_unmatched_left_rows/keys correctly tracks unmatched left tuples
// Left table: values 1, 2, 3 (only 2 has a match)
Schema left_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> left_data;
left_data.push_back(make_tuple({common::Value::make_int64(1)})); // no match
left_data.push_back(make_tuple({common::Value::make_int64(2)})); // matches
left_data.push_back(make_tuple({common::Value::make_int64(3)})); // no match

// Right table: values 2, 4
Schema right_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> right_data;
right_data.push_back(make_tuple({common::Value::make_int64(2)}));
right_data.push_back(make_tuple({common::Value::make_int64(4)}));

auto left_scan = make_buffer_scan("left_table", left_data, left_schema);
auto right_scan = make_buffer_scan("right_table", right_data, right_schema);

auto join = make_hash_join(std::move(left_scan), std::move(right_scan), col_expr("id"),
col_expr("id"), JoinType::Left);

ASSERT_TRUE(join->init());
ASSERT_TRUE(join->open());

// Consume all join results
Tuple tuple;
while (join->next(tuple)) {
}

// After join completes, verify unmatched left tracking
auto unmatched_rows = join->get_unmatched_left_rows();
auto unmatched_keys = join->get_unmatched_left_keys();

// We expect 2 unmatched left tuples: id=1 and id=3
EXPECT_EQ(unmatched_rows.size(), 2U);
EXPECT_EQ(unmatched_keys.size(), 2U);

// Keys should be "1" and "3" (to_string of int64)
EXPECT_EQ(unmatched_keys[0], "1");
EXPECT_EQ(unmatched_keys[1], "3");

// Check the actual tuple values
EXPECT_EQ(unmatched_rows[0].get(0).to_int64(), 1);
EXPECT_EQ(unmatched_rows[1].get(0).to_int64(), 3);

join->close();
}

TEST_F(OperatorTests, HashJoinFullUnmatchedLeftCollection) {
// Test LEFT unmatched collection for FULL join
// Similar to LEFT join but tests the FULL join path
Schema left_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> left_data;
left_data.push_back(make_tuple({common::Value::make_int64(1)})); // no match
left_data.push_back(make_tuple({common::Value::make_int64(2)})); // matches

Schema right_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> right_data;
right_data.push_back(make_tuple({common::Value::make_int64(2)}));
right_data.push_back(make_tuple({common::Value::make_int64(3)})); // no match

auto left_scan = make_buffer_scan("left_table", left_data, left_schema);
auto right_scan = make_buffer_scan("right_table", right_data, right_schema);

auto join = make_hash_join(std::move(left_scan), std::move(right_scan), col_expr("id"),
col_expr("id"), JoinType::Full);

ASSERT_TRUE(join->init());
ASSERT_TRUE(join->open());

// Consume all join results
Tuple tuple;
while (join->next(tuple)) {
}

// For FULL join, we should track unmatched LEFT tuples
// Note: RIGHT unmatched tuples are emitted during right scan phase and marked matched,
// so get_unmatched_right_keys() won't include them (they're already "accounted for")
auto unmatched_left_keys = join->get_unmatched_left_keys();

// Left unmatched: id=1
EXPECT_EQ(unmatched_left_keys.size(), 1U);
EXPECT_EQ(unmatched_left_keys[0], "1");

join->close();
}

TEST_F(OperatorTests, HashJoinEmpty) {
// Left has data
Schema left_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
Expand Down
Loading