Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setNumExceptions(numExceptions);
statistics.setGroupsTrimmed(response.isGroupsTrimmed());
statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
statistics.setMseLiteLeafStageLimitReached(response.isMseLiteLeafStageLimitReached());
statistics.setProcessingTimeMillis(response.getTimeUsedMs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
statistics.setTotalDocs(response.getTotalDocs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,12 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();

// Inject the implicit leaf-stage limit as a query option so servers can detect truncation

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might also want to emit a metric when the limit is reached. Also, from Cellar PoV, with the current change would you be able to identify the exact queries that hit this limit? You might want to ensure that the broker event listener captures this flag too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.
Added a metric at the end, similar to groupsTrimmed, and added the flag to augmentStatistics in BaseBrokerRequestHandler.

if (queryPlanResult.isLiteModeImplicitSortApplied()) {
query.getOptions().put(CommonConstants.Broker.Request.QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT,
String.valueOf(queryPlanResult.getLiteModeEffectiveSortLimit()));
}

// Optionally set ignoreMissingSegments query option based on broker config if not already set.
if (_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
Expand Down Expand Up @@ -829,6 +835,15 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
}
}

// Set MSE Lite planning-time warning fields
if (queryPlanResult.isLiteModeImplicitSortApplied()) {
int effectiveLimit = queryPlanResult.getLiteModeEffectiveSortLimit();
brokerResponse.setMseLiteLeafStageEffectiveLimit(effectiveLimit);
brokerResponse.setMseLiteFanOutAdjustedLimitApplied(
effectiveLimit != QueryOptionsUtils.getLiteModeLeafStageLimit(query.getOptions(),
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT));
}

long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
_brokerMetrics.addTimedValue(BrokerTimer.MULTI_STAGE_QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS);

Expand All @@ -841,6 +856,10 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
if (brokerResponse.isGroupsTrimmed()) {
_brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
}
if (brokerResponse.isMseLiteLeafStageLimitReached()) {
_brokerMetrics.addMeteredTableValue(table,
BrokerMeter.BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED, 1);
}
}

brokerResponse.setTimeUsedMs(totalTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,12 @@ enum MetadataKey {
// the merge (e.g., due to a schema conflict), so the merge ran over a strict subset of the
// inputs. How a downstream consumer reacts (skip, retry, accept with annotation) is the
// consumer's policy.
INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING);
INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING),
LITE_MODE_LEAF_STAGE_LIMIT_REACHED(44, "liteModeLeafStageLimitReached", MetadataValueType.STRING);

// We keep this constant to track the max id added so far for backward compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
private static final int MAX_ID = INCOMPLETE_MERGE.getId();
private static final int MAX_ID = LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getId();

