POC: CTE materialization for multi-referenced CTEs #22551
Add support for materializing Common Table Expressions (CTEs) that are referenced more than once in a query. When a CTE ends in an expensive operation (Aggregate, Distinct, Window, or Union), the CTE is computed once and its results are cached in memory for reuse by multiple consumers.

This implements a DuckDB-inspired heuristic: only materialize CTEs that end in expensive operations, avoiding regressions where predicate pushdown through the CTE would be more beneficial.

The implementation uses Extension nodes (UserDefinedLogicalNode) to avoid modifying the core LogicalPlan enum, and introduces:
- MaterializedCteProducer/Reader logical nodes
- MaterializedCteExec/ReaderExec physical operators
- MaterializedCtePlanner extension planner
- Dependency-ordered execution for nested materialized CTEs

Benchmarked on TPC-DS SF1 (10 iterations):
- Q47: 2.85x speedup (401ms → 141ms)
- Q57: 2.67x speedup (112ms → 42ms)
- Q2: 1.58x speedup (101ms → 64ms)
- Q74: 1.90x speedup (311ms → 164ms)

Relates to: apache#17737
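For reviewers who want the heuristic in one place, here is a minimal sketch of the decision described above: materialize only when the CTE is referenced more than once and its root operator is one of the expensive ones. The `Plan` enum and both function names are hypothetical stand-ins, not the types or functions in this PR, and the sketch only inspects the root node.

```rust
/// Toy stand-in for a logical plan (hypothetical, not DataFusion's `LogicalPlan`).
#[allow(dead_code)]
#[derive(Debug)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    Distinct(Box<Plan>),
    Window(Box<Plan>),
    Union(Vec<Plan>),
}

/// True when the root operator is one of the expensive operations
/// named in the description (Aggregate, Distinct, Window, Union).
fn ends_in_expensive_op(plan: &Plan) -> bool {
    matches!(
        plan,
        Plan::Aggregate(_) | Plan::Distinct(_) | Plan::Window(_) | Plan::Union(_)
    )
}

/// Materialize only multi-referenced CTEs that end in an expensive operation.
/// Everything else stays inlined so predicate pushdown can still reach the body.
fn should_materialize(cte_body: &Plan, ref_count: usize) -> bool {
    ref_count > 1 && ends_in_expensive_op(cte_body)
}
```

A CTE that is just a filter over a scan stays inlined under this rule no matter how often it is referenced, which is the case where pushdown tends to win.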
@nathanb9 Cool! Can you at-me when this is ready for review?
… and return them from partition_statistics
… filters

When a CTE ending in aggregate/distinct/window is referenced multiple times but each reference filters on a different literal value of the same column (e.g., d_moy=4 vs d_moy=5), inlining is better because the optimizer can push each filter through the aggregate, specializing each copy to process only a subset of the data.

This fixes the TPC-DS Q39 regression (was 1.78x slower with materialization, now 1.01x, within noise).

The detection handles:
- Column-qualified filters above joins (inv1.d_moy=4, inv2.d_moy=5)
- Simple constant arithmetic expressions (4+1 → 5)
- Aliased group-by columns (d_year → syear)

Also fixes a clippy warning: pass `statistics` by reference in `replace_materialized_cte_readers`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The disjoint filter detection previously only looked at Filter nodes. When using JOIN ... ON syntax (vs comma-join with WHERE), the equality conditions like `a.d_moy = 1 AND b.d_moy = 2` live inside the Join node's filter field, not as a separate Filter node.

This fixes a 4x regression on queries using JOIN ON with disjoint group-key predicates (e.g. benchmark Q4: inventory comparison across months).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
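The disjoint-filter check from the two commit messages above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the code in this PR: `Expr`, `RefFilter`, `fold`, and `filters_are_disjoint` are hypothetical names, each reference is assumed to have already been reduced to a single `column = constant` predicate (whether it came from a Filter node or a Join's filter field), and the check that the column is a group-by key, as well as qualifier and alias resolution, is left out.

```rust
use std::collections::HashSet;

/// Tiny constant-expression type: integer literals and addition (hypothetical).
enum Expr {
    Lit(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Fold simple constant arithmetic, e.g. 4+1 becomes 5. Returns None on overflow.
fn fold(e: &Expr) -> Option<i64> {
    match e {
        Expr::Lit(v) => Some(*v),
        Expr::Add(l, r) => fold(l)?.checked_add(fold(r)?),
    }
}

/// One equality predicate applied by a single reference to the CTE: `column = value`.
struct RefFilter {
    column: &'static str,
    value: Expr,
}

/// True when there are at least two references, all filtering the same column,
/// each on a different constant. In that case inlining lets every copy be
/// specialized to its own slice of the data.
fn filters_are_disjoint(filters: &[RefFilter]) -> bool {
    if filters.len() < 2 {
        return false;
    }
    let first = filters[0].column;
    if filters.iter().any(|f| f.column != first) {
        return false;
    }
    let mut seen = HashSet::new();
    for f in filters {
        match fold(&f.value) {
            // `insert` returns false when the value was already present.
            Some(v) if seen.insert(v) => {}
            _ => return false,
        }
    }
    true
}
```

With `d_moy = 4` on one reference and `d_moy = 4+1` on the other, the folded constants differ, so the check reports the filters as disjoint and the CTE would be inlined.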
…plan to be optimized in physical plan
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
run benchmarks tpcds
Hi @nathanb9, thanks for the request (#22551 (comment)). Only whitelisted users can trigger benchmarks. Allowed users: Dandandan, Fokko, Jefffrey, Omega359, adriangb, alamb, asubiotto, brunal, buraksenn, cetra3, codephage2020, coderfender, comphead, erenavsarogullari, etseidl, friendlymatthew, gabotechs, geoffreyclaude, grtlr, haohuaijin, jonathanc-n, kevinjqliu, klion26, kosiew, kumarUjjawal, kunalsinghdadhwal, liamzwbao, mbutrovich, mkleen, mzabaluev, neilconway, rluvaton, sdf-jkl, timsaucer, xudong963, zhuqi-lucas.
@neilconway Okay broke down into PR. |
Materialize CTEs and duplicate subplans — compute once, reuse many.
How It Works
- SQL Planner: Detects multi-referenced CTEs and wraps them in `MaterializedCteProducer/Reader` nodes. Respects `MATERIALIZED`/`NOT MATERIALIZED` hints. Skips cheap non-volatile CTEs.
- Optimizer (`CommonSubplanEliminate`): Detects structurally identical subplans anywhere in the plan tree (not just CTEs) via subtree hashing. Wraps duplicates in the same Producer/Reader nodes. Catches repeated views, self-joins, and generated SQL patterns.
- Optimizer (`InlineCte`): Runs after filter pushdown. Removes materialization where it's not beneficial:
  - `force_materialized = true` → keep (explicit hint or CommonSubplan-detected)
  - `ref_count <= 1` → inline (single-ref or dead)
  - `base_table_refs > 2 AND refs * count > 10` → keep (expensive)
- Extension Planner (`MaterializedCtePlanner`): Converts logical nodes to physical. Creates one shared cache per materialized subplan and wires all scans to it.
- Execution (`MaterializeExec`): First partition triggers `collect_partitioned` via `OnceAsync`. All partitions await the shared future, then execute the continuation. Scans serve cached batches via `MemoryStream`.
- Optimizer (`CteFilterPusher`) (follow-up): For CTEs that survive inlining, OR-combines filters from all readers and pushes the combined predicate into the CTE body. Community feedback welcome on scoping.
Structs
Logical Nodes (`datafusion-expr`)
Physical Operators (`datafusion-physical-plan`)
General-purpose: reusable for CTEs, views, and duplicate subplans.
Optimizer Rules (`datafusion-optimizer`)
Extension Planner (`datafusion-core`)
Syntax / User Configuration
Session config:
Per-CTE SQL hints:
CommonSubplanEliminate (new)
Detects duplicate subplans via structural hashing of `LogicalPlan` subtrees. Any subtree appearing 2+ times with sufficient cost (contains TableScan/Aggregate/Join/Window, >= 3 nodes) is wrapped in Producer/Reader for compute-once semantics.
This benefits:
Currently does not help TPC-DS queries with:
Benchmark Results
TPC-DS SF1, 10 iterations, vs main:
All gains are from CTE materialization via the SQL planner. `CommonSubplanEliminate` does not trigger on TPC-DS because repeated-table patterns there have differing filters/projections (not structurally identical subtrees).
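To make "structurally identical" concrete, here is a small sketch of duplicate-subplan detection on a toy plan type. It is an assumption-laden illustration, not the rule in this PR: `Plan`, `count_subtrees`, and `duplicate_subplans` are hypothetical, structural hashing comes from the derived `Hash`/`Eq` impls, and the cost gate is reduced to a minimum node count (every toy subtree already contains a scan).

```rust
use std::collections::HashMap;

/// Toy plan with derived structural Hash/Eq (hypothetical, not `LogicalPlan`).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Plan {
    Scan { table: &'static str },
    Filter { predicate: &'static str, input: Box<Plan> },
    Aggregate { group_by: &'static str, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

impl Plan {
    fn children(&self) -> Vec<&Plan> {
        match self {
            Plan::Scan { .. } => vec![],
            Plan::Filter { input, .. } | Plan::Aggregate { input, .. } => vec![&**input],
            Plan::Join { left, right } => vec![&**left, &**right],
        }
    }

    fn node_count(&self) -> usize {
        1 + self.children().iter().map(|c| c.node_count()).sum::<usize>()
    }
}

/// Count how often each distinct subtree occurs. The map is keyed by the
/// subtree itself, so hashing is structural and collisions fall back to `Eq`.
fn count_subtrees<'a>(plan: &'a Plan, counts: &mut HashMap<&'a Plan, usize>) {
    *counts.entry(plan).or_insert(0) += 1;
    for child in plan.children() {
        count_subtrees(child, counts);
    }
}

/// Subtrees that occur at least twice and have at least `min_nodes` nodes.
fn duplicate_subplans(plan: &Plan, min_nodes: usize) -> Vec<&Plan> {
    let mut counts = HashMap::new();
    count_subtrees(plan, &mut counts);
    counts
        .into_iter()
        .filter(|(p, n)| *n >= 2 && p.node_count() >= min_nodes)
        .map(|(p, _)| p)
        .collect()
}
```

Joining an aggregate-over-filter-over-scan subtree with an exact copy of itself yields one duplicate at `min_nodes = 3`. Change the predicate on one side and nothing qualifies, which mirrors why the rule does not fire on the TPC-DS patterns mentioned above.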