Skip to content

perf: stream the target on the probe side of the merge_insert join#7382

Open
sezruby wants to merge 1 commit into
lance-format:mainfrom
sezruby:fix/merge-insert-join-build-side
Open

perf: stream the target on the probe side of the merge_insert join#7382
sezruby wants to merge 1 commit into
lance-format:mainfrom
sezruby:fix/merge-insert-join-build-side

Conversation

@sezruby

@sezruby sezruby commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Problem

create_plan — the merge_insert fast path used by MergeInsertJob::execute — builds the join as target.join(source).

When the target exceeds DataFusion's hash-join collect threshold (hash_join_single_partition_threshold_rows, 128K rows), the join is planned as mode=Partitioned, whose build (left) side is hashed and held in memory (per partition). With the target on the left, the entire target is materialized in memory during the merge. On a large target this dominates memory use and scales with target size, even when only a handful of rows are being upserted.

This is the wrong side to build: the source of an upsert is typically far smaller than the target.

Fix

Build the join as source.join(target) so the (typically small) source is the hash build side and the (potentially huge) target is streamed as the probe side.

Neither input carries comparable row statistics (the source is a one-shot stream), so DataFusion's should_swap_join_order leaves the operands as written rather than swapping the target back onto the build side. The join-type mapping is mirrored accordingly (LeftRight; Inner/Full unchanged). Every column is referenced downstream by qualified name, not position, so the join output is semantically identical — this is purely a memory/scheduling change.

Measured (single-node, pure MergeInsertJob::execute)

Upserting a 50K-row source into a wide-row target, peak process RSS (getrusage):

target rows before (target on build) after (target on probe)
5M 0.34 GB
20M 1.82 GB 0.39 GB

After the fix, peak memory is bounded by the source size and stays flat as the target grows (5M→20M: ~0.35 GB); before, it scales with the target. ~4.7× lower peak at 20M, and the gap widens with target size.

How the memory numbers were produced (single-node, no external deps)

I measured this with a throwaway #[ignore]d test in merge_insert.rs (not included in this PR — it allocates multiple GB and isn't a unit test). It writes a wide-row target, upserts a small source through MergeInsertJob::execute, and prints the process peak RSS via getrusage(RUSAGE_SELF). Reviewers can paste it in to reproduce:

