Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ message UnnestNode {
repeated int32 elementIndexes = 3;
// Optional absolute output index of the ordinality column in the stage schema.
optional int32 ordinalityIndex = 4;
// When prunedPassthrough is true, the input-row column indexes copied into the output, in output order.
// When absent/empty (old brokers), the operator copies the whole input row (legacy behavior).
repeated int32 passthroughInputIndexes = 5;
// Whether the output schema was pruned to only the input columns referenced downstream. Absent => false => legacy.
bool prunedPassthrough = 6;
}

enum WindowFrameType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -303,6 +304,43 @@ public void testOrdinalityAliasVariation(boolean useMultiStageQueryEngine)
assertEquals(sum, 6 * getCountStarResult());
}

@DataProvider(name = "columnPruningQueries")
public Object[][] columnPruningQueries() {
String table = getTableName();
return new Object[][]{
// single array, source array dropped
{String.format("SELECT intCol, u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) "
+ "ORDER BY intCol, u.elem", table)},
// select only the unnested element (zero passthrough)
{String.format("SELECT u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) ORDER BY u.elem", table)},
// WITH ORDINALITY (ordinality index recomputed against pruned output)
{String.format("SELECT intCol, u.elem, u.idx FROM %s CROSS JOIN UNNEST(stringArrayCol) WITH ORDINALITY "
+ "AS u(elem, idx) ORDER BY intCol, u.idx", table)},
// multiple arrays, both source arrays dropped
{String.format("SELECT intCol, u.longValue, u.stringValue FROM %s "
+ "CROSS JOIN UNNEST(longArrayCol, stringArrayCol) AS u(longValue, stringValue) "
+ "ORDER BY intCol, u.longValue", table)},
// source array also selected (must be retained)
{String.format("SELECT intCol, stringArrayCol, u.elem FROM %s CROSS JOIN UNNEST(stringArrayCol) AS u(elem) "
+ "ORDER BY intCol, u.elem", table)}
};
}

@Test(dataProvider = "columnPruningQueries")
public void testCrossJoinUnnestColumnPruningMatchesDefault(String select)
throws Exception {
// Pruning input/passthrough columns from the UNNEST output must not change results. Verified end-to-end on the
// multi-stage engine (the only engine that supports UNNEST CROSS JOIN), with the flag toggled per query.
setUseMultiStageQueryEngine(true);
JsonNode defaultRows = postQuery(select).get("resultTable").get("rows");
JsonNode prunedRows = postQuery("SET unnestColumnPruning=true; " + select).get("resultTable").get("rows");

assertNotNull(defaultRows);
assertNotNull(prunedRows);
assertEquals(prunedRows.toString(), defaultRows.toString(),
"Column pruning must not change UNNEST results for: " + select);
}

@Override
public String getTableName() {
return DEFAULT_TABLE_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex
return pinotDispatchPlanner.createDispatchableSubPlanV2(plan.getLeft(), plan.getRight());
}
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions()),
_envConfig.defaultHashFunction());
_envConfig.defaultHashFunction(), pruneUnnestColumns(plannerContext.getOptions()));
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), _envConfig.getRequestId(),
_envConfig.getTableCache());
Expand Down Expand Up @@ -702,6 +702,16 @@ public boolean useSpools(Map<String, String> options) {
return Boolean.parseBoolean(optionValue);
}

/**
* Whether to prune unused input (passthrough) columns from the UNNEST output. Defaults to {@code false} because a
* broker emitting the smaller schema cannot be honored by an un-upgraded server; enable only once all servers
* support it (see {@link CommonConstants.Broker.Request.QueryOptionKey#UNNEST_COLUMN_PRUNING}).
*/
public boolean pruneUnnestColumns(Map<String, String> options) {
String optionValue = options.get(CommonConstants.Broker.Request.QueryOptionKey.UNNEST_COLUMN_PRUNING);
return Boolean.parseBoolean(optionValue);
}

