perf(merge_insert): defer reading non-source columns via a logical late-materialization rule#7367
Draft
wjones127 wants to merge 7 commits into
Draft
Conversation
…lization rule
Partial-schema upserts fill missing (non-source) columns from the target
side of the join via `col("target.<field>")` above the join, so projection
pushdown keeps those columns in the target scan — reading wide columns for
every target row even when only a few rows match.
This adds a late-materialization physical optimizer rule that drops those
carried-through columns from the target scan and re-fetches them by
`_rowaddr` with a `TakeExec` inserted above the join. The parent projection
(including the `__action` expression) is re-indexed by name onto the take,
so the plan's output schema is unchanged. The rule is applied only at the
merge_insert call site, not the session-wide optimizer, to bound its blast
radius. A width/storage gate (reusing the scanner's late-materialization
heuristic) only defers columns wide enough that re-fetching beats scanning.
`TakeExec` is now null-tolerant: outer-join insert rows have a null
`target._rowaddr`, so the take fetches only the non-null addresses and
scatters results back with NULLs at the insert positions. The taken fields
are marked nullable when the take sits above an outer join.
Read-side only; write-side amplification stays on lance-format#4193.
Closes lance-format#7363
8d68606 to
4a31cea
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Prototype a logical alternative to the physical `LateMaterializeOverReducingJoin` rule. A `LateTakeNode` (`UserDefinedLogicalNodeCore`) is inserted above a row-reducing join and advertises an output schema of "join columns minus deferred, plus the deferred columns appended". Its `necessary_children_exprs` reports that it does not need the deferred columns from its child (only `_rowaddr`), so DataFusion's stock `OptimizeProjections` prunes them from the scan automatically — no manual index remapping, and downstream references resolve the deferred columns from the take by name. `LateMaterializeJoin` (a logical `OptimizerRule`) detects candidates by qualifier rather than physical side, gates on actual usage above the join and the existing width heuristic, and bails on join filters or duplicate join-output names. `LateTakePlanner` lowers the node to the existing null-tolerant `TakeExec`. This is generic and unit-tested in isolation; wiring it into `merge_insert::create_plan` (and retiring the physical rule) is a follow-up. The physical rule and its tests are left untouched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cal rule Wire the generic logical `LateMaterializeJoin` rule + `LateTakePlanner` into `merge_insert::create_plan`, replacing the physical `LateMaterializeOverReducingJoin` rule (now deleted). The rule runs between two `session_state.optimize` calls: the first keeps the wide non-source columns in the target scan, the rule inserts a `LateTake` above the join, and the second prunes those columns from the scan via projection pushdown. The merge_insert join is an equi-join on a shared key name, which surfaced two issues the Stage-1 unit tests (distinct key names) did not: - `collect_referenced_columns` now skips a join's own on-clause expressions. Those keys are consumed by the join, not above it, so counting them made both sides' same-named keys look "used above the join". - The rule inserts a normalizing projection between the join and the take, selecting only the columns carried past the join (deferred columns dropped). Without it the opaque take node blocks the physical planner from giving `HashJoinExec` a tight output projection, so both same-named keys reach `TakeExec` as duplicate arrow names (qualifiers are erased at the physical level) and the lance schema build rejects the duplicate. The projection is also where scan narrowing now happens. `is_wide_column` moves into `late_take.rs` as a private fn. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
wjones127
commented
Jun 18, 2026
Address review on the logical late-materialization node: - The node identified deferred columns by name alone, so a same-named column from another relation would be wrongly dropped from the passthrough. Match on (qualifier, name) instead, in both `build_output_schema` and `necessary_children_exprs`. Adds a unit test. - Strip use-case/issue-specific context from the module doc and `is_wide_column` so the rule reads as generic. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Self-review polish: - Add a doc comment to the public `LateTakeNode::try_new` constructor. - Note on the `PartialOrd` impl why it orders a subset of the equality fields. - Add a unit test for `is_wide_column` covering the local vs cloud byte thresholds and the blob exclusion (previously only the local variable-width path was exercised). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…descent Add two tests flagged in self-review: - `test_kept_column_name_conflict_not_deferred`: two non-deferred columns from different relations sharing a name (the kept-vs-kept bail path, distinct from the deferred-name collision already covered). - `test_find_lance_dataset_descends_single_input_only`: the documented invariant that a multi-input join never surfaces a nested scan as its relation. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Collect deferral candidates by name and derive `deferred_columns` by iterating the dataset schema in order, dropping the index/sort tuple. - Trim the normalizing-projection comment. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Partial-schema upserts fill missing (non-source) columns from the target side of the join via
col("target.<field>"), so projection pushdown keeps those columns in the target scan — reading wide columns for every target row even when only a few rows match.This defers those reads to after the join shrinks the row count. It is done with a generic logical optimizer rule rather than a physical one, so the deferral is driven by which relation a column comes from (qualifier) and the scan-narrowing falls out of DataFusion's existing
OptimizeProjections— no positional index remapping or hardcoded join side.How it works
A new logical extension node,
LateTakeNode, is inserted above the row-reducing join:LateMaterializeJoin(the rule) finds a join with a Lance scan side that emits_rowaddr, picks the wide data columns that are only carried through (not join keys, not used in a join filter), and wraps the join. A width/storage gate (is_wide_column) only defers columns wide enough that re-fetching by address beats scanning them for every row.necessary_children_exprsthat it does not need the deferred columns from its child, soOptimizeProjectionsprunes them from the scan while keeping_rowaddr. Downstream references resolve the appended columns by name — the plan's output schema is unchanged.LateTakePlannerlowers the node to the existing physicalTakeExec.session_state.optimizepasses), not the session-wide optimizer, to bound its blast radius.A normalizing projection is inserted between the join and the take. Without it the opaque extension node forces
HashJoinExecto emit its fullleft ++ rightoutput, which for an equi-join on a shared key name yields two arrow fields namedkey(qualifiers are erased at the physical level) — a duplicateTakeExeccannot merge by name. The projection keeps only the carried columns, so the physical join gets a tight output projection.TakeExecis now null-tolerant: outer-join insert rows have a nulltarget._rowaddr, so the take fetches only the non-null addresses and scatters results back with NULLs at the insert positions. The taken fields are marked nullable when the take sits above an outer join.Tests
late_take.rsunit tests for the rule and node in isolation: wide-column deferred (scan narrowed, output schema unchanged), narrow/join-key/duplicate-name columns not deferred, qualifier-aware matching, outer-join nullable fields,necessary_children_exprs, theis_wide_columngate (local vs cloud thresholds, blob exclusion), and end-to-end execution parity vs the un-deferred plan (inner + outer with null_rowaddrscatter).merge_insertsubcols tests: the targetLanceReadexcludes the deferred column and aTakeappears above the join; on a selective update the wide column is materialized only for the matched rows (Takeoutput_rows = matches,LanceReadstill visits all rows); anUpdateIfcondition on the deferred target column evaluates correctly; multi-fragment, multi-wide-column, and scalar-index cases pass.TakeExecnull-address unit tests.Out of scope
Write-side amplification (delete + append rewrites all columns of matched rows) stays on #4193.
Closes #7363
🤖 Generated with Claude Code