#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn merge_repro_peak_rss() {
    use arrow_array::types::{Float64Type, UInt64Type};
    use lance_datagen::ByteCount;

    fn peak_rss_bytes() -> i64 {
        // ru_maxrss is KB on Linux, bytes on macOS.
        let mut usage: libc::rusage = unsafe { std::mem::zeroed() };
        unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut usage) };
        let maxrss = usage.ru_maxrss as i64;
        if cfg!(target_os = "macos") { maxrss } else { maxrss * 1024 }
    }

    let target_rows = std::env::var("REPRO_TARGET_ROWS")
        .ok().and_then(|s| s.parse::<usize>().ok()).unwrap_or(5_000_000);
    let source_rows = std::env::var("REPRO_SOURCE_ROWS")
        .ok().and_then(|s| s.parse::<usize>().ok()).unwrap_or(50_000);

    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    // Wide rows so the target is genuinely large in memory: a u64 key plus
    // two 48-byte strings and a float.
    let batch_rows = 100_000usize;
    let target = lance_datagen::gen_batch()
        .with_seed(Seed::from(1))
        .col("key", array::step::<UInt64Type>())
        .col("v0", array::rand_utf8(ByteCount::from(48), false))
        .col("v1", array::rand_utf8(ByteCount::from(48), false))
        .col("v2", array::rand::<Float64Type>())
        .into_reader_rows(
            RowCount::from(batch_rows as u64),
            BatchCount::from((target_rows / batch_rows) as u32),
        );
    let ds = Arc::new(Dataset::write(target, test_uri, None).await.unwrap());

    // Source: a small set of existing keys (pure updates).
    let source = lance_datagen::gen_batch()
        .with_seed(Seed::from(2))
        .col("key", array::step::<UInt64Type>())
        .col("v0", array::rand_utf8(ByteCount::from(48), false))
        .col("v1", array::rand_utf8(ByteCount::from(48), false))
        .col("v2", array::rand::<Float64Type>())
        .into_reader_rows(RowCount::from(source_rows as u64), BatchCount::from(1));
    let source_stream = reader_to_stream(Box::new(source));

    let job = MergeInsertBuilder::try_new(ds.clone(), vec!["key".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .try_build()
        .unwrap();

    let before = peak_rss_bytes();
    let (_ds, stats) = job.execute(source_stream).await.unwrap();
    let after = peak_rss_bytes();

    eprintln!(
        "target_rows={target_rows} source_rows={source_rows} updated={} \
         peak_rss_before={:.2}GB peak_rss_after={:.2}GB",
        stats.num_updated_rows, before as f64 / 1e9, after as f64 / 1e9,
    );
}

Run (each orientation in its own process so peak RSS is isolated):

REPRO_TARGET_ROWS=20000000 cargo test -p lance --lib --release \
    merge_repro_peak_rss -- --ignored --nocapture

To see the "before" number, temporarily flip the orientation back to scan_aliased.join(source_df_aliased, ...) (with the join type mirrored) and rerun.

Test

Adds test_plan_keeps_target_on_probe_side_at_scale, which exercises the production-representative Partitioned plan (>128K-row target) and asserts the target scan (LanceRead) is the probe (right) side of the HashJoinExec.

The existing toy-sized snapshot tests cannot catch a regression here: at small scale the join is CollectLeft and the optimizer freely swaps sides, so they would still pass with the operands reversed. The four snapshot tests are updated for the new (semantically identical) projection column order.

cargo test -p lance --lib merge_insert → 157 passed. cargo fmt --all + cargo clippy -p lance --lib --tests clean.

🤖 Generated with Claude Code

`create_plan` (the v2 merge_insert fast path used by both single-node
`execute` and uncommitted/distributed merges) built the join as
`target.join(source)`. When the target exceeds DataFusion's hash-join
collect threshold the join is planned as `mode=Partitioned`, whose build
(left) side is hashed and held in memory per partition. With the target on
the left this materialized the entire target per partition, which can
exhaust executor memory on large tables (observed: an 8x16GB Spark cluster
OOM-killed merging a 12-51M row source into a 64M row target, peak RSS
hitting the 16GB pod limit with only ~0.4GB on the JVM heap — the rest was
native Arrow hash memory).

Build the join as `source.join(target)` instead, so the (typically small)
source is the hash build side and the (potentially huge) target is streamed
as the probe side. Neither input carries comparable row statistics (the
source is a one-shot stream), so DataFusion's `should_swap_join_order`
leaves the operands as written rather than swapping the target back onto the
build side. The join output is semantically identical — every column is
referenced downstream by qualified name, not position — so this is purely a
memory/scheduling change.

Measured on the OOM repro (real 64M-row target, same 16GB pods): the merge
now completes where it previously OOM-killed, peak executor RSS dropped from
16.3GB to 6.7GB (20% source) and 11.0GB (80% source), and it ran ~2x faster
than the position-delta path.

Add `test_plan_keeps_target_on_probe_side_at_scale`, which exercises the
production-representative `Partitioned` plan (>128K-row target) and asserts
the target scan is the probe side. The existing toy-sized snapshot tests
cannot catch a regression here: at small scale the join is `CollectLeft` and
the optimizer freely swaps sides, so they pass with the operands reversed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sezruby sezruby marked this pull request as draft June 20, 2026 01:37
@codecov

codecov Bot commented Jun 20, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 93.22034% with 4 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
rust/lance/src/dataset/write/merge_insert.rs 93.22% 2 Missing and 2 partials ⚠️

📢 Thoughts on this report? Let us know!

@sezruby sezruby marked this pull request as ready for review June 20, 2026 02:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant