feat: accept TableProvider write inputs for merge_insert and insert#7368
Draft
wjones127 wants to merge 3 commits into
Draft
feat: accept TableProvider write inputs for merge_insert and insert#7368wjones127 wants to merge 3 commits into
wjones127 wants to merge 3 commits into
Conversation
Make `Arc<dyn TableProvider>` the canonical internal write input behind ergonomic wrappers. Re-readable sources are now replayed across retries without spilling to disk, and materialized sources report statistics that let DataFusion choose the merge-join build side. - merge_insert: add `execute_provider` (canonical) and `execute_batches` (multi-partition `MemTable`); `execute(stream)` becomes a wrapper. Retries re-scan the provider instead of the removed `new_source_iter`/ `SpillStreamIter` replay layer. A one-shot stream spills only when retries are enabled; `spill_for_retry(false)` fails fast instead of buffering. - Plan against the provider directly so a MemTable/file source's statistics reach the join; stream sources keep the one-shot path (no stats lost, and the source's original error type is preserved). - InsertBuilder gains `execute_provider`/`execute_uncommitted_provider`. - Python routes materialized inputs (pa.Table, RecordBatch, DataFrame, ...) through the in-memory path; streams and scanners keep spilling. Re-scannable Python providers (pa.dataset.Dataset/Scanner) and parallel data-file writes over provider partitions remain follow-ups. Issue: lance-format#4583 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
wjones127
commented
Jun 18, 2026
Always plan against the source TableProvider directly (read_table), so every source — including spilled and one-shot streams — exposes its statistics to the merge join. The merge write node already requires a single-partition input, so the optimizer coalesces multi-partition providers; the previous target_partitions(1) hack and the scan_provider_directly branch are removed. Drop the provider_to_stream first-batch peek that preserved the source error's concrete type. Source errors are shared across join partitions by DataFusion (DataFusionError::Shared), so the type cannot be recovered, and no caller needs it — Python surfaces these errors by message. The error conversion now handles Shared (recursing when sole-owner, otherwise preserving the message under the execution category). Revert the InsertBuilder provider methods: they only adapt a provider back to a stream and add no parallelism until fragment fan-out exists. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Verify that merging an empty batch list is a no-op that leaves the target unchanged, exercising the empty-source partition and schema-fallback paths in batches_to_provider / batches_into_partitions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implements #4583: make
Arc<dyn TableProvider>the canonical internal write input behind ergonomic wrappers, so re-readable sources replay across retries without spilling and materialized sources expose statistics to the merge join.Changes
execute_provider(canonical entry) andexecute_batches(multi-partitionMemTable);execute(stream)is now a thin wrapper. The retry loop re-scans the provider per attempt, replacing thenew_source_iter/SpillStreamIterreplay layer. A one-shot stream spills (memory→disk) only whenconflict_retries > 0;spill_for_retry(false)fails fast on contention instead of buffering the stream.execute_provider/execute_batchesplan against the provider viaread_table, so aMemTable/file source's exactnum_rows/total_byte_sizereach the optimizer and drive join build-side selection. Stream sources keep the one-shot path — they carry no statistics anyway, and this preserves the source's original error type.execute_provider/execute_uncommitted_providerfor input-shape parity (no retry/spill today).pa.Table,pa.RecordBatch, pandas/polars frames, dict/list-of-dict) route through the new in-memoryexecute_batchespath; streams, readers, scanners, and datasets keep the spilling path.Tests
execute_provider,execute_batches, source-stats-in-plan,spill_for_retry(false)fail-fast, andInsertBuilder::execute_provider; fulldataset::write::suite + spill tests green; clippy/fmt clean.Follow-ups (out of scope here)
pa.dataset.Dataset/Scanner(currently still spill — needs a Python-callbackTableProvider).🤖 Generated with Claude Code