From ff27baebb72b589aceee2e6f1d72c25d45d83274 Mon Sep 17 00:00:00 2001 From: "anurag.rai" Date: Wed, 10 Jun 2026 04:50:09 +0000 Subject: [PATCH 1/2] Surface user-facing warnings when MSE Lite silently truncates query results When a query runs under MSE Lite without an explicit LIMIT, the planner silently inserts a PhysicalSort at the leaf-stage boundary to cap rows per server. Users get incomplete results with no indication. This change adds three new fields to BrokerResponseNativeV2 (mseLiteLeafStageLimitReached, mseLiteLeafStageEffectiveLimit, mseLiteFanOutAdjustedLimitApplied) that signal when the implicit limit was binding. Detection happens at execution time on each server via a new DataTable.MetadataKey, propagated through LeafOperator.StatKey to the broker response. Both StreamingInstanceResponseOperator (selection queries) and InstanceResponseOperator (aggregation queries) are instrumented. --- .../MultiStageBrokerRequestHandler.java | 15 +++ .../pinot/common/datatable/DataTable.java | 5 +- .../broker/BrokerResponseNativeV2.java | 41 +++++- .../utils/config/QueryOptionsUtils.java | 6 + .../BrokerResponseNativeV2LiteModeTest.java | 90 +++++++++++++ .../utils/config/QueryOptionsUtilsTest.java | 16 +++ .../operator/InstanceResponseOperator.java | 8 ++ .../StreamingInstanceResponseOperator.java | 15 ++- .../InstanceResponseOperatorTest.java | 127 ++++++++++++++++++ .../apache/pinot/query/QueryEnvironment.java | 25 +++- .../query/context/PhysicalPlannerContext.java | 15 +++ .../v2/opt/rules/LiteModeSortInsertRule.java | 5 + .../query/runtime/operator/LeafOperator.java | 4 + .../runtime/operator/LeafOperatorTest.java | 59 ++++++++ .../pinot/spi/utils/CommonConstants.java | 1 + 15 files changed, 424 insertions(+), 8 deletions(-) create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 33651babb7f5..1e6626eaaa9e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -650,6 +650,12 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan(); + // Inject the implicit leaf-stage limit as a query option so servers can detect truncation + if (queryPlanResult.isLiteModeImplicitSortApplied()) { + query.getOptions().put(CommonConstants.Broker.Request.QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, + String.valueOf(queryPlanResult.getLiteModeEffectiveSortLimit())); + } + // Optionally set ignoreMissingSegments query option based on broker config if not already set. if (_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS, CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) { @@ -829,6 +835,15 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI } } + // Set MSE Lite planning-time warning fields + if (queryPlanResult.isLiteModeImplicitSortApplied()) { + int effectiveLimit = queryPlanResult.getLiteModeEffectiveSortLimit(); + brokerResponse.setMseLiteLeafStageEffectiveLimit(effectiveLimit); + brokerResponse.setMseLiteFanOutAdjustedLimitApplied( + effectiveLimit != QueryOptionsUtils.getLiteModeLeafStageLimit(query.getOptions(), + CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT)); + } + long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); _brokerMetrics.addTimedValue(BrokerTimer.MULTI_STAGE_QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index ddba67d92df5..afb6665e30f4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -161,11 +161,12 @@ enum MetadataKey { // the merge (e.g., due to a schema conflict), so the merge ran over a strict subset of the // inputs. How a downstream consumer reacts (skip, retry, accept with annotation) is the // consumer's policy. - INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING); + INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(44, "liteModeLeafStageLimitReached", MetadataValueType.STRING); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! - private static final int MAX_ID = INCOMPLETE_MERGE.getId(); + private static final int MAX_ID = LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getId(); private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index c9927c537c7d..3012c67831d0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -52,7 +52,8 @@ "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools", "rlsFiltersApplied", "groupsTrimmed" + "pools", "rlsFiltersApplied", "groupsTrimmed", + "mseLiteLeafStageLimitReached", "mseLiteLeafStageEffectiveLimit", "mseLiteFanOutAdjustedLimitApplied" }) public class BrokerResponseNativeV2 implements BrokerResponse { private final StatMap _brokerStats = new StatMap<>(StatKey.class); @@ -94,6 +95,10 @@ public class BrokerResponseNativeV2 implements BrokerResponse { private Set _pools = Set.of(); private boolean _rlsFiltersApplied = false; + @Nullable + private Integer _mseLiteLeafStageEffectiveLimit; + @Nullable + private Boolean _mseLiteFanOutAdjustedLimitApplied; @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -121,7 +126,7 @@ public int getNumRowsResultSet() { @Override public boolean isPartialResult() { return getExceptionsSize() > 0 || isNumGroupsLimitReached() || !getEarlyTerminationReasons().isEmpty() - || isMaxRowsInJoinReached(); + || isMaxRowsInJoinReached() || isMseLiteLeafStageLimitReached(); } @Override @@ -172,6 +177,35 @@ public void mergeNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReach _brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, numGroupsWarningLimitReached); } + public boolean isMseLiteLeafStageLimitReached() { + return _brokerStats.getBoolean(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED); + } + + public void mergeMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached) { + _brokerStats.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, mseLiteLeafStageLimitReached); + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public Integer getMseLiteLeafStageEffectiveLimit() { + return _mseLiteLeafStageEffectiveLimit; + } + + public void setMseLiteLeafStageEffectiveLimit(int mseLiteLeafStageEffectiveLimit) { + _mseLiteLeafStageEffectiveLimit = mseLiteLeafStageEffectiveLimit; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("mseLiteFanOutAdjustedLimitApplied") + @Nullable + public Boolean getMseLiteFanOutAdjustedLimitApplied() { + return _mseLiteFanOutAdjustedLimitApplied; + } + + public void setMseLiteFanOutAdjustedLimitApplied(boolean mseLiteFanOutAdjustedLimitApplied) { + _mseLiteFanOutAdjustedLimitApplied = mseLiteFanOutAdjustedLimitApplied; + } + @JsonInclude(JsonInclude.Include.NON_EMPTY) public List getEarlyTerminationReasons() { return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS)); @@ -515,7 +549,8 @@ public long merge(long value1, long value2) { return Math.max(value1, value2); } }, - EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET); + EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN); private final StatMap.Type _type; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 3134d97f4bfa..a9bbda0ee4c4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -587,6 +587,12 @@ public static Integer getLiteModeLeafStageFanOutAdjustedLimit(Map queryOptions) { + String val = queryOptions.get(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT); + return val != null ? Integer.parseInt(val) : null; + } + @Nullable private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) { if (optionValue == null) { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java new file mode 100644 index 000000000000..56579cd129a0 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.broker; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class BrokerResponseNativeV2LiteModeTest { + + @Test + public void testLiteModeLeafStageLimitReachedMarksPartialResult() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + + brokerResponse.mergeMseLiteLeafStageLimitReached(true); + + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + } + + @Test + public void testLiteModeLeafStageLimitReachedViaStatMap() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.addBrokerStats(new StatMap<>(BrokerResponseNativeV2.StatKey.class) + .merge(BrokerResponseNativeV2.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, true)); + + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + } + + @Test + public void testLiteModeFieldsInJsonSerialization() + throws Exception { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.mergeMseLiteLeafStageLimitReached(true); + brokerResponse.setMseLiteLeafStageEffectiveLimit(100); + brokerResponse.setMseLiteFanOutAdjustedLimitApplied(true); + + JsonNode json = JsonUtils.objectToJsonNode(brokerResponse); + + assertTrue(json.path("mseLiteLeafStageLimitReached").asBoolean(false)); + assertEquals(json.path("mseLiteLeafStageEffectiveLimit").asInt(), 100); + assertTrue(json.path("mseLiteFanOutAdjustedLimitApplied").asBoolean(false)); + assertTrue(json.path("partialResult").asBoolean(false)); + } + + @Test + public void testLiteModeFieldsNotSerializedWhenDefault() + throws Exception { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + + JsonNode json = JsonUtils.objectToJsonNode(brokerResponse); + + assertFalse(json.has("mseLiteLeafStageEffectiveLimit")); + assertFalse(json.has("mseLiteFanOutAdjustedLimitApplied")); + } + + @Test + public void testPartialResultNotSetWhenLimitNotReached() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.setMseLiteLeafStageEffectiveLimit(100); + + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java index 8ccc0b1232e7..5ece39b66cb2 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java @@ -343,4 +343,20 @@ private static Object getValue(Map map, String key) { throw new IllegalArgumentException("Unexpected key!"); } } + + @Test + public void testGetLiteModeImplicitLeafStageLimit() { + Map queryOptions = new HashMap<>(); + + // Absent → null + assertNull(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions)); + + // Present → parsed value + queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "42"); + assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions), Integer.valueOf(42)); + + // Zero + queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "0"); + assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions), Integer.valueOf(0)); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 5a8927fb4e3c..8feff1ffbdf3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -96,6 +97,13 @@ protected InstanceResponseBlock buildInstanceResponseBlock(BaseResultsBlock base String.valueOf(_threadMemAllocatedBytes)); instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), String.valueOf(_systemActivitiesCpuTimeNs)); + Integer implicitLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit( + _queryContext.getQueryOptions()); + // false-positive when table has exactly implicitLimit rows + if (implicitLimit != null && baseResultsBlock.getNumRows() >= implicitLimit) { + instanceResponseBlock.addMetadata( + MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true"); + } return instanceResponseBlock; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java index 857c853bcd03..aca7c7ab93b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java @@ -19,6 +19,8 @@ package org.apache.pinot.core.operator.streaming; import java.util.List; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.operator.InstanceResponseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -66,18 +68,22 @@ protected InstanceResponseBlock getNextBlock() { prefetchAll(); if (_streamingCombineOperator != null) { _streamingCombineOperator.start(); + long totalRowsStreamed = 0; BaseResultsBlock resultsBlock = getBaseBlock(); while (!(resultsBlock instanceof MetadataResultsBlock)) { if (resultsBlock instanceof ExceptionResultsBlock) { return new InstanceResponseBlock(resultsBlock); } if (resultsBlock.getNumRows() > 0) { + totalRowsStreamed += resultsBlock.getNumRows(); _streamer.send(resultsBlock); } resultsBlock = getBaseBlock(); } // Return a metadata-only block in the end - return buildInstanceResponseBlock(resultsBlock); + InstanceResponseBlock responseBlock = buildInstanceResponseBlock(resultsBlock); + addLiteModeMetadataIfNeeded(responseBlock, totalRowsStreamed); + return responseBlock; } else { // Handle single block combine operator in streaming fashion BaseResultsBlock resultsBlock = getBaseBlock(); @@ -115,6 +121,13 @@ protected BaseResultsBlock getCombinedResults() { return _combineOperator.nextBlock(); } + private void addLiteModeMetadataIfNeeded(InstanceResponseBlock responseBlock, long totalRowsStreamed) { + Integer implicitLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(_queryContext.getQueryOptions()); + if (implicitLimit != null && totalRowsStreamed >= implicitLimit) { + responseBlock.addMetadata(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true"); + } + } + @Override public String toExplainString() { return EXPLAIN_NAME; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java new file mode 100644 index 000000000000..9b2351ad4768 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; +import org.apache.pinot.core.operator.combine.BaseCombineOperator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class InstanceResponseOperatorTest { + + @Test + public void testLiteModeLeafStageLimitReachedWhenNumRowsEqualsLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(10); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertEquals(responseBlock.getResponseMetadata().get(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()), + "true"); + } + + @Test + public void testLiteModeLeafStageLimitNotReachedWhenNumRowsBelowLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(5); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertFalse(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } + + @Test + public void testNoLiteModeMetadataWhenOptionAbsent() { + Map queryOptions = new HashMap<>(); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(100); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertFalse(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } + + @Test + public void testLiteModeLeafStageLimitReachedWhenNumRowsExceedsLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(15); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertTrue(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 83b8e4a5f20b..3fddffd99013 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -277,7 +277,15 @@ private QueryEnvironment.QueryPlannerResult getQueryPlannerResult(PlannerContext extraFields.put(RuleTimingPlannerListener.RULE_TIMINGS, plannerContext.getPlannerOutput().get(RuleTimingPlannerListener.RULE_TIMINGS)); } - return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames, extraFields); + boolean liteModeImplicitSortApplied = false; + int liteModeEffectiveSortLimit = -1; + PhysicalPlannerContext physicalPlannerContext = plannerContext.getPhysicalPlannerContext(); + if (physicalPlannerContext != null && physicalPlannerContext.isLiteModeImplicitSortApplied()) { + liteModeImplicitSortApplied = true; + liteModeEffectiveSortLimit = physicalPlannerContext.getLiteModeEffectiveSortLimit(); + } + return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames, extraFields, + liteModeImplicitSortApplied, liteModeEffectiveSortLimit); } /// @deprecated Use [#compile] and then [explain][CompiledQuery#explain(long) ] the returned query instead @@ -354,13 +362,18 @@ public static class QueryPlannerResult { private final String _explainPlan; private final Set _tableNames; private final Map _extraFields; + private final boolean _liteModeImplicitSortApplied; + private final int _liteModeEffectiveSortLimit; QueryPlannerResult(@Nullable DispatchableSubPlan dispatchableSubPlan, @Nullable String explainPlan, - Set tableNames, Map extraFields) { + Set tableNames, Map extraFields, boolean liteModeImplicitSortApplied, + int liteModeEffectiveSortLimit) { _dispatchableSubPlan = dispatchableSubPlan; _explainPlan = explainPlan; _tableNames = tableNames; _extraFields = extraFields; + _liteModeImplicitSortApplied = liteModeImplicitSortApplied; + _liteModeEffectiveSortLimit = liteModeEffectiveSortLimit; } public String getExplainPlan() { @@ -378,6 +391,14 @@ public Set getTableNames() { public Map getExtraFields() { return _extraFields; } + + public boolean isLiteModeImplicitSortApplied() { + return _liteModeImplicitSortApplied; + } + + public int getLiteModeEffectiveSortLimit() { + return _liteModeEffectiveSortLimit; + } } // -------------------------------------------------------------------------- diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index 816190ae719a..4adb09403a70 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -74,6 +74,8 @@ public Integer get() { private final boolean _liteModeJoinsEnabled; @Nullable private final MultiClusterRoutingContext _multiClusterRoutingContext; + private boolean _liteModeImplicitSortApplied = false; + private int _liteModeEffectiveSortLimit = -1; /** * Used by controller when it needs to extract table names from the query. @@ -186,6 +188,19 @@ public int getLiteModeLeafStageFanOutAdjustedLimit() { return _liteModeLeafStageFanOutAdjustedLimit; } + public void setLiteModeImplicitSortApplied(int effectiveLimit) { + _liteModeImplicitSortApplied = true; + _liteModeEffectiveSortLimit = effectiveLimit; + } + + public boolean isLiteModeImplicitSortApplied() { + return _liteModeImplicitSortApplied; + } + + public int getLiteModeEffectiveSortLimit() { + return _liteModeEffectiveSortLimit; + } + /** * Gets a random instance id from the registered instances in the context. *

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java index e8128699b41c..0ca38a795ccf 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java @@ -81,6 +81,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { liteModeLimit); return sort; } + _context.setLiteModeImplicitSortApplied(liteModeLimit); return sort.withFetch(newFetch); } if (call._currentNode instanceof PhysicalAggregate) { @@ -89,6 +90,9 @@ public PRelNode onMatch(PRelOptRuleCall call) { Preconditions.checkState(aggregate.getLimit() <= liteModeLimit, "Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), liteModeLimit); int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : liteModeLimit; + if (aggregate.getLimit() <= 0) { + _context.setLiteModeImplicitSortApplied(liteModeLimit); + } return aggregate.withLimit(limit); } RelCollation relCollation = RelCollations.EMPTY; @@ -103,6 +107,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { } } PRelNode input = call._currentNode; + _context.setLiteModeImplicitSortApplied(liteModeLimit); return new PhysicalSort(input.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), relCollation, null /* offset */, newFetch, input, nodeId(), input.getPinotDataDistributionOrThrow(), true); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 289ccc53c754..5ec9d9fd2e28 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -368,6 +368,9 @@ private synchronized void mergeExecutionStats(Map executionStats case NUM_GROUPS_WARNING_LIMIT_REACHED: _statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue())); break; + case LITE_MODE_LEAF_STAGE_LIMIT_REACHED: + _statMap.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue())); + break; case TIME_USED_MS: _statMap.merge(StatKey.SSE_EXECUTION_TIME_MS, Long.parseLong(entry.getValue())); break; @@ -750,6 +753,7 @@ public long merge(long value1, long value2) { GROUPS_TRIMMED(StatMap.Type.BOOLEAN), NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_RESIZES(StatMap.Type.INT, null), RESIZE_TIME_MS(StatMap.Type.LONG, null), THREAD_CPU_TIME_NS(StatMap.Type.LONG, null), diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java index b56840d05c77..ebcbbb2e13b1 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java @@ -647,4 +647,63 @@ public void shouldReturnDistinctResultBlock() { operator.close(); } + + @Test + public void shouldPropagateLiteModeLeafStageLimitReached() { + // Given: + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), + "true"); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertTrue(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED)); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse); + assertTrue(responseJson.path("mseLiteLeafStageLimitReached").asBoolean(false)); + assertTrue(responseJson.path("partialResult").asBoolean(false)); + + operator.close(); + } + + @Test + public void shouldNotSetLiteModeLeafStageLimitWhenNotReached() { + // Given: + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertFalse(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED)); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + + operator.close(); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c93b7d1087ef..80c06e990ec1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -981,6 +981,7 @@ public static class QueryOptionKey { // Server stage limit for lite mode queries. public static final String LITE_MODE_LEAF_STAGE_LIMIT = "liteModeLeafStageLimit"; public static final String LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT = "liteModeLeafStageFanOutAdjustedLimit"; + public static final String LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT = "liteModeImplicitLeafStageLimit"; // Used by the MSE engine to enable broker-side segment pruning during routing. The physical optimizer // path defaults to DEFAULT_USE_BROKER_PRUNING (true); the logical planner path defaults to // DEFAULT_LOGICAL_PLANNER_USE_BROKER_PRUNING (false). Both can be overridden per-query. From e40bbc10d4d76c6a8f8443e962f79b5c306c8df6 Mon Sep 17 00:00:00 2001 From: "anurag.rai" Date: Tue, 16 Jun 2026 14:35:15 +0000 Subject: [PATCH 2/2] Address PR #18725 review comments for MSE Lite warnings - Remove redundant _liteModeImplicitSortApplied boolean; derive from _liteModeEffectiveSortLimit >= 0 (simplifies PhysicalPlannerContext, QueryEnvironment, QueryPlannerResult) - Convert ternary to if/else in LiteModeSortInsertRule aggregate branch - Add comment to LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT explaining it is a system-internal query option - Add BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED metric - Add isMseLiteLeafStageLimitReached to BrokerResponse interface and RequestContext for broker event listener capture --- .../requesthandler/BaseBrokerRequestHandler.java | 1 + .../MultiStageBrokerRequestHandler.java | 4 ++++ .../apache/pinot/common/metrics/BrokerMeter.java | 3 +++ .../pinot/common/response/BrokerResponse.java | 7 +++++++ .../core/operator/InstanceResponseOperator.java | 3 +++ .../org/apache/pinot/query/QueryEnvironment.java | 12 ++++-------- .../pinot/query/context/PhysicalPlannerContext.java | 6 ++---- .../v2/opt/rules/LiteModeSortInsertRule.java | 13 ++++++++----- .../pinot/spi/trace/DefaultRequestContext.java | 5 +++++ .../org/apache/pinot/spi/trace/RequestContext.java | 2 ++ .../org/apache/pinot/spi/utils/CommonConstants.java | 2 ++ 11 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 1517b698a9db..dc8e1cbba57a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -351,6 +351,7 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setNumExceptions(numExceptions); statistics.setGroupsTrimmed(response.isGroupsTrimmed()); statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached()); + statistics.setMseLiteLeafStageLimitReached(response.isMseLiteLeafStageLimitReached()); statistics.setProcessingTimeMillis(response.getTimeUsedMs()); statistics.setNumDocsScanned(response.getNumDocsScanned()); statistics.setTotalDocs(response.getTotalDocs()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 1e6626eaaa9e..526020396b44 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -856,6 +856,10 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI if (brokerResponse.isGroupsTrimmed()) { _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1); } + if (brokerResponse.isMseLiteLeafStageLimitReached()) { + _brokerMetrics.addMeteredTableValue(table, + BrokerMeter.BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED, 1); + } } brokerResponse.setTimeUsedMs(totalTimeMs); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 614d666c8a4a..1bcd8c37b2d3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -174,6 +174,9 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create( "BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false); + public static final BrokerMeter BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED = create( + "BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED", "badResponses", false); + // These metrics track the cost of the query. public static final BrokerMeter DOCUMENTS_SCANNED = create( "DOCUMENTS_SCANNED", "documents", false); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 061c2fed5707..1dc6f2d0076b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -166,6 +166,13 @@ default int getExceptionsSize() { */ boolean isMaxRowsInWindowReached(); + /** + * Returns whether the MSE Lite leaf-stage limit has been reached. + */ + default boolean isMseLiteLeafStageLimitReached() { + return false; + } + /** * Returns the total time used for query execution in milliseconds. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 8feff1ffbdf3..f5de20b2bf39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -30,10 +30,13 @@ import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InstanceResponseOperator extends BaseOperator { private static final String EXPLAIN_NAME = "INSTANCE_RESPONSE"; + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseOperator.class); protected final BaseCombineOperator _combineOperator; protected final List _segmentContexts; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 3fddffd99013..2fb657de1fdd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -277,15 +277,13 @@ private QueryEnvironment.QueryPlannerResult getQueryPlannerResult(PlannerContext extraFields.put(RuleTimingPlannerListener.RULE_TIMINGS, plannerContext.getPlannerOutput().get(RuleTimingPlannerListener.RULE_TIMINGS)); } - boolean liteModeImplicitSortApplied = false; int liteModeEffectiveSortLimit = -1; PhysicalPlannerContext physicalPlannerContext = plannerContext.getPhysicalPlannerContext(); - if (physicalPlannerContext != null && physicalPlannerContext.isLiteModeImplicitSortApplied()) { - liteModeImplicitSortApplied = true; + if (physicalPlannerContext != null) { liteModeEffectiveSortLimit = physicalPlannerContext.getLiteModeEffectiveSortLimit(); } return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames, extraFields, - liteModeImplicitSortApplied, liteModeEffectiveSortLimit); + liteModeEffectiveSortLimit); } /// @deprecated Use [#compile] and then [explain][CompiledQuery#explain(long) ] the returned query instead @@ -362,17 +360,15 @@ public static class QueryPlannerResult { private final String _explainPlan; private final Set _tableNames; private final Map _extraFields; - private final boolean _liteModeImplicitSortApplied; private final int _liteModeEffectiveSortLimit; QueryPlannerResult(@Nullable DispatchableSubPlan dispatchableSubPlan, @Nullable String explainPlan, - Set tableNames, Map extraFields, boolean liteModeImplicitSortApplied, + Set tableNames, Map extraFields, int liteModeEffectiveSortLimit) { _dispatchableSubPlan = dispatchableSubPlan; _explainPlan = explainPlan; _tableNames = tableNames; _extraFields = extraFields; - _liteModeImplicitSortApplied = liteModeImplicitSortApplied; _liteModeEffectiveSortLimit = liteModeEffectiveSortLimit; } @@ -393,7 +389,7 @@ public Map getExtraFields() { } public boolean isLiteModeImplicitSortApplied() { - return _liteModeImplicitSortApplied; + return _liteModeEffectiveSortLimit >= 0; } public int getLiteModeEffectiveSortLimit() { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index 4adb09403a70..451f48b1bc20 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -74,7 +74,6 @@ public Integer get() { private final boolean _liteModeJoinsEnabled; @Nullable private final MultiClusterRoutingContext _multiClusterRoutingContext; - private boolean _liteModeImplicitSortApplied = false; private int _liteModeEffectiveSortLimit = -1; /** @@ -188,13 +187,12 @@ public int getLiteModeLeafStageFanOutAdjustedLimit() { return _liteModeLeafStageFanOutAdjustedLimit; } - public void setLiteModeImplicitSortApplied(int effectiveLimit) { - _liteModeImplicitSortApplied = true; + public void setLiteModeEffectiveSortLimit(int effectiveLimit) { _liteModeEffectiveSortLimit = effectiveLimit; } public boolean isLiteModeImplicitSortApplied() { - return _liteModeImplicitSortApplied; + return _liteModeEffectiveSortLimit >= 0; } public int getLiteModeEffectiveSortLimit() { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java index 0ca38a795ccf..7bee7f08440a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java @@ -81,7 +81,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { liteModeLimit); return sort; } - _context.setLiteModeImplicitSortApplied(liteModeLimit); + _context.setLiteModeEffectiveSortLimit(liteModeLimit); return sort.withFetch(newFetch); } if (call._currentNode instanceof PhysicalAggregate) { @@ -89,9 +89,12 @@ public PRelNode onMatch(PRelOptRuleCall call) { PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode; Preconditions.checkState(aggregate.getLimit() <= liteModeLimit, "Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), liteModeLimit); - int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : liteModeLimit; - if (aggregate.getLimit() <= 0) { - _context.setLiteModeImplicitSortApplied(liteModeLimit); + int limit; + if (aggregate.getLimit() > 0) { + limit = aggregate.getLimit(); + } else { + limit = liteModeLimit; + _context.setLiteModeEffectiveSortLimit(liteModeLimit); } return aggregate.withLimit(limit); } @@ -107,7 +110,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { } } PRelNode input = call._currentNode; - _context.setLiteModeImplicitSortApplied(liteModeLimit); + _context.setLiteModeEffectiveSortLimit(liteModeLimit); return new PhysicalSort(input.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), relCollation, null /* offset */, newFetch, input, nodeId(), input.getPinotDataDistributionOrThrow(), true); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index f29adcc327ef..38067ee1614c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -507,6 +507,11 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) { _isNumGroupsLimitReached = numGroupsLimitReached; } + @Override + public void setMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached) { + // No-op: not tracked in default context + } + @Override public void setNumExceptions(int numExceptions) { _numExceptions = numExceptions; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index 95f1cf25ea10..37e413765dcf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -186,6 +186,8 @@ default boolean isSampledRequest() { void setNumGroupsLimitReached(boolean numGroupsLimitReached); + void setMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached); + void setNumExceptions(int numExceptions); void setNumRowsResultSet(int numRowsResultSet); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 80c06e990ec1..3b2d69a33b53 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -981,6 +981,8 @@ public static class QueryOptionKey { // Server stage limit for lite mode queries. public static final String LITE_MODE_LEAF_STAGE_LIMIT = "liteModeLeafStageLimit"; public static final String LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT = "liteModeLeafStageFanOutAdjustedLimit"; + // System-internal option injected by the broker when MSE Lite implicitly inserts a leaf-stage limit. + // Not user-settable; used by servers to detect silent truncation at execution time. public static final String LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT = "liteModeImplicitLeafStageLimit"; // Used by the MSE engine to enable broker-side segment pruning during routing. The physical optimizer // path defaults to DEFAULT_USE_BROKER_PRUNING (true); the logical planner path defaults to