diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 1517b698a9db..dc8e1cbba57a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -351,6 +351,7 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setNumExceptions(numExceptions); statistics.setGroupsTrimmed(response.isGroupsTrimmed()); statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached()); + statistics.setMseLiteLeafStageLimitReached(response.isMseLiteLeafStageLimitReached()); statistics.setProcessingTimeMillis(response.getTimeUsedMs()); statistics.setNumDocsScanned(response.getNumDocsScanned()); statistics.setTotalDocs(response.getTotalDocs()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 33651babb7f5..526020396b44 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -650,6 +650,12 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan(); + // Inject the implicit leaf-stage limit as a query option so servers can detect truncation + if (queryPlanResult.isLiteModeImplicitSortApplied()) { + query.getOptions().put(CommonConstants.Broker.Request.QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, + String.valueOf(queryPlanResult.getLiteModeEffectiveSortLimit())); + } + // Optionally set ignoreMissingSegments query option based on broker config if not already set. if (_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS, CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) { @@ -829,6 +835,15 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI } } + // Set MSE Lite planning-time warning fields + if (queryPlanResult.isLiteModeImplicitSortApplied()) { + int effectiveLimit = queryPlanResult.getLiteModeEffectiveSortLimit(); + brokerResponse.setMseLiteLeafStageEffectiveLimit(effectiveLimit); + brokerResponse.setMseLiteFanOutAdjustedLimitApplied( + effectiveLimit != QueryOptionsUtils.getLiteModeLeafStageLimit(query.getOptions(), + CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT)); + } + long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); _brokerMetrics.addTimedValue(BrokerTimer.MULTI_STAGE_QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS); @@ -841,6 +856,10 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI if (brokerResponse.isGroupsTrimmed()) { _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1); } + if (brokerResponse.isMseLiteLeafStageLimitReached()) { + _brokerMetrics.addMeteredTableValue(table, + BrokerMeter.BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED, 1); + } } brokerResponse.setTimeUsedMs(totalTimeMs); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index ddba67d92df5..afb6665e30f4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -161,11 +161,12 @@ enum MetadataKey { // the merge (e.g., due to a schema conflict), so the merge ran over a strict subset of the // inputs. How a downstream consumer reacts (skip, retry, accept with annotation) is the // consumer's policy. - INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING); + INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(44, "liteModeLeafStageLimitReached", MetadataValueType.STRING); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! - private static final int MAX_ID = INCOMPLETE_MERGE.getId(); + private static final int MAX_ID = LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getId(); private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 614d666c8a4a..1bcd8c37b2d3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -174,6 +174,9 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create( "BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false); + public static final BrokerMeter BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED = create( + "BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED", "badResponses", false); + // These metrics track the cost of the query. public static final BrokerMeter DOCUMENTS_SCANNED = create( "DOCUMENTS_SCANNED", "documents", false); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 061c2fed5707..1dc6f2d0076b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -166,6 +166,13 @@ default int getExceptionsSize() { */ boolean isMaxRowsInWindowReached(); + /** + * Returns whether the MSE Lite leaf-stage limit has been reached. + */ + default boolean isMseLiteLeafStageLimitReached() { + return false; + } + /** * Returns the total time used for query execution in milliseconds. */ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index c9927c537c7d..3012c67831d0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -52,7 +52,8 @@ "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools", "rlsFiltersApplied", "groupsTrimmed" + "pools", "rlsFiltersApplied", "groupsTrimmed", + "mseLiteLeafStageLimitReached", "mseLiteLeafStageEffectiveLimit", "mseLiteFanOutAdjustedLimitApplied" }) public class BrokerResponseNativeV2 implements BrokerResponse { private final StatMap _brokerStats = new StatMap<>(StatKey.class); @@ -94,6 +95,10 @@ public class BrokerResponseNativeV2 implements BrokerResponse { private Set _pools = Set.of(); private boolean _rlsFiltersApplied = false; + @Nullable + private Integer _mseLiteLeafStageEffectiveLimit; + @Nullable + private Boolean _mseLiteFanOutAdjustedLimitApplied; @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -121,7 +126,7 @@ public int getNumRowsResultSet() { @Override public boolean isPartialResult() { return getExceptionsSize() > 0 || isNumGroupsLimitReached() || !getEarlyTerminationReasons().isEmpty() - || isMaxRowsInJoinReached(); + || isMaxRowsInJoinReached() || isMseLiteLeafStageLimitReached(); } @Override @@ -172,6 +177,35 @@ public void mergeNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReach _brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, numGroupsWarningLimitReached); } + public boolean isMseLiteLeafStageLimitReached() { + return _brokerStats.getBoolean(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED); + } + + public void mergeMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached) { + _brokerStats.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, mseLiteLeafStageLimitReached); + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public Integer getMseLiteLeafStageEffectiveLimit() { + return _mseLiteLeafStageEffectiveLimit; + } + + public void setMseLiteLeafStageEffectiveLimit(int mseLiteLeafStageEffectiveLimit) { + _mseLiteLeafStageEffectiveLimit = mseLiteLeafStageEffectiveLimit; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("mseLiteFanOutAdjustedLimitApplied") + @Nullable + public Boolean getMseLiteFanOutAdjustedLimitApplied() { + return _mseLiteFanOutAdjustedLimitApplied; + } + + public void setMseLiteFanOutAdjustedLimitApplied(boolean mseLiteFanOutAdjustedLimitApplied) { + _mseLiteFanOutAdjustedLimitApplied = mseLiteFanOutAdjustedLimitApplied; + } + @JsonInclude(JsonInclude.Include.NON_EMPTY) public List getEarlyTerminationReasons() { return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS)); @@ -515,7 +549,8 @@ public long merge(long value1, long value2) { return Math.max(value1, value2); } }, - EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET); + EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN); private final StatMap.Type _type; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 3134d97f4bfa..a9bbda0ee4c4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -587,6 +587,12 @@ public static Integer getLiteModeLeafStageFanOutAdjustedLimit(Map queryOptions) { + String val = queryOptions.get(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT); + return val != null ? Integer.parseInt(val) : null; + } + @Nullable private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) { if (optionValue == null) { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java new file mode 100644 index 000000000000..56579cd129a0 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.broker; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class BrokerResponseNativeV2LiteModeTest { + + @Test + public void testLiteModeLeafStageLimitReachedMarksPartialResult() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + + brokerResponse.mergeMseLiteLeafStageLimitReached(true); + + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + } + + @Test + public void testLiteModeLeafStageLimitReachedViaStatMap() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.addBrokerStats(new StatMap<>(BrokerResponseNativeV2.StatKey.class) + .merge(BrokerResponseNativeV2.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, true)); + + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + } + + @Test + public void testLiteModeFieldsInJsonSerialization() + throws Exception { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.mergeMseLiteLeafStageLimitReached(true); + brokerResponse.setMseLiteLeafStageEffectiveLimit(100); + brokerResponse.setMseLiteFanOutAdjustedLimitApplied(true); + + JsonNode json = JsonUtils.objectToJsonNode(brokerResponse); + + assertTrue(json.path("mseLiteLeafStageLimitReached").asBoolean(false)); + assertEquals(json.path("mseLiteLeafStageEffectiveLimit").asInt(), 100); + assertTrue(json.path("mseLiteFanOutAdjustedLimitApplied").asBoolean(false)); + assertTrue(json.path("partialResult").asBoolean(false)); + } + + @Test + public void testLiteModeFieldsNotSerializedWhenDefault() + throws Exception { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + + JsonNode json = JsonUtils.objectToJsonNode(brokerResponse); + + assertFalse(json.has("mseLiteLeafStageEffectiveLimit")); + assertFalse(json.has("mseLiteFanOutAdjustedLimitApplied")); + } + + @Test + public void testPartialResultNotSetWhenLimitNotReached() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.setMseLiteLeafStageEffectiveLimit(100); + + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java index 8ccc0b1232e7..5ece39b66cb2 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java @@ -343,4 +343,20 @@ private static Object getValue(Map map, String key) { throw new IllegalArgumentException("Unexpected key!"); } } + + @Test + public void testGetLiteModeImplicitLeafStageLimit() { + Map queryOptions = new HashMap<>(); + + // Absent → null + assertNull(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions)); + + // Present → parsed value + queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "42"); + assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions), Integer.valueOf(42)); + + // Zero + queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "0"); + assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions), Integer.valueOf(0)); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 5a8927fb4e3c..f5de20b2bf39 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -29,10 +30,13 @@ import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InstanceResponseOperator extends BaseOperator { private static final String EXPLAIN_NAME = "INSTANCE_RESPONSE"; + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseOperator.class); protected final BaseCombineOperator _combineOperator; protected final List _segmentContexts; @@ -96,6 +100,13 @@ protected InstanceResponseBlock buildInstanceResponseBlock(BaseResultsBlock base String.valueOf(_threadMemAllocatedBytes)); instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), String.valueOf(_systemActivitiesCpuTimeNs)); + Integer implicitLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit( + _queryContext.getQueryOptions()); + // false-positive when table has exactly implicitLimit rows + if (implicitLimit != null && baseResultsBlock.getNumRows() >= implicitLimit) { + instanceResponseBlock.addMetadata( + MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true"); + } return instanceResponseBlock; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java index 857c853bcd03..aca7c7ab93b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java @@ -19,6 +19,8 @@ package org.apache.pinot.core.operator.streaming; import java.util.List; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.operator.InstanceResponseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -66,18 +68,22 @@ protected InstanceResponseBlock getNextBlock() { prefetchAll(); if (_streamingCombineOperator != null) { _streamingCombineOperator.start(); + long totalRowsStreamed = 0; BaseResultsBlock resultsBlock = getBaseBlock(); while (!(resultsBlock instanceof MetadataResultsBlock)) { if (resultsBlock instanceof ExceptionResultsBlock) { return new InstanceResponseBlock(resultsBlock); } if (resultsBlock.getNumRows() > 0) { + totalRowsStreamed += resultsBlock.getNumRows(); _streamer.send(resultsBlock); } resultsBlock = getBaseBlock(); } // Return a metadata-only block in the end - return buildInstanceResponseBlock(resultsBlock); + InstanceResponseBlock responseBlock = buildInstanceResponseBlock(resultsBlock); + addLiteModeMetadataIfNeeded(responseBlock, totalRowsStreamed); + return responseBlock; } else { // Handle single block combine operator in streaming fashion BaseResultsBlock resultsBlock = getBaseBlock(); @@ -115,6 +121,13 @@ protected BaseResultsBlock getCombinedResults() { return _combineOperator.nextBlock(); } + private void addLiteModeMetadataIfNeeded(InstanceResponseBlock responseBlock, long totalRowsStreamed) { + Integer implicitLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(_queryContext.getQueryOptions()); + if (implicitLimit != null && totalRowsStreamed >= implicitLimit) { + responseBlock.addMetadata(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true"); + } + } + @Override public String toExplainString() { return EXPLAIN_NAME; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java new file mode 100644 index 000000000000..9b2351ad4768 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; +import org.apache.pinot.core.operator.combine.BaseCombineOperator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class InstanceResponseOperatorTest { + + @Test + public void testLiteModeLeafStageLimitReachedWhenNumRowsEqualsLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(10); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertEquals(responseBlock.getResponseMetadata().get(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()), + "true"); + } + + @Test + public void testLiteModeLeafStageLimitNotReachedWhenNumRowsBelowLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(5); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertFalse(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } + + @Test + public void testNoLiteModeMetadataWhenOptionAbsent() { + Map queryOptions = new HashMap<>(); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(100); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertFalse(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } + + @Test + public void testLiteModeLeafStageLimitReachedWhenNumRowsExceedsLimit() { + Map queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10"); + + SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class); + when(resultsBlock.getNumRows()).thenReturn(15); + when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>()); + when(resultsBlock.getNumServerThreads()).thenReturn(1); + + QueryContext queryContext = mock(QueryContext.class); + when(queryContext.getQueryOptions()).thenReturn(queryOptions); + + InstanceResponseOperator operator = new InstanceResponseOperator( + mock(BaseCombineOperator.class), Collections.emptyList(), Collections.emptyList(), queryContext); + + InstanceResponseBlock responseBlock = operator.buildInstanceResponseBlock(resultsBlock); + + assertTrue(responseBlock.getResponseMetadata() + .containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName())); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 83b8e4a5f20b..2fb657de1fdd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -277,7 +277,13 @@ private QueryEnvironment.QueryPlannerResult getQueryPlannerResult(PlannerContext extraFields.put(RuleTimingPlannerListener.RULE_TIMINGS, plannerContext.getPlannerOutput().get(RuleTimingPlannerListener.RULE_TIMINGS)); } - return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames, extraFields); + int liteModeEffectiveSortLimit = -1; + PhysicalPlannerContext physicalPlannerContext = plannerContext.getPhysicalPlannerContext(); + if (physicalPlannerContext != null) { + liteModeEffectiveSortLimit = physicalPlannerContext.getLiteModeEffectiveSortLimit(); + } + return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames, extraFields, + liteModeEffectiveSortLimit); } /// @deprecated Use [#compile] and then [explain][CompiledQuery#explain(long) ] the returned query instead @@ -354,13 +360,16 @@ public static class QueryPlannerResult { private final String _explainPlan; private final Set _tableNames; private final Map _extraFields; + private final int _liteModeEffectiveSortLimit; QueryPlannerResult(@Nullable DispatchableSubPlan dispatchableSubPlan, @Nullable String explainPlan, - Set tableNames, Map extraFields) { + Set tableNames, Map extraFields, + int liteModeEffectiveSortLimit) { _dispatchableSubPlan = dispatchableSubPlan; _explainPlan = explainPlan; _tableNames = tableNames; _extraFields = extraFields; + _liteModeEffectiveSortLimit = liteModeEffectiveSortLimit; } public String getExplainPlan() { @@ -378,6 +387,14 @@ public Set getTableNames() { public Map getExtraFields() { return _extraFields; } + + public boolean isLiteModeImplicitSortApplied() { + return _liteModeEffectiveSortLimit >= 0; + } + + public int getLiteModeEffectiveSortLimit() { + return _liteModeEffectiveSortLimit; + } } // -------------------------------------------------------------------------- diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index 816190ae719a..451f48b1bc20 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -74,6 +74,7 @@ public Integer get() { private final boolean _liteModeJoinsEnabled; @Nullable private final MultiClusterRoutingContext _multiClusterRoutingContext; + private int _liteModeEffectiveSortLimit = -1; /** * Used by controller when it needs to extract table names from the query. @@ -186,6 +187,18 @@ public int getLiteModeLeafStageFanOutAdjustedLimit() { return _liteModeLeafStageFanOutAdjustedLimit; } + public void setLiteModeEffectiveSortLimit(int effectiveLimit) { + _liteModeEffectiveSortLimit = effectiveLimit; + } + + public boolean isLiteModeImplicitSortApplied() { + return _liteModeEffectiveSortLimit >= 0; + } + + public int getLiteModeEffectiveSortLimit() { + return _liteModeEffectiveSortLimit; + } + /** * Gets a random instance id from the registered instances in the context. *

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java index e8128699b41c..7bee7f08440a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java @@ -81,6 +81,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { liteModeLimit); return sort; } + _context.setLiteModeEffectiveSortLimit(liteModeLimit); return sort.withFetch(newFetch); } if (call._currentNode instanceof PhysicalAggregate) { @@ -88,7 +89,13 @@ public PRelNode onMatch(PRelOptRuleCall call) { PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode; Preconditions.checkState(aggregate.getLimit() <= liteModeLimit, "Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), liteModeLimit); - int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : liteModeLimit; + int limit; + if (aggregate.getLimit() > 0) { + limit = aggregate.getLimit(); + } else { + limit = liteModeLimit; + _context.setLiteModeEffectiveSortLimit(liteModeLimit); + } return aggregate.withLimit(limit); } RelCollation relCollation = RelCollations.EMPTY; @@ -103,6 +110,7 @@ public PRelNode onMatch(PRelOptRuleCall call) { } } PRelNode input = call._currentNode; + _context.setLiteModeEffectiveSortLimit(liteModeLimit); return new PhysicalSort(input.unwrap().getCluster(), RelTraitSet.createEmpty(), List.of(), relCollation, null /* offset */, newFetch, input, nodeId(), input.getPinotDataDistributionOrThrow(), true); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 289ccc53c754..5ec9d9fd2e28 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -368,6 +368,9 @@ private synchronized void mergeExecutionStats(Map executionStats case NUM_GROUPS_WARNING_LIMIT_REACHED: _statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue())); break; + case LITE_MODE_LEAF_STAGE_LIMIT_REACHED: + _statMap.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue())); + break; case TIME_USED_MS: _statMap.merge(StatKey.SSE_EXECUTION_TIME_MS, Long.parseLong(entry.getValue())); break; @@ -750,6 +753,7 @@ public long merge(long value1, long value2) { GROUPS_TRIMMED(StatMap.Type.BOOLEAN), NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN), + LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_RESIZES(StatMap.Type.INT, null), RESIZE_TIME_MS(StatMap.Type.LONG, null), THREAD_CPU_TIME_NS(StatMap.Type.LONG, null), diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java index b56840d05c77..ebcbbb2e13b1 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java @@ -647,4 +647,63 @@ public void shouldReturnDistinctResultBlock() { operator.close(); } + + @Test + public void shouldPropagateLiteModeLeafStageLimitReached() { + // Given: + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), + "true"); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertTrue(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED)); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertTrue(brokerResponse.isMseLiteLeafStageLimitReached()); + assertTrue(brokerResponse.isPartialResult()); + JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse); + assertTrue(responseJson.path("mseLiteLeafStageLimitReached").asBoolean(false)); + assertTrue(responseJson.path("partialResult").asBoolean(false)); + + operator.close(); + } + + @Test + public void shouldNotSetLiteModeLeafStageLimitWhenNotReached() { + // Given: + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertFalse(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED)); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertFalse(brokerResponse.isMseLiteLeafStageLimitReached()); + assertFalse(brokerResponse.isPartialResult()); + + operator.close(); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index f29adcc327ef..38067ee1614c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -507,6 +507,11 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) { _isNumGroupsLimitReached = numGroupsLimitReached; } + @Override + public void setMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached) { + // No-op: not tracked in default context + } + @Override public void setNumExceptions(int numExceptions) { _numExceptions = numExceptions; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index 95f1cf25ea10..37e413765dcf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -186,6 +186,8 @@ default boolean isSampledRequest() { void setNumGroupsLimitReached(boolean numGroupsLimitReached); + void setMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached); + void setNumExceptions(int numExceptions); void setNumRowsResultSet(int numRowsResultSet); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c93b7d1087ef..3b2d69a33b53 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -981,6 +981,9 @@ public static class QueryOptionKey { // Server stage limit for lite mode queries. public static final String LITE_MODE_LEAF_STAGE_LIMIT = "liteModeLeafStageLimit"; public static final String LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT = "liteModeLeafStageFanOutAdjustedLimit"; + // System-internal option injected by the broker when MSE Lite implicitly inserts a leaf-stage limit. + // Not user-settable; used by servers to detect silent truncation at execution time. + public static final String LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT = "liteModeImplicitLeafStageLimit"; // Used by the MSE engine to enable broker-side segment pruning during routing. The physical optimizer // path defaults to DEFAULT_USE_BROKER_PRUNING (true); the logical planner path defaults to // DEFAULT_LOGICAL_PLANNER_USE_BROKER_PRUNING (false). Both can be overridden per-query.