private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create(
"BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false);

// Counts broker responses for which the MSE Lite leaf-stage limit was reported as reached.
public static final BrokerMeter BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED = create(
"BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED", "badResponses", false);

// These metrics track the cost of the query.
public static final BrokerMeter DOCUMENTS_SCANNED = create(
"DOCUMENTS_SCANNED", "documents", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ default int getExceptionsSize() {
*/
boolean isMaxRowsInWindowReached();

/**
 * Returns whether the MSE Lite leaf-stage limit has been reached.
 *
 * <p>This default implementation always returns {@code false}; response types that record the flag are
 * expected to override it.
 */
default boolean isMseLiteLeafStageLimitReached() {
return false;
}

/**
* Returns the total time used for query execution in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes",
"pools", "rlsFiltersApplied", "groupsTrimmed"
"pools", "rlsFiltersApplied", "groupsTrimmed",
"mseLiteLeafStageLimitReached", "mseLiteLeafStageEffectiveLimit", "mseLiteFanOutAdjustedLimitApplied"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
Expand Down Expand Up @@ -94,6 +95,10 @@ public class BrokerResponseNativeV2 implements BrokerResponse {

private Set<Integer> _pools = Set.of();
private boolean _rlsFiltersApplied = false;
@Nullable
private Integer _mseLiteLeafStageEffectiveLimit;
@Nullable
private Boolean _mseLiteFanOutAdjustedLimitApplied;

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
Expand Down Expand Up @@ -121,7 +126,7 @@ public int getNumRowsResultSet() {
@Override
public boolean isPartialResult() {
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || !getEarlyTerminationReasons().isEmpty()
|| isMaxRowsInJoinReached();
|| isMaxRowsInJoinReached() || isMseLiteLeafStageLimitReached();
}

@Override
Expand Down Expand Up @@ -172,6 +177,35 @@ public void mergeNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReach
_brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, numGroupsWarningLimitReached);
}

/**
 * Returns whether the lite-mode leaf-stage limit was flagged as reached, as recorded under
 * {@code StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED} in the broker stats.
 */
@Override
public boolean isMseLiteLeafStageLimitReached() {
  return _brokerStats.getBoolean(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED);
}

/**
 * Merges the given flag into the broker stats under {@code StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED}.
 *
 * @param mseLiteLeafStageLimitReached flag value to merge into the existing stat
 */
public void mergeMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached) {
_brokerStats.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, mseLiteLeafStageLimitReached);
}

/**
 * Returns the MSE Lite leaf-stage effective limit, or {@code null} if it was never set.
 *
 * <p>Annotated with {@code NON_NULL} inclusion, so the property is left out of the JSON when unset.
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public Integer getMseLiteLeafStageEffectiveLimit() {
return _mseLiteLeafStageEffectiveLimit;
}

/**
 * Sets the MSE Lite leaf-stage effective limit reported by this response.
 *
 * @param mseLiteLeafStageEffectiveLimit the effective limit value to store
 */
public void setMseLiteLeafStageEffectiveLimit(int mseLiteLeafStageEffectiveLimit) {
_mseLiteLeafStageEffectiveLimit = mseLiteLeafStageEffectiveLimit;
}

/**
 * Returns the fan-out-adjusted-limit-applied flag, or {@code null} if it was never set.
 *
 * <p>Serialized as {@code mseLiteFanOutAdjustedLimitApplied}; annotated with {@code NON_NULL} inclusion, so
 * the property is left out of the JSON when unset.
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("mseLiteFanOutAdjustedLimitApplied")
@Nullable
public Boolean getMseLiteFanOutAdjustedLimitApplied() {
return _mseLiteFanOutAdjustedLimitApplied;
}

/**
 * Sets the fan-out-adjusted-limit-applied flag reported by this response.
 *
 * @param mseLiteFanOutAdjustedLimitApplied the flag value to store
 */
public void setMseLiteFanOutAdjustedLimitApplied(boolean mseLiteFanOutAdjustedLimitApplied) {
_mseLiteFanOutAdjustedLimitApplied = mseLiteFanOutAdjustedLimitApplied;
}

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public List<String> getEarlyTerminationReasons() {
return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS));
Expand Down Expand Up @@ -515,7 +549,8 @@ public long merge(long value1, long value2) {
return Math.max(value1, value2);
}
},
EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET),
LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN);

private final StatMap.Type _type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,12 @@ public static Integer getLiteModeLeafStageFanOutAdjustedLimit(Map<String, String
: defaultValue;
}

/**
 * Looks up the lite-mode implicit leaf-stage limit in the given query options.
 *
 * @param queryOptions query options to read the limit from
 * @return the parsed limit, or {@code null} when the option is not present
 * @throws NumberFormatException if the option is present but is not a parsable integer
 */
@Nullable
public static Integer getLiteModeImplicitLeafStageLimit(Map<String, String> queryOptions) {
  String rawLimit = queryOptions.get(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT);
  if (rawLimit == null) {
    // Option not set: report absence rather than a default value.
    return null;
  }
  return Integer.parseInt(rawLimit);
}

