From 91cbb82afabbd0bc671d270d2da303d41378066d Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Tue, 2 Jun 2026 21:44:57 +0200 Subject: [PATCH 1/3] feat(scalar-index): push scan limit into index search for early termination --- rust/lance-index/src/scalar.rs | 21 +++ rust/lance-index/src/scalar/btree.rs | 144 ++++++++++++++++++-- rust/lance-index/src/scalar/expression.rs | 34 ++++- rust/lance/src/dataset/scanner.rs | 154 ++++++++++++++++++++-- rust/lance/src/index/scalar_logical.rs | 18 +++ rust/lance/src/io/exec/scalar_index.rs | 43 +++++- 6 files changed, 382 insertions(+), 32 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 5ab138ff481..3100a6892d8 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -983,6 +983,27 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { metrics: &dyn MetricsCollector, ) -> Result; + /// Like [`Self::search`] but with a best-effort `limit` hint: when `limit` is `Some(n)` + /// an index may stop after finding `n` matches (it may still return more). Only push a + /// limit for a single positive lookup. The default ignores it and calls [`Self::search`]. + /// + /// ``` + /// # use lance_core::Result; + /// # use lance_index::{metrics::NoOpMetricsCollector, scalar::{AnyQuery, ScalarIndex}}; + /// # async fn example(index: &dyn ScalarIndex, query: &dyn AnyQuery) -> Result<()> { + /// let _result = index.search_limited(query, &NoOpMetricsCollector, Some(10)).await?; + /// # Ok(()) + /// # } + /// ``` + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + _limit: Option, + ) -> Result { + self.search(query, metrics).await + } + /// Returns true if the remap operation is supported fn can_remap(&self) -> bool; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6de490b9572..024cd515800 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1696,14 +1696,19 @@ impl Index for BTreeIndex { } } -#[async_trait] -impl ScalarIndex for BTreeIndex { - async fn search( +impl BTreeIndex { + /// Shared implementation for [`ScalarIndex::search`] and + /// [`ScalarIndex::search_limited`]. + /// + /// When `limit` is `Some(n)` the pages are searched in order and the search stops once + /// it has at least `n` matching row ids. The result may hold more than `n` rows but + /// never fewer unless the query matches fewer. + async fn do_search( &self, - query: &dyn AnyQuery, + query: &SargableQuery, metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { - let query = query.as_any().downcast_ref::().unwrap(); let mut pages = match query { SargableQuery::Equals(val) => self .page_lookup @@ -1764,7 +1769,10 @@ impl ScalarIndex for BTreeIndex { // We add them as Matches::Some (not Matches::All) so that // FlatIndex::search() evaluates the predicate and correctly marks // the rows as NULL rather than TRUE. - if !matches!(query, SargableQuery::IsNull()) { + // + // When a `limit` is set the query is a single positive lookup, so null tracking + // is not needed and skipping null pages helps us stop early. + if limit.is_none() && !matches!(query, SargableQuery::IsNull()) { let existing: HashSet = pages.iter().map(|m| m.page_id()).collect(); for &page_id in self .page_lookup @@ -1789,19 +1797,60 @@ impl ScalarIndex for BTreeIndex { .collect::>(); debug!("Searching {} btree pages", page_tasks.len()); - // Collect both matching row IDs and null row IDs from all pages - let results: Vec = stream::iter(page_tasks) - // I/O and compute mixed here but important case is index in cache so - // use compute intensive thread count - .buffered(get_num_compute_intensive_cpus()) - .try_collect() - .await?; + // Collect row IDs from the pages. `buffered` keeps page order. When a `limit` is + // set we read one page at a time and stop once we have enough matches, so we do + // not issue I/O for pages we never need. Without a limit we fan out across CPUs + // (I/O and compute are mixed, but the important case is the index being cached). + let parallelism = if limit.is_some() { + 1 + } else { + get_num_compute_intensive_cpus() + }; + let mut page_stream = stream::iter(page_tasks).buffered(parallelism); + + let mut results: Vec = Vec::new(); + let mut matches_found: u64 = 0; + while let Some(page_result) = page_stream.try_next().await? { + if let Some(limit) = limit { + // Count only TRUE matches. NULL rows never match, so they must not count + // toward the limit. `len()` already excludes nulls. + matches_found += page_result.len().unwrap_or(0); + results.push(page_result); + if matches_found >= limit as u64 { + break; + } + } else { + results.push(page_result); + } + } // Merge matching row IDs let selection = NullableRowAddrSet::union_all(&results); Ok(SearchResult::Exact(selection)) } +} + +#[async_trait] +impl ScalarIndex for BTreeIndex { + async fn search( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + ) -> Result { + let query = query.as_any().downcast_ref::().unwrap(); + self.do_search(query, metrics, None).await + } + + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + limit: Option, + ) -> Result { + let query = query.as_any().downcast_ref::().unwrap(); + self.do_search(query, metrics, limit).await + } fn can_remap(&self) -> bool { true @@ -4990,6 +5039,75 @@ mod tests { } } + /// `search_limited` returns at least `limit` matches but stops reading pages early, + /// so for a multi-page range it returns fewer rows than an unlimited search. + #[tokio::test] + async fn test_search_limited_short_circuits() { + use arrow_array::{Int32Array, UInt64Array}; + + let tmpdir = TempObjDir::default(); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + // Enough rows to span several btree pages, with no nulls so every row matches an + // unbounded range. `train_btree_index` makes pages of `DEFAULT_BTREE_BATCH_SIZE` + // rows, so this gives five pages. + let num_rows = 5 * DEFAULT_BTREE_BATCH_SIZE; + let values: Int32Array = (0..num_rows).map(|i| Some(i as i32)).collect(); + let row_ids = UInt64Array::from_iter_values(0..num_rows); + let data = arrow_array::RecordBatch::try_from_iter(vec![ + ("value", Arc::new(values) as arrow_array::ArrayRef), + ("_rowid", Arc::new(row_ids) as arrow_array::ArrayRef), + ]) + .unwrap(); + let schema = data.schema(); + let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::iter(vec![Ok(data)]), + )); + train_btree_index( + stream, + test_store.as_ref(), + DEFAULT_BTREE_BATCH_SIZE, + None, + None, + ) + .await + .unwrap(); + + let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + let metrics = NoOpMetricsCollector; + let everything = + SargableQuery::Range(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded); + + // Baseline: an unlimited search returns every row. + let full = index.search(&everything, &metrics).await.unwrap(); + let full_len = full.row_addrs().len().unwrap(); + assert_eq!(full_len, num_rows); + + // A limit that reaches into the second page. The search must satisfy it but stop + // well before reading all five pages. + let limit = (DEFAULT_BTREE_BATCH_SIZE + 100) as usize; + let limited = index + .search_limited(&everything, &metrics, Some(limit)) + .await + .unwrap(); + let limited_len = limited.row_addrs().len().unwrap(); + assert!( + limited_len >= limit as u64, + "expected at least {limit} matches, got {limited_len}" + ); + assert!( + limited_len < full_len, + "expected the search to short-circuit, but it returned all {full_len} rows" + ); + } + fn sample_lookup_batch() -> RecordBatch { record_batch!( ("min", Int32, [Some(0), Some(10), Some(20)]), diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 187d5be999f..51e826787ca 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -1309,21 +1309,24 @@ impl ScalarIndexExpr { &self, index_loader: &dyn ScalarIndexLoader, metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { match self { + // A limit only applies to a single positive lookup. NOT, AND, and OR need the + // full result of each side, so the limit is dropped when recursing into them. Self::Not(inner) => { - let result = inner.evaluate_nullable(index_loader, metrics).await?; + let result = inner.evaluate_nullable(index_loader, metrics, None).await?; Ok(!result) } Self::And(lhs, rhs) => { - let lhs_result = lhs.evaluate_nullable(index_loader, metrics); - let rhs_result = rhs.evaluate_nullable(index_loader, metrics); + let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None); + let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None); let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?; Ok(lhs_result & rhs_result) } Self::Or(lhs, rhs) => { - let lhs_result = lhs.evaluate_nullable(index_loader, metrics); - let rhs_result = rhs.evaluate_nullable(index_loader, metrics); + let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None); + let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None); let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?; Ok(lhs_result | rhs_result) } @@ -1331,7 +1334,9 @@ impl ScalarIndexExpr { let index = index_loader .load_index(&search.column, &search.index_name, metrics) .await?; - let search_result = index.search(search.query.as_ref(), metrics).await?; + let search_result = index + .search_limited(search.query.as_ref(), metrics, limit) + .await?; Ok(search_result.into()) } } @@ -1342,9 +1347,24 @@ impl ScalarIndexExpr { &self, index_loader: &dyn ScalarIndexLoader, metrics: &dyn MetricsCollector, + ) -> Result { + self.evaluate_limited(index_loader, metrics, None).await + } + + /// Like [`Self::evaluate`] but pushes a `limit` hint into the index search so it can + /// stop once it has found at least `limit` matches. + /// + /// See [`crate::scalar::ScalarIndex::search_limited`] for the rules on when a limit + /// may be pushed down. + #[instrument(level = "debug", skip_all)] + pub async fn evaluate_limited( + &self, + index_loader: &dyn ScalarIndexLoader, + metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { Ok(self - .evaluate_nullable(index_loader, metrics) + .evaluate_nullable(index_loader, metrics, limit) .await? .drop_nulls()) } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 9a5cd94dd09..f635e420c20 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2821,6 +2821,16 @@ impl Scanner { fragments: Option>>, scan_range: Option>, ) -> Result> { + // Decide whether a limit can be pushed into the index search. The fragments the + // read covers (used for the deletion check) are the requested subset, or the whole + // dataset when none was given. + let all_fragments = self.dataset.fragments(); + let scanned_fragments: &[Fragment] = fragments + .as_ref() + .map(|frags| frags.as_slice()) + .unwrap_or_else(|| all_fragments.as_slice()); + let pushdown_limit = self.index_search_limit(filter_plan, scanned_fragments); + let mut read_options = FilteredReadOptions::basic_full_read(&self.dataset) .with_filter_plan(filter_plan.clone()) .with_projection(projection); @@ -2859,11 +2869,10 @@ impl Scanner { let result_format = self.index_expr_result_format(); let index_input = filter_plan.index_query.clone().map(|index_query| { - Arc::new(ScalarIndexExec::new( - self.dataset.clone(), - index_query, - result_format, - )) as Arc + Arc::new( + ScalarIndexExec::new(self.dataset.clone(), index_query, result_format) + .with_limit(pushdown_limit), + ) as Arc }); Ok(Arc::new(FilteredReadExec::try_new( @@ -4042,6 +4051,55 @@ impl Scanner { Ok((relevant_frags, missing_frags)) } + /// Compute the limit hint that can be safely pushed into a scalar index search. + /// + /// Pushing a limit is only an optimization. A `GlobalLimitExec` still applies the + /// exact limit and offset, so the index only needs to return at least `limit + offset` + /// rows. The first N matches are as good as any N matches only when all of these hold. + /// + /// - There is a positive row limit. + /// - The rows are not reordered before the limit (no `ORDER BY`, vector or FTS search). + /// - There is no aggregate (the limit applies after aggregation). + /// - The index result is used as is, with no refine filter and no recheck. Either of + /// those re-filters rows later and could drop matches. + /// - The relevant fragments have no deletions. Deleted rows are pruned after the index + /// search, so stopping early could leave fewer than `limit` live rows. + /// + /// Returns `None` when no limit can be pushed. + fn index_search_limit( + &self, + filter_plan: &ExprFilterPlan, + relevant_fragments: &[Fragment], + ) -> Option { + let limit = self.limit?; + if limit <= 0 { + return None; + } + if self.ordering.is_some() + || self.nearest.is_some() + || self.full_text_query.is_some() + || self.aggregate.is_some() + || filter_plan.has_refine() + { + return None; + } + if filter_plan + .index_query + .as_ref() + .is_some_and(|query| query.needs_recheck()) + { + return None; + } + if relevant_fragments + .iter() + .any(|fragment| fragment.deletion_file.is_some()) + { + return None; + } + let offset = self.offset.unwrap_or(0).max(0) as usize; + Some((limit as usize).saturating_add(offset)) + } + // First perform a lookup in a scalar index for ids and then perform a take on the // target fragments with those ids async fn scalar_indexed_scan( @@ -4066,11 +4124,18 @@ impl Scanner { .partition_frags_by_coverage(index_expr, fragments) .await?; - let mut plan: Arc = Arc::new(MaterializeIndexExec::new( - self.dataset.clone(), - index_expr.clone(), - Arc::new(relevant_frags), - )); + // A limit can be pushed into the index search, but only when its rows are used as + // is and the relevant fragments have no deletions. + let pushdown_limit = self.index_search_limit(filter_plan, &relevant_frags); + + let mut plan: Arc = Arc::new( + MaterializeIndexExec::new( + self.dataset.clone(), + index_expr.clone(), + Arc::new(relevant_frags), + ) + .with_limit(pushdown_limit), + ); let refine_expr = filter_plan.refine_expr.as_ref(); @@ -5768,6 +5833,75 @@ mod test { assert_eq!(ids, &(10..20).collect::>()); } + #[tokio::test] + async fn test_limit_pushed_into_scalar_index() { + // When a scan filter is fully served by a scalar index (no refine, no recheck, no + // ordering) the limit can be pushed into the index search. The result must still + // be exactly `limit` rows that all match the filter. Early stop must not drop or + // duplicate matches. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + // Span several btree pages so a small limit short-circuits before the end. + let num_rows = 20_000; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..num_rows))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap(); + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let limit = 100; + let scan_ids = |dataset: Arc| async move { + let batch = dataset + .scan() + .filter("id >= 5") + .unwrap() + .limit(Some(limit), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + batch + .column_by_name("id") + .unwrap() + .as_primitive::() + .values() + .to_vec() + }; + + let ids = scan_ids(Arc::new(dataset.clone())).await; + assert_eq!(ids.len(), limit as usize); + assert!( + ids.iter().all(|&id| id >= 5), + "every returned row must satisfy the filter" + ); + + // With deletions present the limit must not be pushed, since deleted rows are + // pruned after the index search. The scan must still return exactly `limit` live + // matches. + dataset.delete("id >= 5 AND id < 10000").await.unwrap(); + let ids = scan_ids(Arc::new(dataset)).await; + assert_eq!(ids.len(), limit as usize); + assert!( + ids.iter().all(|&id| id >= 10000), + "deleted rows must not be returned" + ); + } + #[test_log::test(tokio::test)] async fn test_limit_cancel() { // If there is a filter and a limit and we can't use the index to satisfy diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index 162a36a0c97..8e49dd03af6 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -132,6 +132,24 @@ impl ScalarIndex for LogicalScalarIndex { combine_search_results(results) } + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + limit: Option, + ) -> Result { + // Forwarding the limit to every segment is safe. Each segment returns at least + // `limit` matches when it has them, so the combined result still has at least + // `limit` matches overall. + let results = try_join_all( + self.segments + .iter() + .map(|segment| segment.search_limited(query, metrics, limit)), + ) + .await?; + combine_search_results(results) + } + fn can_remap(&self) -> bool { false } diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index ade4995fb4b..0eb35cc9eea 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -76,6 +76,12 @@ pub struct ScalarIndexExec { properties: Arc, metrics: ExecutionPlanMetricsSet, result_format: IndexExprResultWireFormat, + /// Hint passed to the index search so it can stop once it has found this many + /// matches. `None` means search all matches. + /// + /// This is only an optimization. A downstream `GlobalLimitExec` still applies the + /// exact limit, so the index only needs to return at least this many rows. + limit: Option, } impl DisplayAs for ScalarIndexExec { @@ -109,9 +115,20 @@ impl ScalarIndexExec { properties, metrics: ExecutionPlanMetricsSet::new(), result_format, + limit: None, } } + /// Push a `limit` hint into the index search so it can stop early. + /// + /// Only set this when returning any `limit` matching rows is safe, such as an + /// unordered scan whose results are not filtered further. Correctness still relies on + /// a downstream limit operator. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + pub fn dataset(&self) -> &Arc { &self.dataset } @@ -161,12 +178,14 @@ impl ScalarIndexExec { dataset: Arc, plan_metrics: ExecutionPlanMetricsSet, result_format: IndexExprResultWireFormat, + limit: Option, ) -> Result { let metrics = IndexMetrics::new(&plan_metrics, 0); let query_result = { let search_time = plan_metrics.new_time(SCALAR_INDEX_SEARCH_TIME_METRIC, 0); let _timer = search_time.timer(); - expr.evaluate(dataset.as_ref(), &metrics).await? + expr.evaluate_limited(dataset.as_ref(), &metrics, limit) + .await? }; let fragments_covered_by_result = Self::fragments_covered_by_index_query(&expr, dataset.as_ref()).await?; @@ -218,6 +237,7 @@ impl ExecutionPlan for ScalarIndexExec { self.dataset.clone(), self.metrics.clone(), self.result_format, + self.limit, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into())) @@ -485,6 +505,12 @@ pub struct MaterializeIndexExec { fragments: Arc>, properties: Arc, metrics: ExecutionPlanMetricsSet, + /// Hint passed to the index search so it can stop once it has found this many + /// matches. `None` means materialize all matches. + /// + /// This is only an optimization. A downstream `GlobalLimitExec` still applies the + /// exact limit, so the index only needs to return at least this many rows. + limit: Option, } impl DisplayAs for MaterializeIndexExec { @@ -557,17 +583,29 @@ impl MaterializeIndexExec { fragments, properties, metrics: ExecutionPlanMetricsSet::new(), + limit: None, } } + /// Push a `limit` hint into the index search so it can stop early. + /// + /// Only set this when returning any `limit` matching rows is safe, such as an + /// unordered scan whose results are not filtered further. Correctness still relies on + /// a downstream limit operator. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + #[instrument(name = "materialize_scalar_index", skip_all, level = "debug")] async fn do_execute( expr: ScalarIndexExpr, dataset: Arc, fragments: Arc>, metrics: Arc, + limit: Option, ) -> Result { - let expr_result = expr.evaluate(dataset.as_ref(), metrics.as_ref()); + let expr_result = expr.evaluate_limited(dataset.as_ref(), metrics.as_ref(), limit); let span = debug_span!("create_prefilter"); let prefilter = span.in_scope(|| { let fragment_bitmap = @@ -734,6 +772,7 @@ impl ExecutionPlan for MaterializeIndexExec { self.dataset.clone(), self.fragments.clone(), metrics, + self.limit, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into())) From 8de2c7b020d880fc021c940d2f4aa234720afbc7 Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Sat, 20 Jun 2026 14:14:35 +0200 Subject: [PATCH 2/3] fix(scanner): only push scalar-index limit for unordered scans --- rust/lance-index/src/scalar/btree.rs | 19 +++----- rust/lance-index/src/scalar/expression.rs | 3 +- rust/lance/src/dataset/scanner.rs | 55 +++++++++++++---------- rust/lance/src/index/scalar_logical.rs | 4 +- 4 files changed, 39 insertions(+), 42 deletions(-) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 45e691da3fb..19141ad4033 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1802,9 +1802,7 @@ impl BTreeIndex { // We add them as Matches::Some (not Matches::All) so that // FlatIndex::search() evaluates the predicate and correctly marks // the rows as NULL rather than TRUE. - // - // When a `limit` is set the query is a single positive lookup, so null tracking - // is not needed and skipping null pages helps us stop early. + // A `limit` implies a single positive lookup, so skip null tracking to stop early. if limit.is_none() && !matches!(query, SargableQuery::IsNull()) { let existing: HashSet = pages.iter().map(|m| m.page_id()).collect(); for &page_id in self @@ -1830,10 +1828,7 @@ impl BTreeIndex { .collect::>(); debug!("Searching {} btree pages", page_tasks.len()); - // Collect row IDs from the pages. `buffered` keeps page order. When a `limit` is - // set we read one page at a time and stop once we have enough matches, so we do - // not issue I/O for pages we never need. Without a limit we fan out across CPUs - // (I/O and compute are mixed, but the important case is the index being cached). + // With a `limit`, read one page at a time and stop once we have enough; otherwise fan out across CPUs. let parallelism = if limit.is_some() { 1 } else { @@ -1845,8 +1840,7 @@ impl BTreeIndex { let mut matches_found: u64 = 0; while let Some(page_result) = page_stream.try_next().await? { if let Some(limit) = limit { - // Count only TRUE matches. NULL rows never match, so they must not count - // toward the limit. `len()` already excludes nulls. + // Count only TRUE matches toward the limit; `len()` already excludes nulls. matches_found += page_result.len().unwrap_or(0); results.push(page_result); if matches_found >= limit as u64 { @@ -5025,9 +5019,7 @@ mod tests { Arc::new(LanceCache::no_cache()), )); - // Enough rows to span several btree pages, with no nulls so every row matches an - // unbounded range. `train_btree_index` makes pages of `DEFAULT_BTREE_BATCH_SIZE` - // rows, so this gives five pages. + // Five btree pages of `DEFAULT_BTREE_BATCH_SIZE` rows, with no nulls so every row matches. let num_rows = 5 * DEFAULT_BTREE_BATCH_SIZE; let values: Int32Array = (0..num_rows).map(|i| Some(i as i32)).collect(); let row_ids = UInt64Array::from_iter_values(0..num_rows); @@ -5063,8 +5055,7 @@ mod tests { let full_len = full.row_addrs().len().unwrap(); assert_eq!(full_len, num_rows); - // A limit that reaches into the second page. The search must satisfy it but stop - // well before reading all five pages. + // A limit reaching into the second page: satisfied but stops before reading all five. let limit = (DEFAULT_BTREE_BATCH_SIZE + 100) as usize; let limited = index .search_limited(&everything, &metrics, Some(limit)) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index d5844c948bd..d4e05ffa058 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -1327,8 +1327,7 @@ impl ScalarIndexExpr { limit: Option, ) -> Result { match self { - // A limit only applies to a single positive lookup. NOT, AND, and OR need the - // full result of each side, so the limit is dropped when recursing into them. + // A limit applies only to a single positive lookup, so drop it for NOT/AND/OR. Self::Not(inner) => { let result = inner.evaluate_nullable(index_loader, metrics, None).await?; Ok(!result) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index c8c6832ae38..192ff4b8d17 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4084,6 +4084,11 @@ impl Scanner { /// rows. The first N matches are as good as any N matches only when all of these hold. /// /// - There is a positive row limit. + /// - The scan is unordered (`scan_in_order(false)`). In the default ordered mode the + /// scan returns the first matches in storage (row address) order, but a B-tree + /// stops after collecting matches in index-value page order. Those are different + /// subsets whenever storage order and index order disagree, so pushing the limit + /// would silently change which rows `LIMIT`/`OFFSET` returns. /// - The rows are not reordered before the limit (no `ORDER BY`, vector or FTS search). /// - There is no aggregate (the limit applies after aggregation). /// - The index result is used as is, with no refine filter and no recheck. Either of @@ -4101,7 +4106,9 @@ impl Scanner { if limit <= 0 { return None; } - if self.ordering.is_some() + // Ordered scans return storage-order matches, while a B-tree stops in index-value order. + if self.ordered + || self.ordering.is_some() || self.nearest.is_some() || self.full_text_query.is_some() || self.aggregate.is_some() @@ -4150,8 +4157,7 @@ impl Scanner { .partition_frags_by_coverage(index_expr, fragments) .await?; - // A limit can be pushed into the index search, but only when its rows are used as - // is and the relevant fragments have no deletions. + // A limit can be pushed into the index search only when safe; see index_search_limit. let pushdown_limit = self.index_search_limit(filter_plan, &relevant_frags); let mut plan: Arc = Arc::new( @@ -5882,20 +5888,17 @@ mod test { #[tokio::test] async fn test_limit_pushed_into_scalar_index() { - // When a scan filter is fully served by a scalar index (no refine, no recheck, no - // ordering) the limit can be pushed into the index search. The result must still - // be exactly `limit` rows that all match the filter. Early stop must not drop or - // duplicate matches. + // A scalar-index limit can be pushed only for an unordered scan, since the B-tree stops in index-value order while an ordered scan returns storage-order matches. let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "id", DataType::Int32, false, )])); - // Span several btree pages so a small limit short-circuits before the end. - let num_rows = 20_000; + // Span several btree pages, with ids in descending order so storage order is the reverse of index-value order. + let num_rows = 20_000i32; let batch = RecordBatch::try_new( schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(0..num_rows))], + vec![Arc::new(Int32Array::from_iter_values((0..num_rows).rev()))], ) .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); @@ -5911,17 +5914,15 @@ mod test { .await .unwrap(); - let limit = 100; - let scan_ids = |dataset: Arc| async move { - let batch = dataset - .scan() - .filter("id >= 5") + let limit = 100i64; + let scan_ids = |dataset: Arc, ordered: bool| async move { + let mut scan = dataset.scan(); + scan.filter("id >= 5") .unwrap() + .scan_in_order(ordered) .limit(Some(limit), None) - .unwrap() - .try_into_batch() - .await .unwrap(); + let batch = scan.try_into_batch().await.unwrap(); batch .column_by_name("id") .unwrap() @@ -5930,18 +5931,26 @@ mod test { .to_vec() }; - let ids = scan_ids(Arc::new(dataset.clone())).await; + // Ordered scan (the default): limit not pushed, so the first matches are the largest ids (descending storage). + let ids = scan_ids(Arc::new(dataset.clone()), true).await; + assert_eq!(ids.len(), limit as usize); + assert!( + ids.iter().all(|&id| id >= num_rows - limit as i32), + "ordered scan must return the storage-order subset, got {:?}", + &ids[..ids.len().min(5)] + ); + + // Unordered scan: limit pushed into the index, but still exactly `limit` matching rows. + let ids = scan_ids(Arc::new(dataset.clone()), false).await; assert_eq!(ids.len(), limit as usize); assert!( ids.iter().all(|&id| id >= 5), "every returned row must satisfy the filter" ); - // With deletions present the limit must not be pushed, since deleted rows are - // pruned after the index search. The scan must still return exactly `limit` live - // matches. + // With deletions the limit must not be pushed even when unordered, since deleted rows are pruned after the index search. dataset.delete("id >= 5 AND id < 10000").await.unwrap(); - let ids = scan_ids(Arc::new(dataset)).await; + let ids = scan_ids(Arc::new(dataset), false).await; assert_eq!(ids.len(), limit as usize); assert!( ids.iter().all(|&id| id >= 10000), diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index 9d63e6b4368..dfbca7a0e5b 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -138,9 +138,7 @@ impl ScalarIndex for LogicalScalarIndex { metrics: &dyn MetricsCollector, limit: Option, ) -> Result { - // Forwarding the limit to every segment is safe. Each segment returns at least - // `limit` matches when it has them, so the combined result still has at least - // `limit` matches overall. + // Forwarding the limit to every segment is safe: the combined result still has at least `limit` matches. let results = try_join_all( self.segments .iter() From d6d9377a23cbf16345b5f821c246da502af12338 Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Sat, 20 Jun 2026 15:46:31 +0200 Subject: [PATCH 3/3] fix(scanner): don't push scalar-index limit for fragment-subset scans --- rust/lance-index/src/scalar.rs | 7 +- rust/lance-index/src/scalar/btree.rs | 3 + rust/lance/src/dataset/scanner.rs | 118 +++++++++++++++++++++++-- rust/lance/src/index/scalar_logical.rs | 61 +++++++++++++ 4 files changed, 180 insertions(+), 9 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 021a93e9ea8..aceffdce17f 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -1047,8 +1047,11 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { ) -> Result; /// Like [`Self::search`] but with a best-effort `limit` hint: when `limit` is `Some(n)` - /// an index may stop after finding `n` matches (it may still return more). Only push a - /// limit for a single positive lookup. The default ignores it and calls [`Self::search`]. + /// an index may stop after finding `n` matching rows (it may still return more). The hint + /// applies to positive lookups that keep matches as-is (equality, range, `IsIn`); negating + /// or combining operators ignore it. The caller must also discard null rows, since an index + /// may skip null tracking when a limit is set. The default ignores the hint and calls + /// [`Self::search`]. /// /// ``` /// # use lance_core::Result; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index fc8470b9623..bd24a5e55e3 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -2168,6 +2168,9 @@ impl BTreeIndex { // could refine that classification (see #6802). // // A `limit` implies a single positive lookup, so skip null tracking to stop early. + // Correctness then relies on the caller discarding nulls: every `search_limited` + // path goes through `evaluate_limited` -> `drop_nulls`, so the untracked null rows + // are dropped anyway. A future caller that keeps nulls must not pass a limit here. if limit.is_none() && !matches!(query, SargableQuery::IsNull()) { let existing: HashSet = pages.iter().map(|m| m.page_id()).collect(); for &page_id in self diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 678e194eec4..111dba9b467 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4107,6 +4107,10 @@ impl Scanner { /// those re-filters rows later and could drop matches. /// - The relevant fragments have no deletions. Deleted rows are pruned after the index /// search, so stopping early could leave fewer than `limit` live rows. + /// - The scan is not restricted to a fragment subset (`with_fragments`). The index search + /// runs over the whole dataset, and the fragment restriction is applied afterwards, so + /// an early stop could return `limit` matches that all fall outside the subset and leave + /// fewer than `limit` rows once it is applied. /// /// Returns `None` when no limit can be pushed. fn index_search_limit( @@ -4119,7 +4123,10 @@ impl Scanner { return None; } // Ordered scans return storage-order matches, while a B-tree stops in index-value order. + // A fragment subset is restricted only after the (global) index search, so an early stop + // could leave fewer than `limit` rows once the restriction is applied. if self.ordered + || self.fragments.is_some() || self.ordering.is_some() || self.nearest.is_some() || self.full_text_query.is_some() @@ -5898,8 +5905,14 @@ mod test { assert_eq!(ids, &(10..20).collect::>()); } + #[rstest] #[tokio::test] - async fn test_limit_pushed_into_scalar_index() { + async fn test_limit_pushed_into_scalar_index( + // Legacy storage routes through `scalar_indexed_scan` (MaterializeIndexExec), Stable + // through `new_filtered_read` (ScalarIndexExec); both push the limit via the same gate. + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, + ) { // A scalar-index limit can be pushed only for an unordered scan, since the B-tree stops in index-value order while an ordered scan returns storage-order matches. let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "id", @@ -5914,7 +5927,13 @@ mod test { ) .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap(); + let write_params = WriteParams { + data_storage_version: Some(data_storage_version), + ..Default::default() + }; + let mut dataset = Dataset::write(reader, "memory://", Some(write_params)) + .await + .unwrap(); dataset .create_index( &["id"], @@ -5927,12 +5946,12 @@ mod test { .unwrap(); let limit = 100i64; - let scan_ids = |dataset: Arc, ordered: bool| async move { + let scan_ids = |dataset: Arc, ordered: bool, offset: Option| async move { let mut scan = dataset.scan(); scan.filter("id >= 5") .unwrap() .scan_in_order(ordered) - .limit(Some(limit), None) + .limit(Some(limit), offset) .unwrap(); let batch = scan.try_into_batch().await.unwrap(); batch @@ -5944,7 +5963,7 @@ mod test { }; // Ordered scan (the default): limit not pushed, so the first matches are the largest ids (descending storage). - let ids = scan_ids(Arc::new(dataset.clone()), true).await; + let ids = scan_ids(Arc::new(dataset.clone()), true, None).await; assert_eq!(ids.len(), limit as usize); assert!( ids.iter().all(|&id| id >= num_rows - limit as i32), @@ -5953,16 +5972,26 @@ mod test { ); // Unordered scan: limit pushed into the index, but still exactly `limit` matching rows. - let ids = scan_ids(Arc::new(dataset.clone()), false).await; + let ids = scan_ids(Arc::new(dataset.clone()), false, None).await; assert_eq!(ids.len(), limit as usize); assert!( ids.iter().all(|&id| id >= 5), "every returned row must satisfy the filter" ); + // With an offset the pushed limit must cover `limit + offset` rows, otherwise the + // downstream skip would leave fewer than `limit`. This guards the `saturating_add(offset)`. + let ids = scan_ids(Arc::new(dataset.clone()), false, Some(50)).await; + assert_eq!( + ids.len(), + limit as usize, + "offset must not reduce the returned row count" + ); + assert!(ids.iter().all(|&id| id >= 5)); + // With deletions the limit must not be pushed even when unordered, since deleted rows are pruned after the index search. dataset.delete("id >= 5 AND id < 10000").await.unwrap(); - let ids = scan_ids(Arc::new(dataset), false).await; + let ids = scan_ids(Arc::new(dataset), false, None).await; assert_eq!(ids.len(), limit as usize); assert!( ids.iter().all(|&id| id >= 10000), @@ -5970,6 +5999,81 @@ mod test { ); } + #[rstest] + #[tokio::test] + async fn test_limit_not_pushed_with_fragment_subset( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, + ) { + // The scalar-index search runs over the whole dataset; a `with_fragments` subset is + // applied only afterwards. If the limit were pushed, an unordered scan restricted to a + // fragment whose ids sort *last* would early-stop on matches in the other fragment and + // return too few (here zero) rows. The limit must therefore not be pushed for a subset. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + // Two fragments with ascending ids (index order == storage order): frag 0 holds the + // smallest ids, so the first matches in index order all live outside the second fragment. + let num_rows = 20_000i32; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..num_rows))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let write_params = WriteParams { + data_storage_version: Some(data_storage_version), + max_rows_per_file: 10_000, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, "memory://", Some(write_params)) + .await + .unwrap(); + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let fragments = dataset.fragments().as_ref().clone(); + assert_eq!(fragments.len(), 2, "expected two fragments"); + // Restrict to the second fragment (the largest ids). + let second_fragment = fragments[1].clone(); + + let limit = 100i64; + let mut scan = dataset.scan(); + scan.with_fragments(vec![second_fragment]) + .filter("id >= 0") + .unwrap() + .scan_in_order(false) + .limit(Some(limit), None) + .unwrap(); + let batch = scan.try_into_batch().await.unwrap(); + let ids = batch + .column_by_name("id") + .unwrap() + .as_primitive::() + .values() + .to_vec(); + + assert_eq!( + ids.len(), + limit as usize, + "a fragment-subset scan must still return `limit` rows" + ); + assert!( + ids.iter().all(|&id| id >= 10_000), + "must only return rows from the requested fragment" + ); + } + #[test_log::test(tokio::test)] async fn test_limit_cancel() { // If there is a filter and a limit and we can't use the index to satisfy diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index eda4b8ca117..4e1c8edea88 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -538,6 +538,67 @@ mod tests { ); } + #[tokio::test] + async fn test_btree_segment_search_limited_across_segments() { + // `search_limited` forwards the limit to every segment and combines the results, so the + // combined result must still hold at least `limit` matches across the segments. + let test_dir = TempStrDir::default(); + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_dataset( + test_dir.as_str(), + FragmentCount::from(4), + FragmentRowCount::from(16), + ) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree); + let mut segments = Vec::new(); + for fragment in &fragments { + segments.push( + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::BTree, ¶ms) + .name("value_btree_limited".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("value_btree_limited", "value", segments) + .await + .unwrap(); + + let logical = open_named_scalar_index( + &dataset, + "value", + "value_btree_limited", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + // All 64 rows match the unbounded range; with a limit the combined result across the + // four segments must still satisfy at least `limit` matches. + let query = SargableQuery::Range(Bound::Unbounded, Bound::Unbounded); + let limit = 10usize; + let result = logical + .search_limited(&query, &NoOpMetricsCollector, Some(limit)) + .await + .unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) | SearchResult::AtLeast(row_addrs) => row_addrs, + other => panic!("unexpected result variant from limited search: {:?}", other), + }; + let count = row_addrs.true_rows().row_addrs().unwrap().count(); + assert!( + count >= limit, + "limited search must return at least {limit} matches, got {count}" + ); + } + #[tokio::test] async fn test_bitmap_segments_commit_and_query_as_logical_index() { let test_dir = TempStrDir::default();