diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 352cd92f4c19..8c2917d09686 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -236,6 +236,11 @@ message UnnestNode { repeated int32 elementIndexes = 3; // Optional absolute output index of the ordinality column in the stage schema. optional int32 ordinalityIndex = 4; + // When prunedPassthrough is true, the input-row column indexes copied into the output, in output order. + // When absent/empty (old brokers), the operator copies the whole input row (legacy behavior). + repeated int32 passthroughInputIndexes = 5; + // Whether the output schema was pruned to only the input columns referenced downstream. Absent => false => legacy. + bool prunedPassthrough = 6; } enum WindowFrameType { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java index 5e824af94f62..c93522a16e30 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -303,6 +304,43 @@ public void testOrdinalityAliasVariation(boolean useMultiStageQueryEngine) assertEquals(sum, 6 * getCountStarResult()); } + @DataProvider(name = "columnPruningQueries") + public Object[][] columnPruningQueries() { + String table = getTableName(); + return new Object[][]{ + // single array, source array dropped + {String.format("SELECT intCol, u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) " + + "ORDER BY intCol, u.elem", table)}, + // select only the unnested element (zero passthrough) + {String.format("SELECT u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) ORDER BY u.elem", table)}, + // WITH ORDINALITY (ordinality index recomputed against pruned output) + {String.format("SELECT intCol, u.elem, u.idx FROM %s CROSS JOIN UNNEST(stringArrayCol) WITH ORDINALITY " + + "AS u(elem, idx) ORDER BY intCol, u.idx", table)}, + // multiple arrays, both source arrays dropped + {String.format("SELECT intCol, u.longValue, u.stringValue FROM %s " + + "CROSS JOIN UNNEST(longArrayCol, stringArrayCol) AS u(longValue, stringValue) " + + "ORDER BY intCol, u.longValue", table)}, + // source array also selected (must be retained) + {String.format("SELECT intCol, stringArrayCol, u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) " + + "ORDER BY intCol, u.elem", table)} + }; + } + + @Test(dataProvider = "columnPruningQueries") + public void testCrossJoinUnnestColumnPruningMatchesDefault(String select) + throws Exception { + // Pruning input/passthrough columns from the UNNEST output must not change results. Verified end-to-end on the + // multi-stage engine (the only engine that supports UNNEST CROSS JOIN), with the flag toggled per query. + setUseMultiStageQueryEngine(true); + JsonNode defaultRows = postQuery(select).get("resultTable").get("rows"); + JsonNode prunedRows = postQuery("SET unnestColumnPruning=true; " + select).get("resultTable").get("rows"); + + assertNotNull(defaultRows); + assertNotNull(prunedRows); + assertEquals(prunedRows.toString(), defaultRows.toString(), + "Column pruning must not change UNNEST results for: " + select); + } + @Override public String getTableName() { return DEFAULT_TABLE_NAME; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 83b8e4a5f20b..40cc78d73333 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -519,7 +519,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex return pinotDispatchPlanner.createDispatchableSubPlanV2(plan.getLeft(), plan.getRight()); } SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions()), - _envConfig.defaultHashFunction()); + _envConfig.defaultHashFunction(), pruneUnnestColumns(plannerContext.getOptions())); PinotDispatchPlanner pinotDispatchPlanner = new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), _envConfig.getRequestId(), _envConfig.getTableCache()); @@ -702,6 +702,16 @@ public boolean useSpools(Map options) { return Boolean.parseBoolean(optionValue); } + /** + * Whether to prune unused input (passthrough) columns from the UNNEST output. Defaults to {@code false} because a + * broker emitting the smaller schema cannot be honored by an un-upgraded server; enable only once all servers + * support it (see {@link CommonConstants.Broker.Request.QueryOptionKey#UNNEST_COLUMN_PRUNING}). + */ + public boolean pruneUnnestColumns(Map options) { + String optionValue = options.get(CommonConstants.Broker.Request.QueryOptionKey.UNNEST_COLUMN_PRUNING); + return Boolean.parseBoolean(optionValue); + } + @Value.Immutable public interface Config { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 1e2ddca94910..ee9df3177712 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -47,6 +47,7 @@ import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -61,8 +62,9 @@ private PinotLogicalQueryPlanner() { */ public static SubPlan makePlan(RelRoot relRoot, @Nullable TransformationTracker.Builder tracker, boolean useSpools, - String hashFunction) { - PlanNode rootNode = new RelToPlanNodeConverter(tracker, hashFunction).toPlanNode(relRoot.rel); + String hashFunction, boolean pruneUnnestColumns) { + PlanNode rootNode = new RelToPlanNodeConverter(tracker, hashFunction, + !CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE, pruneUnnestColumns).toPlanNode(relRoot.rel); PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools, hashFunction); return new SubPlan(rootFragment, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index cd737896c737..849c8f75b8f2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -111,6 +112,8 @@ public final class RelToPlanNodeConverter { private final TransformationTracker.Builder _tracker; private final String _hashFunction; private final boolean _caseSensitive; + // When true, UNNEST output is pruned to drop input (passthrough) columns not referenced downstream. Default off. + private final boolean _pruneUnnestColumns; public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder tracker, String hashFunction) { @@ -119,9 +122,15 @@ public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder tracker, String hashFunction, boolean caseSensitive) { + this(tracker, hashFunction, caseSensitive, false); + } + + public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder tracker, + String hashFunction, boolean caseSensitive, boolean pruneUnnestColumns) { _tracker = tracker; _hashFunction = hashFunction; _caseSensitive = caseSensitive; + _pruneUnnestColumns = pruneUnnestColumns; } /** @@ -227,6 +236,9 @@ private UnnestNode convertLogicalUncollect(Uncollect node) { convertInputs(node.getInputs()), arrayExprs, tableFunctionContext); } + // NOTE: Besides the main dispatch, this is also invoked from tryPruneUnnestPassthrough. Its result is always used + // (either directly or rewritten into a pruned node), so it must remain free of non-idempotent side effects beyond + // converting/tracking its own subtree. private BasePlanNode convertLogicalCorrelate(LogicalCorrelate node) { // Pattern: Correlate(left, Uncollect(Project(correlatedFields...))) RelNode right = node.getRight(); @@ -709,10 +721,193 @@ private AggregateNode convertLogicalAggregate(PinotLogicalAggregate node) { } private ProjectNode convertLogicalProject(LogicalProject node) { + if (_pruneUnnestColumns) { + ProjectNode pruned = tryPruneUnnestPassthrough(node); + if (pruned != null) { + return pruned; + } + } return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()), convertInputs(node.getInputs()), RexExpressionUtils.fromRexNodes(node.getProjects())); } + /** + * When a Project sits directly above a CROSS JOIN UNNEST (a {@link LogicalCorrelate} over {@link Uncollect}), prunes + * the input (passthrough) columns that the Project does not reference - notably the unnested source array - from the + * {@link UnnestNode} output, so the operator does not copy them into every exploded row. The Project's own + * {@code InputRef}s are remapped from the full correlate-output index space to the pruned space. + *

+ * Returns {@code null} (caller falls back to the default conversion, preserving all passthrough columns) when the + * pattern is not recognized, the layout is not the standard one, or pruning would be a no-op. + */ + @Nullable + private ProjectNode tryPruneUnnestPassthrough(LogicalProject node) { + List inputs = node.getInputs(); + if (inputs.size() != 1 || !(inputs.get(0) instanceof LogicalCorrelate)) { + return null; + } + LogicalCorrelate correlate = (LogicalCorrelate) inputs.get(0); + RelNode right = correlate.getRight(); + // Only handle the UNNEST pattern, and only when no correlate-filter wraps the Uncollect. With a wrapping filter, + // convertLogicalCorrelate emits a FilterNode, so the Project no longer sits directly on the UnnestNode. + if (findUncollect(right) == null || findCorrelateFilter(right) != null) { + // Cheap structural check failed before any conversion: let the caller fall back to the default conversion. + return null; + } + // Convert the correlate exactly once. From here we always return a ProjectNode wrapping this converted input, so + // the caller never re-runs the (side-effecting, tracker-registering) conversion of the left subtree. + BasePlanNode converted = convertLogicalCorrelate(correlate); + if (converted instanceof UnnestNode) { + ProjectNode pruned = buildPrunedProject(node, (UnnestNode) converted, correlate); + if (pruned != null) { + return pruned; + } + } + // Not prunable (unexpected layout, no-op, or non-UnnestNode): wrap the already-converted input in a default Project. + if (_tracker != null) { + _tracker.trackCreation(correlate, converted); + } + return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()), + new ArrayList<>(List.of(converted)), RexExpressionUtils.fromRexNodes(node.getProjects())); + } + + /** + * Builds the pruned Project-over-UnnestNode for the standard CROSS JOIN UNNEST layout, or returns {@code null} when + * the layout is non-standard or pruning would be a no-op (all passthrough columns are referenced downstream). On + * success it registers the new UnnestNode with the tracker against {@code correlate}. + */ + @Nullable + private ProjectNode buildPrunedProject(LogicalProject node, UnnestNode full, RelNode correlate) { + if (full.isPrunedPassthrough()) { + return null; + } + DataSchema fullSchema = full.getDataSchema(); + int outSize = fullSchema.size(); + int numArrays = full.getArrayExprs().size(); + boolean withOrdinality = full.isWithOrdinality(); + int leftCount = outSize - numArrays - (withOrdinality ? 1 : 0); + if (leftCount <= 0) { + // No passthrough columns to prune. + return null; + } + // Only handle the clean layout produced by the standard path: passthrough columns occupy [0, leftCount) and the + // element (then ordinality) columns occupy the trailing region. Fall back otherwise. + List elementIndexes = full.getElementIndexes(); + if (elementIndexes.size() != numArrays) { + return null; + } + for (int k = 0; k < numArrays; k++) { + if (elementIndexes.get(k) != leftCount + k) { + return null; + } + } + if (withOrdinality && full.getOrdinalityIndex() != leftCount + numArrays) { + return null; + } + // Collect referenced passthrough (left) columns from the project expressions. + List projects = RexExpressionUtils.fromRexNodes(node.getProjects()); + boolean[] referenced = new boolean[leftCount]; + for (RexExpression project : projects) { + collectReferencedColumns(project, leftCount, referenced); + } + List retained = new ArrayList<>(); + for (int i = 0; i < leftCount; i++) { + if (referenced[i]) { + retained.add(i); + } + } + if (retained.size() == leftCount) { + // All passthrough columns are used downstream; pruning would be a no-op. + return null; + } + int numRetained = retained.size(); + // Build the old (full correlate output) -> new (pruned output) index map. + int[] oldToNew = new int[outSize]; + Arrays.fill(oldToNew, -1); + for (int i = 0; i < numRetained; i++) { + oldToNew[retained.get(i)] = i; + } + for (int k = 0; k < numArrays; k++) { + oldToNew[leftCount + k] = numRetained + k; + } + if (withOrdinality) { + oldToNew[leftCount + numArrays] = numRetained + numArrays; + } + // Build the pruned output schema: retained passthrough columns, then element columns, then ordinality. + int newSize = numRetained + numArrays + (withOrdinality ? 1 : 0); + String[] columnNames = new String[newSize]; + ColumnDataType[] columnTypes = new ColumnDataType[newSize]; + for (int i = 0; i < numRetained; i++) { + int oldIdx = retained.get(i); + columnNames[i] = fullSchema.getColumnName(oldIdx); + columnTypes[i] = fullSchema.getColumnDataType(oldIdx); + } + for (int k = 0; k < numArrays; k++) { + columnNames[numRetained + k] = fullSchema.getColumnName(leftCount + k); + columnTypes[numRetained + k] = fullSchema.getColumnDataType(leftCount + k); + } + if (withOrdinality) { + columnNames[numRetained + numArrays] = fullSchema.getColumnName(leftCount + numArrays); + columnTypes[numRetained + numArrays] = fullSchema.getColumnDataType(leftCount + numArrays); + } + DataSchema prunedSchema = new DataSchema(columnNames, columnTypes); + // Recompute element/ordinality indexes against the pruned (smaller) output. + List newElementIndexes = new ArrayList<>(numArrays); + for (int k = 0; k < numArrays; k++) { + newElementIndexes.add(numRetained + k); + } + int newOrdinalityIndex = withOrdinality ? numRetained + numArrays : UnnestNode.UNSPECIFIED_INDEX; + UnnestNode.TableFunctionContext context = new UnnestNode.TableFunctionContext(withOrdinality, newElementIndexes, + newOrdinalityIndex, retained, true); + UnnestNode prunedUnnest = new UnnestNode(DEFAULT_STAGE_ID, prunedSchema, full.getNodeHint(), full.getInputs(), + full.getArrayExprs(), context); + if (_tracker != null) { + _tracker.trackCreation(correlate, prunedUnnest); + } + // Remap the project's input refs from the full correlate-output space to the pruned space. + List remappedProjects = new ArrayList<>(projects.size()); + for (RexExpression project : projects) { + remappedProjects.add(remapInputRefs(project, oldToNew)); + } + return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()), + new ArrayList<>(List.of(prunedUnnest)), remappedProjects); + } + + private static void collectReferencedColumns(RexExpression expr, int leftCount, boolean[] referenced) { + if (expr instanceof RexExpression.InputRef) { + int idx = ((RexExpression.InputRef) expr).getIndex(); + if (idx >= 0 && idx < leftCount) { + referenced[idx] = true; + } + } else if (expr instanceof RexExpression.FunctionCall) { + for (RexExpression op : ((RexExpression.FunctionCall) expr).getFunctionOperands()) { + collectReferencedColumns(op, leftCount, referenced); + } + } + } + + private static RexExpression remapInputRefs(RexExpression expr, int[] oldToNew) { + if (expr instanceof RexExpression.InputRef) { + int idx = ((RexExpression.InputRef) expr).getIndex(); + int mapped = (idx >= 0 && idx < oldToNew.length) ? oldToNew[idx] : -1; + // mapped is always >= 0 here: passthrough columns referenced by the project were retained, and element/ + // ordinality columns are always mapped. Guard defensively to avoid silently corrupting indexes. + Preconditions.checkState(mapped >= 0, "Unexpected unmapped InputRef index %s while pruning UNNEST passthrough", + idx); + return new RexExpression.InputRef(mapped); + } else if (expr instanceof RexExpression.FunctionCall) { + RexExpression.FunctionCall fc = (RexExpression.FunctionCall) expr; + List ops = fc.getFunctionOperands(); + List rewritten = new ArrayList<>(ops.size()); + for (RexExpression op : ops) { + rewritten.add(remapInputRefs(op, oldToNew)); + } + return new RexExpression.FunctionCall(fc.getDataType(), fc.getFunctionName(), rewritten); + } else { + return expr; + } + } + private FilterNode convertLogicalFilter(LogicalFilter node) { return new FilterNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()), convertInputs(node.getInputs()), RexExpressionUtils.fromRexNode(node.getCondition())); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/UnnestNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/UnnestNode.java index 1135de4312c1..786659a208f0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/UnnestNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/UnnestNode.java @@ -106,6 +106,22 @@ public int getOrdinalityIndex() { return _tableFunctionContext.getOrdinalityIndex(); } + /** + * Input-row column indexes copied into the output, in output order. Only meaningful when + * {@link #isPrunedPassthrough()} is {@code true}; otherwise empty and the operator copies the whole input row. + */ + public List getPassthroughInputIndexes() { + return _tableFunctionContext.getPassthroughInputIndexes(); + } + + /** + * Whether the output schema has been pruned to only the input columns referenced downstream (plus the element and + * ordinality columns). When {@code false}, the operator preserves legacy behavior of copying the whole input row. + */ + public boolean isPrunedPassthrough() { + return _tableFunctionContext.isPrunedPassthrough(); + } + @Override public String explain() { return "UNNEST"; @@ -157,11 +173,21 @@ public static final class TableFunctionContext { private final boolean _withOrdinality; private final List _elementIndexes; private final int _ordinalityIndex; + private final List _passthroughInputIndexes; + private final boolean _prunedPassthrough; public TableFunctionContext(boolean withOrdinality, List elementIndexes, int ordinalityIndex) { + // Legacy/default: no passthrough pruning. The operator copies the whole input row into the output. + this(withOrdinality, elementIndexes, ordinalityIndex, List.of(), false); + } + + public TableFunctionContext(boolean withOrdinality, List elementIndexes, int ordinalityIndex, + List passthroughInputIndexes, boolean prunedPassthrough) { _withOrdinality = withOrdinality; _elementIndexes = List.copyOf(elementIndexes); _ordinalityIndex = ordinalityIndex; + _passthroughInputIndexes = List.copyOf(passthroughInputIndexes); + _prunedPassthrough = prunedPassthrough; } public boolean isWithOrdinality() { @@ -176,8 +202,17 @@ public int getOrdinalityIndex() { return _ordinalityIndex; } + public List getPassthroughInputIndexes() { + return _passthroughInputIndexes; + } + + public boolean isPrunedPassthrough() { + return _prunedPassthrough; + } + public TableFunctionContext copy() { - return new TableFunctionContext(_withOrdinality, _elementIndexes, _ordinalityIndex); + return new TableFunctionContext(_withOrdinality, _elementIndexes, _ordinalityIndex, _passthroughInputIndexes, + _prunedPassthrough); } @Override @@ -190,12 +225,15 @@ public boolean equals(Object o) { } TableFunctionContext that = (TableFunctionContext) o; return _withOrdinality == that._withOrdinality && _ordinalityIndex == that._ordinalityIndex - && Objects.equals(_elementIndexes, that._elementIndexes); + && _prunedPassthrough == that._prunedPassthrough + && Objects.equals(_elementIndexes, that._elementIndexes) + && Objects.equals(_passthroughInputIndexes, that._passthroughInputIndexes); } @Override public int hashCode() { - return Objects.hash(_withOrdinality, _elementIndexes, _ordinalityIndex); + return Objects.hash(_withOrdinality, _elementIndexes, _ordinalityIndex, _passthroughInputIndexes, + _prunedPassthrough); } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index 7f2144124d36..c91e46d8f5fb 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -240,8 +240,16 @@ private static UnnestNode deserializeUnnestNode(Plan.PlanNode protoNode) { int ordIdx = protoUnnestNode.hasOrdinalityIndex() ? protoUnnestNode.getOrdinalityIndex() : UnnestNode.UNSPECIFIED_INDEX; + // Passthrough pruning metadata. Absent on plans produced by older brokers, in which case prunedPassthrough is + // false and the operator copies the whole input row (legacy behavior). + List passthroughInputIndexes = new ArrayList<>(); + for (int idx : protoUnnestNode.getPassthroughInputIndexesList()) { + passthroughInputIndexes.add(idx); + } + UnnestNode.TableFunctionContext context = - new UnnestNode.TableFunctionContext(protoUnnestNode.getWithOrdinality(), elementIndexes, ordIdx); + new UnnestNode.TableFunctionContext(protoUnnestNode.getWithOrdinality(), elementIndexes, ordIdx, + passthroughInputIndexes, protoUnnestNode.getPrunedPassthrough()); return new UnnestNode(protoNode.getStageId(), extractDataSchema(protoNode), extractNodeHint(protoNode), extractInputs(protoNode), arrayExprs, context); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index 99665d5c29bc..f2283ed09f51 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -287,7 +287,9 @@ public Void visitUnnest(UnnestNode node, Plan.PlanNode.Builder builder) { .addAllArrayExprs(convertExpressions(node.getArrayExprs())) .setWithOrdinality(context.isWithOrdinality()) .addAllElementIndexes(context.getElementIndexes()) - .setOrdinalityIndex(context.getOrdinalityIndex()); + .setOrdinalityIndex(context.getOrdinalityIndex()) + .addAllPassthroughInputIndexes(context.getPassthroughInputIndexes()) + .setPrunedPassthrough(context.isPrunedPassthrough()); builder.setUnnestNode(unnestNodeBuilder.build()); return null; } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/UnnestNodeTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/UnnestNodeTest.java index 0396d2dd284c..6045cd8aad02 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/UnnestNodeTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/UnnestNodeTest.java @@ -185,4 +185,36 @@ public void testEqualsAndHashCode() { Assert.assertEquals(node1.hashCode(), node2.hashCode()); Assert.assertNotEquals(node1, node3); } + + @Test + public void testDefaultPassthroughIsNotPruned() { + DataSchema dataSchema = new DataSchema(new String[]{"id", "elem"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + UnnestNode node = new UnnestNode(0, dataSchema, PlanNode.NodeHint.EMPTY, + new ArrayList<>(), List.of(new RexExpression.InputRef(1)), List.of("elem"), false, null); + + // Legacy/default construction: no pruning, empty passthrough map. + Assert.assertFalse(node.isPrunedPassthrough()); + Assert.assertTrue(node.getPassthroughInputIndexes().isEmpty()); + } + + @Test + public void testPrunedPassthroughMetadata() { + DataSchema dataSchema = new DataSchema(new String[]{"id", "elem"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(false, List.of(1), UnnestNode.UNSPECIFIED_INDEX, List.of(0), true); + UnnestNode node = new UnnestNode(0, dataSchema, PlanNode.NodeHint.EMPTY, + new ArrayList<>(), List.of(new RexExpression.InputRef(1)), context); + + Assert.assertTrue(node.isPrunedPassthrough()); + Assert.assertEquals(node.getPassthroughInputIndexes(), List.of(0)); + + // A pruned node must not equal an otherwise-identical non-pruned node. + UnnestNode legacy = new UnnestNode(0, dataSchema, PlanNode.NodeHint.EMPTY, + new ArrayList<>(), List.of(new RexExpression.InputRef(1)), + new UnnestNode.TableFunctionContext(false, List.of(1), UnnestNode.UNSPECIFIED_INDEX)); + Assert.assertNotEquals(node, legacy); + Assert.assertNotEquals(node.hashCode(), legacy.hashCode()); + } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeSerDeTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeSerDeTest.java index 8fc02e78122f..98745869d709 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeSerDeTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/serde/PlanNodeSerDeTest.java @@ -18,10 +18,16 @@ */ package org.apache.pinot.query.planner.serde; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.QueryEnvironmentTestBase; +import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.UnnestNode; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -38,4 +44,38 @@ public void testQueryStagePlanSerDe(String query) { assertEquals(stagePlan, deserializedStagePlan); } } + + @Test + public void testPrunedUnnestNodeSerDe() { + // Round-trips the passthrough-pruning wire fields (passthroughInputIndexes, prunedPassthrough). A non-sequential + // index list plus WITH ORDINALITY exercise the proto repeated/bool fields and ordering. + DataSchema dataSchema = new DataSchema(new String[]{"col0", "col2", "elem", "ord"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT}); + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(true, List.of(2), 3, List.of(0, 2), true); + UnnestNode node = new UnnestNode(1, dataSchema, PlanNode.NodeHint.EMPTY, new ArrayList<>(), + List.of(new RexExpression.InputRef(1)), context); + + PlanNode deserialized = PlanNodeDeserializer.process(PlanNodeSerializer.process(node)); + assertEquals(deserialized, node); + UnnestNode deserializedUnnest = (UnnestNode) deserialized; + assertEquals(deserializedUnnest.getPassthroughInputIndexes(), List.of(0, 2)); + assertEquals(deserializedUnnest.isPrunedPassthrough(), true); + assertEquals(deserializedUnnest.getOrdinalityIndex(), 3); + } + + @Test + public void testLegacyUnnestNodeSerDe() { + // A non-pruned UnnestNode must round-trip with prunedPassthrough=false and an empty passthrough map (the wire + // default an old broker produces). + DataSchema dataSchema = new DataSchema(new String[]{"id", "arr", "elem"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT_ARRAY, ColumnDataType.INT}); + UnnestNode node = new UnnestNode(1, dataSchema, PlanNode.NodeHint.EMPTY, new ArrayList<>(), + new RexExpression.InputRef(1), "elem", false, null); + + UnnestNode deserialized = (UnnestNode) PlanNodeDeserializer.process(PlanNodeSerializer.process(node)); + assertEquals(deserialized, node); + assertEquals(deserialized.isPrunedPassthrough(), false); + assertEquals(deserialized.getPassthroughInputIndexes(), List.of()); + } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/UnnestSqlPlannerTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/UnnestSqlPlannerTest.java index 5a4d1d3d1df1..1901fbb1c0c3 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/UnnestSqlPlannerTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/UnnestSqlPlannerTest.java @@ -85,6 +85,115 @@ public void testUnnestSqlPlans(String sql, int expectedArrayExprCount, boolean w assertOrdinality(unnestNode, withOrdinality); } + @Test + public void testUnnestColumnPruningDropsSourceArray() { + // With pruning enabled, the unnested source array (mcol1) must not be carried in the UnnestNode output. + String sql = "SET unnestColumnPruning=true; " + + "SELECT e.col1, u.s FROM e CROSS JOIN UNNEST(e.mcol1) AS u(s)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + Assert.assertTrue(unnestNode.isPrunedPassthrough(), "UnnestNode should be pruned"); + List columns = Arrays.asList(unnestNode.getDataSchema().getColumnNames()); + Assert.assertFalse(columns.contains("mcol1"), + "Source array column should be pruned from UnnestNode output, found: " + columns); + Assert.assertTrue(columns.contains("col1"), "Referenced passthrough column col1 should be retained: " + columns); + Assert.assertEquals(unnestNode.getDataSchema().size(), 2, "Expected [col1, element] only: " + columns); + // col1 is the only retained passthrough column (input index 0); the element lands right after it. + Assert.assertEquals(unnestNode.getPassthroughInputIndexes(), List.of(0)); + Assert.assertEquals(unnestNode.getElementIndexes(), List.of(1)); + } + + @Test + public void testUnnestColumnPruningDisabledByDefault() { + // Without the flag, behavior is unchanged: the source array is still part of the UnnestNode output. + String sql = "SELECT e.col1, u.s FROM e CROSS JOIN UNNEST(e.mcol1) AS u(s)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + Assert.assertFalse(unnestNode.isPrunedPassthrough(), "UnnestNode should not be pruned by default"); + List columns = Arrays.asList(unnestNode.getDataSchema().getColumnNames()); + Assert.assertTrue(columns.contains("mcol1"), + "Without pruning, the source array column should remain in the UnnestNode output: " + columns); + } + + @Test + public void testUnnestColumnPruningRetainsSelectedSourceArray() { + // When the user also selects the source array, it must be retained even with pruning enabled. + String sql = "SET unnestColumnPruning=true; " + + "SELECT e.col1, e.mcol1, u.s FROM e CROSS JOIN UNNEST(e.mcol1) AS u(s)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + List columns = Arrays.asList(unnestNode.getDataSchema().getColumnNames()); + Assert.assertTrue(columns.contains("mcol1"), + "Explicitly selected source array must be retained: " + columns); + Assert.assertTrue(columns.contains("col1"), columns.toString()); + } + + @Test + public void testUnnestColumnPruningWithOrdinality() { + // The ordinality index must be recomputed against the pruned (smaller) output. + String sql = "SET unnestColumnPruning=true; " + + "SELECT e.col1, u.s, u.ord FROM e CROSS JOIN UNNEST(e.mcol1) WITH ORDINALITY AS u(s, ord)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + Assert.assertTrue(unnestNode.isWithOrdinality()); + Assert.assertTrue(unnestNode.isPrunedPassthrough()); + List columns = Arrays.asList(unnestNode.getDataSchema().getColumnNames()); + Assert.assertFalse(columns.contains("mcol1"), "Source array should be pruned: " + columns); + // Retained passthrough = [col1] (1), element at index 1, ordinality at index 2. + Assert.assertEquals(unnestNode.getPassthroughInputIndexes(), List.of(0)); + Assert.assertEquals(unnestNode.getElementIndexes(), List.of(1)); + Assert.assertEquals(unnestNode.getOrdinalityIndex(), 2); + Assert.assertEquals(unnestNode.getDataSchema().size(), 3); + } + + @Test + public void testUnnestColumnPruningMultipleArrays() { + // Both source arrays must be dropped; element indexes recompute contiguously after the retained passthrough. + String sql = "SET unnestColumnPruning=true; " + + "SELECT e.col1, u.longVal, u.stringVal FROM e CROSS JOIN UNNEST(e.mcol2, e.mcol1) AS u(longVal, stringVal)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + Assert.assertTrue(unnestNode.isPrunedPassthrough()); + List columns = Arrays.asList(unnestNode.getDataSchema().getColumnNames()); + Assert.assertFalse(columns.contains("mcol1"), columns.toString()); + Assert.assertFalse(columns.contains("mcol2"), columns.toString()); + Assert.assertTrue(columns.contains("col1"), columns.toString()); + Assert.assertEquals(unnestNode.getPassthroughInputIndexes(), List.of(0)); + Assert.assertEquals(unnestNode.getElementIndexes(), List.of(1, 2)); + Assert.assertEquals(unnestNode.getDataSchema().size(), 3); + } + + @Test + public void testUnnestColumnPruningToZeroPassthrough() { + // Selecting only the unnested element retains zero passthrough columns. + String sql = "SET unnestColumnPruning=true; " + + "SELECT u.s FROM e CROSS JOIN UNNEST(e.mcol1) AS u(s)"; + DispatchableSubPlan subPlan = _queryEnvironment.planQuery(sql); + List unnestNodes = findUnnestNodes(subPlan); + Assert.assertEquals(unnestNodes.size(), 1); + UnnestNode unnestNode = unnestNodes.get(0); + + Assert.assertTrue(unnestNode.isPrunedPassthrough()); + Assert.assertTrue(unnestNode.getPassthroughInputIndexes().isEmpty()); + Assert.assertEquals(unnestNode.getElementIndexes(), List.of(0)); + Assert.assertEquals(unnestNode.getDataSchema().size(), 1); + } + @Test public void testAggregateWithOrdinality() { String sql = "SELECT SUM(w.ord) FROM e CROSS JOIN UNNEST(e.mcol1) WITH ORDINALITY AS w(s, ord)"; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java index 0594e82a72c5..367a565cac9b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java @@ -50,6 +50,8 @@ public class UnnestOperator extends MultiStageOperator { private final boolean _withOrdinality; private final List _elementIndexes; private final int _ordinalityIndex; + private final int[] _passthroughInputIndexes; + private final boolean _prunedPassthrough; private final StatMap _statMap = new StatMap<>(StatKey.class); private boolean _loggedElementOverflow; @@ -66,6 +68,13 @@ public UnnestOperator(OpChainExecutionContext context, MultiStageOperator input, _withOrdinality = node.isWithOrdinality(); _elementIndexes = node.getElementIndexes(); _ordinalityIndex = node.getOrdinalityIndex(); + // Resolve to a primitive array once so the per-row hot path avoids List.get + unboxing. + List passthroughInputIndexes = node.getPassthroughInputIndexes(); + _passthroughInputIndexes = new int[passthroughInputIndexes.size()]; + for (int i = 0; i < _passthroughInputIndexes.length; i++) { + _passthroughInputIndexes[i] = passthroughInputIndexes.get(i); + } + _prunedPassthrough = node.isPrunedPassthrough(); } @Override @@ -226,10 +235,21 @@ private void alignArraysByIndex(Object[] inputRow, List> arrays, Li private Object[] appendElements(Object[] inputRow, List elements, int ordinality) { int outSize = _resultSchema.size(); Object[] out = new Object[outSize]; - // Copy left columns at beginning - System.arraycopy(inputRow, 0, out, 0, inputRow.length); - // Determine positions for elements. Track next free slot after the copied input row. - int base = inputRow.length; + // Copy passthrough (left) columns at the beginning of the output row. + int base; + if (_prunedPassthrough) { + // Only the input columns referenced downstream are retained, in output order. + int numPassthrough = _passthroughInputIndexes.length; + for (int o = 0; o < numPassthrough; o++) { + out[o] = inputRow[_passthroughInputIndexes[o]]; + } + base = numPassthrough; + } else { + // Legacy behavior: copy the whole input row (including the unnested source array). + System.arraycopy(inputRow, 0, out, 0, inputRow.length); + base = inputRow.length; + } + // Determine positions for elements. Track next free slot after the copied passthrough columns. int nextFreePos = base; for (int i = 0; i < elements.size(); i++) { int elemPos = -1; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnnestOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnnestOperatorTest.java index 124e7f0cb42f..b3d32ffc397b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnnestOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnnestOperatorTest.java @@ -165,6 +165,119 @@ public void shouldRespectExplicitElementAndOrdinalPositions() { assertEquals(rows.get(1)[3], "y"); } + @Test + public void shouldPrunePassthroughColumnsWhenSourceArrayDropped() { + // Input keeps the source array, but the pruned output schema drops it: only "id" passes through. + DataSchema inputSchema = new DataSchema(new String[]{"id", "arr"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT_ARRAY + }); + when(_input.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, + new Object[]{1, new int[]{10, 20}}, + new Object[]{2, new int[]{30}})); + + DataSchema resultSchema = new DataSchema(new String[]{"id", "elem"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT + }); + RexExpression arrayExpr = new RexExpression.InputRef(1); + // Passthrough only input column 0 ("id") to output 0; element lands at output 1. prunedPassthrough = true. + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(false, List.of(1), UnnestNode.UNSPECIFIED_INDEX, List.of(0), true); + UnnestNode node = new UnnestNode(-1, resultSchema, PlanNode.NodeHint.EMPTY, List.of(), List.of(arrayExpr), context); + UnnestOperator operator = new UnnestOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, node); + + List rows = ((MseBlock.Data) operator.nextBlock()).asRowHeap().getRows(); + assertEquals(rows.size(), 3); + for (Object[] row : rows) { + assertEquals(row.length, 2); // source array is not carried + } + assertEquals(rows.get(0)[0], 1); + assertEquals(rows.get(0)[1], 10); + assertEquals(rows.get(1)[0], 1); + assertEquals(rows.get(1)[1], 20); + assertEquals(rows.get(2)[0], 2); + assertEquals(rows.get(2)[1], 30); + } + + @Test + public void shouldPrunePassthroughToZeroColumns() { + // SELECT only the unnested element: no passthrough columns retained. + DataSchema inputSchema = new DataSchema(new String[]{"id", "arr"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT_ARRAY + }); + when(_input.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{1, new int[]{10, 20}})); + + DataSchema resultSchema = new DataSchema(new String[]{"elem"}, new ColumnDataType[]{ColumnDataType.INT}); + RexExpression arrayExpr = new RexExpression.InputRef(1); + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(false, List.of(0), UnnestNode.UNSPECIFIED_INDEX, List.of(), true); + UnnestNode node = new UnnestNode(-1, resultSchema, PlanNode.NodeHint.EMPTY, List.of(), List.of(arrayExpr), context); + UnnestOperator operator = new UnnestOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, node); + + List rows = ((MseBlock.Data) operator.nextBlock()).asRowHeap().getRows(); + assertEquals(rows.size(), 2); + assertEquals(rows.get(0).length, 1); + assertEquals(rows.get(0)[0], 10); + assertEquals(rows.get(1)[0], 20); + } + + @Test + public void shouldPrunePassthroughWithOrdinality() { + DataSchema inputSchema = new DataSchema(new String[]{"id", "arr"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT_ARRAY + }); + when(_input.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{1, new int[]{5, 6}})); + + // Pruned output [id, elem, ord]; source array dropped. element at 1, ordinality at 2. + DataSchema resultSchema = new DataSchema(new String[]{"id", "elem", "ord"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT, ColumnDataType.INT + }); + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(true, List.of(1), 2, List.of(0), true); + UnnestNode node = new UnnestNode(-1, resultSchema, PlanNode.NodeHint.EMPTY, List.of(), + List.of(new RexExpression.InputRef(1)), context); + UnnestOperator operator = new UnnestOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, node); + + List rows = ((MseBlock.Data) operator.nextBlock()).asRowHeap().getRows(); + assertEquals(rows.size(), 2); + assertEquals(rows.get(0).length, 3); + assertEquals(rows.get(0)[0], 1); + assertEquals(rows.get(0)[1], 5); + assertEquals(rows.get(0)[2], 1); + assertEquals(rows.get(1)[1], 6); + assertEquals(rows.get(1)[2], 2); + } + + @Test + public void shouldPrunePassthroughWithMultipleArrays() { + DataSchema inputSchema = new DataSchema(new String[]{"id", "a1", "a2"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT_ARRAY, ColumnDataType.STRING_ARRAY + }); + when(_input.nextBlock()).thenReturn( + OperatorTestUtil.block(inputSchema, new Object[]{10, new int[]{1, 2}, new String[]{"x", "y"}})); + + // Pruned output [id, v1, v2]; both source arrays dropped. + DataSchema resultSchema = new DataSchema(new String[]{"id", "v1", "v2"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.INT, ColumnDataType.STRING + }); + UnnestNode.TableFunctionContext context = + new UnnestNode.TableFunctionContext(false, List.of(1, 2), UnnestNode.UNSPECIFIED_INDEX, List.of(0), true); + UnnestNode node = new UnnestNode(-1, resultSchema, PlanNode.NodeHint.EMPTY, List.of(), + List.of(new RexExpression.InputRef(1), new RexExpression.InputRef(2)), context); + UnnestOperator operator = new UnnestOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, node); + + List rows = ((MseBlock.Data) operator.nextBlock()).asRowHeap().getRows(); + assertEquals(rows.size(), 2); + assertEquals(rows.get(0).length, 3); + assertEquals(rows.get(0)[0], 10); + assertEquals(rows.get(0)[1], 1); + assertEquals(rows.get(0)[2], "x"); + assertEquals(rows.get(1)[1], 2); + assertEquals(rows.get(1)[2], "y"); + } + @Test public void shouldZipMultipleArraysIntoColumns() { DataSchema inputSchema = new DataSchema(new String[]{"id", "a1", "a2"}, new ColumnDataType[]{ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c93b7d1087ef..173f2059a646 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -812,6 +812,11 @@ public static class QueryOptionKey { public static final String APPLICATION_NAME = "applicationName"; public static final String USE_SPOOLS = "useSpools"; public static final String USE_PHYSICAL_OPTIMIZER = "usePhysicalOptimizer"; + // When true, the multi-stage planner prunes input (passthrough) columns - notably the unnested source array - + // from the UNNEST output when they are not referenced downstream, avoiding copying them into every exploded + // row. Defaults to false: enabling it makes the broker emit a smaller UNNEST output schema, which an + // un-upgraded server cannot honor, so only enable it once all servers support it. + public static final String UNNEST_COLUMN_PRUNING = "unnestColumnPruning"; /** * When set to true, the broker uses the long-lived {@code SubmitWithStream} bidi RPC to dispatch the query, * receiving stage stats out-of-band as {@code OpChainComplete} messages instead of via mailbox EOS. The