@Value.Immutable
public interface Config {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand All @@ -61,8 +62,9 @@ private PinotLogicalQueryPlanner() {
*/
public static SubPlan makePlan(RelRoot relRoot,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools,
String hashFunction) {
PlanNode rootNode = new RelToPlanNodeConverter(tracker, hashFunction).toPlanNode(relRoot.rel);
String hashFunction, boolean pruneUnnestColumns) {
PlanNode rootNode = new RelToPlanNodeConverter(tracker, hashFunction,
!CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE, pruneUnnestColumns).toPlanNode(relRoot.rel);

PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools, hashFunction);
return new SubPlan(rootFragment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -111,6 +112,8 @@ public final class RelToPlanNodeConverter {
private final TransformationTracker.Builder<PlanNode, RelNode> _tracker;
private final String _hashFunction;
private final boolean _caseSensitive;
// When true, UNNEST output is pruned to drop input (passthrough) columns not referenced downstream. Default off.
private final boolean _pruneUnnestColumns;

public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker,
String hashFunction) {
Expand All @@ -119,9 +122,15 @@ public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode,

public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker,
String hashFunction, boolean caseSensitive) {
this(tracker, hashFunction, caseSensitive, false);
}

public RelToPlanNodeConverter(@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker,
String hashFunction, boolean caseSensitive, boolean pruneUnnestColumns) {
_tracker = tracker;
_hashFunction = hashFunction;
_caseSensitive = caseSensitive;
_pruneUnnestColumns = pruneUnnestColumns;
}

/**
Expand Down Expand Up @@ -227,6 +236,9 @@ private UnnestNode convertLogicalUncollect(Uncollect node) {
convertInputs(node.getInputs()), arrayExprs, tableFunctionContext);
}

// NOTE: Besides the main dispatch, this is also invoked from tryPruneUnnestPassthrough. Its result is always used
// (either directly or rewritten into a pruned node), so it must remain free of non-idempotent side effects beyond
// converting/tracking its own subtree.
private BasePlanNode convertLogicalCorrelate(LogicalCorrelate node) {
// Pattern: Correlate(left, Uncollect(Project(correlatedFields...)))
RelNode right = node.getRight();
Expand Down Expand Up @@ -709,10 +721,193 @@ private AggregateNode convertLogicalAggregate(PinotLogicalAggregate node) {
}

private ProjectNode convertLogicalProject(LogicalProject node) {
if (_pruneUnnestColumns) {
ProjectNode pruned = tryPruneUnnestPassthrough(node);
if (pruned != null) {
return pruned;
}
}
return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), RexExpressionUtils.fromRexNodes(node.getProjects()));
}

/**
* When a Project sits directly above a CROSS JOIN UNNEST (a {@link LogicalCorrelate} over {@link Uncollect}), prunes
* the input (passthrough) columns that the Project does not reference - notably the unnested source array - from the
* {@link UnnestNode} output, so the operator does not copy them into every exploded row. The Project's own
* {@code InputRef}s are remapped from the full correlate-output index space to the pruned space.
* <p>
* Returns {@code null} (caller falls back to the default conversion, preserving all passthrough columns) when the
* pattern is not recognized, the layout is not the standard one, or pruning would be a no-op.
*/
@Nullable
private ProjectNode tryPruneUnnestPassthrough(LogicalProject node) {
List<RelNode> inputs = node.getInputs();
if (inputs.size() != 1 || !(inputs.get(0) instanceof LogicalCorrelate)) {
return null;
}
LogicalCorrelate correlate = (LogicalCorrelate) inputs.get(0);
RelNode right = correlate.getRight();
// Only handle the UNNEST pattern, and only when no correlate-filter wraps the Uncollect. With a wrapping filter,
// convertLogicalCorrelate emits a FilterNode, so the Project no longer sits directly on the UnnestNode.
if (findUncollect(right) == null || findCorrelateFilter(right) != null) {
// Cheap structural check failed before any conversion: let the caller fall back to the default conversion.
return null;
}
// Convert the correlate exactly once. From here we always return a ProjectNode wrapping this converted input, so
// the caller never re-runs the (side-effecting, tracker-registering) conversion of the left subtree.
BasePlanNode converted = convertLogicalCorrelate(correlate);
if (converted instanceof UnnestNode) {
ProjectNode pruned = buildPrunedProject(node, (UnnestNode) converted, correlate);
if (pruned != null) {
return pruned;
}
}
// Not prunable (unexpected layout, no-op, or non-UnnestNode): wrap the already-converted input in a default Project.
if (_tracker != null) {
_tracker.trackCreation(correlate, converted);
}
return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
new ArrayList<>(List.of(converted)), RexExpressionUtils.fromRexNodes(node.getProjects()));
}

/**
* Builds the pruned Project-over-UnnestNode for the standard CROSS JOIN UNNEST layout, or returns {@code null} when
* the layout is non-standard or pruning would be a no-op (all passthrough columns are referenced downstream). On
* success it registers the new UnnestNode with the tracker against {@code correlate}.
*/
@Nullable
private ProjectNode buildPrunedProject(LogicalProject node, UnnestNode full, RelNode correlate) {
if (full.isPrunedPassthrough()) {
return null;
}
DataSchema fullSchema = full.getDataSchema();
int outSize = fullSchema.size();
int numArrays = full.getArrayExprs().size();
boolean withOrdinality = full.isWithOrdinality();
int leftCount = outSize - numArrays - (withOrdinality ? 1 : 0);
if (leftCount <= 0) {
// No passthrough columns to prune.
return null;
}
// Only handle the clean layout produced by the standard path: passthrough columns occupy [0, leftCount) and the
// element (then ordinality) columns occupy the trailing region. Fall back otherwise.
List<Integer> elementIndexes = full.getElementIndexes();
if (elementIndexes.size() != numArrays) {
return null;
}
for (int k = 0; k < numArrays; k++) {
if (elementIndexes.get(k) != leftCount + k) {
return null;
}
}
if (withOrdinality && full.getOrdinalityIndex() != leftCount + numArrays) {
return null;
}
// Collect referenced passthrough (left) columns from the project expressions.
List<RexExpression> projects = RexExpressionUtils.fromRexNodes(node.getProjects());
boolean[] referenced = new boolean[leftCount];
for (RexExpression project : projects) {
collectReferencedColumns(project, leftCount, referenced);
}
List<Integer> retained = new ArrayList<>();
for (int i = 0; i < leftCount; i++) {
if (referenced[i]) {
retained.add(i);
}
}
if (retained.size() == leftCount) {
// All passthrough columns are used downstream; pruning would be a no-op.
return null;
}
int numRetained = retained.size();
// Build the old (full correlate output) -> new (pruned output) index map.
int[] oldToNew = new int[outSize];
Arrays.fill(oldToNew, -1);
for (int i = 0; i < numRetained; i++) {
oldToNew[retained.get(i)] = i;
}
for (int k = 0; k < numArrays; k++) {
oldToNew[leftCount + k] = numRetained + k;
}
if (withOrdinality) {
oldToNew[leftCount + numArrays] = numRetained + numArrays;
}
// Build the pruned output schema: retained passthrough columns, then element columns, then ordinality.
int newSize = numRetained + numArrays + (withOrdinality ? 1 : 0);
String[] columnNames = new String[newSize];
ColumnDataType[] columnTypes = new ColumnDataType[newSize];
for (int i = 0; i < numRetained; i++) {
int oldIdx = retained.get(i);
columnNames[i] = fullSchema.getColumnName(oldIdx);
columnTypes[i] = fullSchema.getColumnDataType(oldIdx);
}
for (int k = 0; k < numArrays; k++) {
columnNames[numRetained + k] = fullSchema.getColumnName(leftCount + k);
columnTypes[numRetained + k] = fullSchema.getColumnDataType(leftCount + k);
}
if (withOrdinality) {
columnNames[numRetained + numArrays] = fullSchema.getColumnName(leftCount + numArrays);
columnTypes[numRetained + numArrays] = fullSchema.getColumnDataType(leftCount + numArrays);
}
DataSchema prunedSchema = new DataSchema(columnNames, columnTypes);
// Recompute element/ordinality indexes against the pruned (smaller) output.
List<Integer> newElementIndexes = new ArrayList<>(numArrays);
for (int k = 0; k < numArrays; k++) {
newElementIndexes.add(numRetained + k);
}
int newOrdinalityIndex = withOrdinality ? numRetained + numArrays : UnnestNode.UNSPECIFIED_INDEX;
UnnestNode.TableFunctionContext context = new UnnestNode.TableFunctionContext(withOrdinality, newElementIndexes,
newOrdinalityIndex, retained, true);
UnnestNode prunedUnnest = new UnnestNode(DEFAULT_STAGE_ID, prunedSchema, full.getNodeHint(), full.getInputs(),
full.getArrayExprs(), context);
if (_tracker != null) {
_tracker.trackCreation(correlate, prunedUnnest);
}
// Remap the project's input refs from the full correlate-output space to the pruned space.
List<RexExpression> remappedProjects = new ArrayList<>(projects.size());
for (RexExpression project : projects) {
remappedProjects.add(remapInputRefs(project, oldToNew));
}
return new ProjectNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
new ArrayList<>(List.of(prunedUnnest)), remappedProjects);
}

private static void collectReferencedColumns(RexExpression expr, int leftCount, boolean[] referenced) {
if (expr instanceof RexExpression.InputRef) {
int idx = ((RexExpression.InputRef) expr).getIndex();
if (idx >= 0 && idx < leftCount) {
referenced[idx] = true;
}
} else if (expr instanceof RexExpression.FunctionCall) {
for (RexExpression op : ((RexExpression.FunctionCall) expr).getFunctionOperands()) {
collectReferencedColumns(op, leftCount, referenced);
}
}
}

private static RexExpression remapInputRefs(RexExpression expr, int[] oldToNew) {
if (expr instanceof RexExpression.InputRef) {
int idx = ((RexExpression.InputRef) expr).getIndex();
int mapped = (idx >= 0 && idx < oldToNew.length) ? oldToNew[idx] : -1;
// mapped is always >= 0 here: passthrough columns referenced by the project were retained, and element/
// ordinality columns are always mapped. Guard defensively to avoid silently corrupting indexes.
Preconditions.checkState(mapped >= 0, "Unexpected unmapped InputRef index %s while pruning UNNEST passthrough",
idx);
return new RexExpression.InputRef(mapped);
} else if (expr instanceof RexExpression.FunctionCall) {
RexExpression.FunctionCall fc = (RexExpression.FunctionCall) expr;
List<RexExpression> ops = fc.getFunctionOperands();
List<RexExpression> rewritten = new ArrayList<>(ops.size());
for (RexExpression op : ops) {
rewritten.add(remapInputRefs(op, oldToNew));
}
return new RexExpression.FunctionCall(fc.getDataType(), fc.getFunctionName(), rewritten);
} else {
return expr;
}
}

private FilterNode convertLogicalFilter(LogicalFilter node) {
return new FilterNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), RexExpressionUtils.fromRexNode(node.getCondition()));
Expand Down
Loading
Loading