@Nullable
private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
if (optionValue == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.response.broker;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


/**
 * Tests for the MSE Lite fields of {@link BrokerResponseNativeV2}: the leaf-stage-limit-reached flag, its
 * effect on the partial-result flag, and JSON serialization of the lite-mode properties.
 */
public class BrokerResponseNativeV2LiteModeTest {

  /** Merging the flag as {@code true} turns on both the flag and the partial-result indicator. */
  @Test
  public void testLiteModeLeafStageLimitReachedMarksPartialResult() {
    BrokerResponseNativeV2 response = new BrokerResponseNativeV2();

    // A freshly created response has neither flag set.
    assertFalse(response.isMseLiteLeafStageLimitReached());
    assertFalse(response.isPartialResult());

    response.mergeMseLiteLeafStageLimitReached(true);

    assertTrue(response.isMseLiteLeafStageLimitReached());
    assertTrue(response.isPartialResult());
  }

  /** The flag is also picked up when it arrives through a merged stat map. */
  @Test
  public void testLiteModeLeafStageLimitReachedViaStatMap() {
    BrokerResponseNativeV2 response = new BrokerResponseNativeV2();
    StatMap<BrokerResponseNativeV2.StatKey> stats = new StatMap<>(BrokerResponseNativeV2.StatKey.class);

    response.addBrokerStats(stats.merge(BrokerResponseNativeV2.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED, true));

    assertTrue(response.isMseLiteLeafStageLimitReached());
    assertTrue(response.isPartialResult());
  }

  /** All lite-mode properties show up in the serialized JSON once they are populated. */
  @Test
  public void testLiteModeFieldsInJsonSerialization()
      throws Exception {
    BrokerResponseNativeV2 response = new BrokerResponseNativeV2();
    response.mergeMseLiteLeafStageLimitReached(true);
    response.setMseLiteLeafStageEffectiveLimit(100);
    response.setMseLiteFanOutAdjustedLimitApplied(true);

    JsonNode serialized = JsonUtils.objectToJsonNode(response);

    assertTrue(serialized.path("mseLiteLeafStageLimitReached").asBoolean(false));
    assertEquals(serialized.path("mseLiteLeafStageEffectiveLimit").asInt(), 100);
    assertTrue(serialized.path("mseLiteFanOutAdjustedLimitApplied").asBoolean(false));
    assertTrue(serialized.path("partialResult").asBoolean(false));
  }

  /** The nullable lite-mode properties are absent from the JSON of an untouched response. */
  @Test
  public void testLiteModeFieldsNotSerializedWhenDefault()
      throws Exception {
    BrokerResponseNativeV2 response = new BrokerResponseNativeV2();

    JsonNode serialized = JsonUtils.objectToJsonNode(response);

    assertFalse(serialized.has("mseLiteLeafStageEffectiveLimit"));
    assertFalse(serialized.has("mseLiteFanOutAdjustedLimitApplied"));
  }

  /** Setting only the effective limit does not mark the limit as reached or the result as partial. */
  @Test
  public void testPartialResultNotSetWhenLimitNotReached() {
    BrokerResponseNativeV2 response = new BrokerResponseNativeV2();
    response.setMseLiteLeafStageEffectiveLimit(100);

    assertFalse(response.isMseLiteLeafStageLimitReached());
    assertFalse(response.isPartialResult());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,4 +343,20 @@ private static Object getValue(Map<String, String> map, String key) {
throw new IllegalArgumentException("Unexpected key!");
}
}

/** Covers the absent, positive, and zero cases of the implicit leaf-stage limit lookup. */
@Test
public void testGetLiteModeImplicitLeafStageLimit() {
  Map<String, String> options = new HashMap<>();

  // Option missing: the lookup yields null.
  assertNull(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(options));

  // Option set: the lookup yields the parsed integer.
  options.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "42");
  Integer parsedLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(options);
  assertEquals(parsedLimit, Integer.valueOf(42));

  // Zero is returned as a value, not as "absent".
  options.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "0");
  Integer zeroLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(options);
  assertEquals(zeroLimit, Integer.valueOf(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
Expand All @@ -29,10 +30,13 @@
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock> {
private static final String EXPLAIN_NAME = "INSTANCE_RESPONSE";
private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseOperator.class);

protected final BaseCombineOperator<?> _combineOperator;
protected final List<SegmentContext> _segmentContexts;
Expand Down Expand Up @@ -96,6 +100,13 @@ protected InstanceResponseBlock buildInstanceResponseBlock(BaseResultsBlock base
String.valueOf(_threadMemAllocatedBytes));
instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
String.valueOf(_systemActivitiesCpuTimeNs));
Integer implicitLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(
_queryContext.getQueryOptions());
// false-positive when table has exactly implicitLimit rows
if (implicitLimit != null && baseResultsBlock.getNumRows() >= implicitLimit) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numRows >= implicitLimit is not a valid truncation signal. When the complete result cardinality is exactly the implicit limit, this marks mseLiteLeafStageLimitReached=true and partialResult=true even though nothing was dropped; the new equality-case test bakes in that false positive. This needs an explicit "limit actually hit" signal from the limiting operator (and the same fix is needed in StreamingInstanceResponseOperator) instead of inferring truncation from equality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a known limitation (it should be rare in practice) that is documented both in the code and in the PR. I will add a note to the MSE Lite docs too.

instanceResponseBlock.addMetadata(
MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to log baseResultsBlock.getNumRows() instead of just a boolean true?

Also, as far as I recall, baseResultsBlock should ideally have at most implicitLimit rows, because both the selection and group-by operators will not allow returning more than that. In that case, I guess we can just keep this as true.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's needed. The final response has a mseLiteLeafStageEffectiveLimit field that conveys to the user the effective limit pushed to the server, so we already know each server is returning <= mseLiteLeafStageEffectiveLimit rows.

}
return instanceResponseBlock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.core.operator.streaming;

import java.util.List;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.InstanceResponseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
Expand Down Expand Up @@ -66,18 +68,22 @@ protected InstanceResponseBlock getNextBlock() {
prefetchAll();
if (_streamingCombineOperator != null) {
_streamingCombineOperator.start();
long totalRowsStreamed = 0;
BaseResultsBlock resultsBlock = getBaseBlock();
while (!(resultsBlock instanceof MetadataResultsBlock)) {
if (resultsBlock instanceof ExceptionResultsBlock) {
return new InstanceResponseBlock(resultsBlock);
}
if (resultsBlock.getNumRows() > 0) {
totalRowsStreamed += resultsBlock.getNumRows();
_streamer.send(resultsBlock);
}
resultsBlock = getBaseBlock();
}
// Return a metadata-only block in the end
return buildInstanceResponseBlock(resultsBlock);
InstanceResponseBlock responseBlock = buildInstanceResponseBlock(resultsBlock);
addLiteModeMetadataIfNeeded(responseBlock, totalRowsStreamed);
return responseBlock;
} else {
// Handle single block combine operator in streaming fashion
BaseResultsBlock resultsBlock = getBaseBlock();
Expand Down Expand Up @@ -115,6 +121,13 @@ protected BaseResultsBlock getCombinedResults() {
return _combineOperator.nextBlock();
}

/**
 * Flags the response block when the number of streamed rows reached the implicit lite-mode leaf-stage limit.
 *
 * <p>Nothing is added when the query options carry no implicit limit. The comparison is {@code >=}, so a
 * row count exactly equal to the limit is also flagged.
 *
 * @param responseBlock block that receives the metadata entry
 * @param totalRowsStreamed number of rows counted by the caller while streaming
 */
private void addLiteModeMetadataIfNeeded(InstanceResponseBlock responseBlock, long totalRowsStreamed) {
  Integer leafStageLimit = QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(_queryContext.getQueryOptions());
  if (leafStageLimit == null || totalRowsStreamed < leafStageLimit) {
    return;
  }
  String metadataName = MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName();
  responseBlock.addMetadata(metadataName, "true");
}

@Override
public String toExplainString() {
return EXPLAIN_NAME;
Expand Down
Loading
Loading