Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,17 @@ impl OldIndexDataFilter {
.collect(),
}
}

/// Narrow `rows` -- a set of existing (old) row ids/addresses -- down to the
/// entries this filter selects to keep, mutating the set in place.
///
/// * `Fragments`: only rows belonging to the fragments listed in `to_keep`
///   are retained.
/// * `RowIds`: `rows` is intersected with the set of still-valid row ids.
///
/// Intended for index types that merge old postings directly (e.g. bitmap)
/// rather than re-scanning a row-id array through [`Self::filter_row_ids`].
pub fn retain_old_rows(&self, rows: &mut RowAddrTreeMap) {
    match self {
        Self::RowIds(still_valid) => {
            // In-place intersection: anything absent from the valid set is dropped.
            *rows &= still_valid;
        }
        Self::Fragments {
            to_keep: kept_fragments,
            ..
        } => {
            rows.retain_fragments(kept_fragments.iter());
        }
    }
}
}

impl UpdateCriteria {
Expand Down
56 changes: 47 additions & 9 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,13 +812,14 @@ impl ScalarIndex for BitmapIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_old_data_filter: Option<super::OldIndexDataFilter>,
old_data_filter: Option<super::OldIndexDataFilter>,
) -> Result<CreatedIndex> {
let file = BitmapIndexPlugin::streaming_build_and_write(
new_data,
Some(self),
dest_store,
BITMAP_LOOKUP_NAME,
old_data_filter.as_ref(),
)
.await?;

Expand Down Expand Up @@ -1197,6 +1198,19 @@ async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[Strin
#[derive(Debug, Default)]
pub struct BitmapIndexPlugin;

/// Restrict an old posting `bitmap` to the rows that `filter` still considers
/// valid, returning the (possibly reduced) bitmap.
///
/// Rows that should disappear are those whose fragment was removed, or (under
/// stable row ids) rows rewritten by an update. With no filter (`None`) the
/// bitmap is handed back untouched.
fn retain_valid(
    bitmap: RowAddrTreeMap,
    filter: Option<&super::OldIndexDataFilter>,
) -> RowAddrTreeMap {
    match filter {
        // Nothing to filter against: pass the posting through unchanged.
        None => bitmap,
        Some(active_filter) => {
            let mut surviving = bitmap;
            active_filter.retain_old_rows(&mut surviving);
            surviving
        }
    }
}

impl BitmapIndexPlugin {
fn get_batch_from_arrays(
keys: Arc<dyn Array>,
Expand Down Expand Up @@ -1328,7 +1342,7 @@ impl BitmapIndexPlugin {
data: SendableRecordBatchStream,
index_store: &dyn IndexStore,
) -> Result<IndexFile> {
Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await
Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME, None).await
}

async fn train_bitmap_shard(
Expand All @@ -1343,7 +1357,8 @@ impl BitmapIndexPlugin {
progress
.stage_start("build_bitmap_shard", None, "rows")
.await?;
let file = Self::streaming_build_and_write(data, None, index_store, &file_name).await?;
let file =
Self::streaming_build_and_write(data, None, index_store, &file_name, None).await?;
progress.stage_complete("build_bitmap_shard").await?;
Ok(file)
}
Expand All @@ -1360,6 +1375,7 @@ impl BitmapIndexPlugin {
old_index: Option<&BitmapIndex>,
index_store: &dyn IndexStore,
output_file_name: &str,
old_data_filter: Option<&super::OldIndexDataFilter>,
) -> Result<IndexFile> {
let value_type = data_source.schema().field(0).data_type().clone();

Expand Down Expand Up @@ -1406,6 +1422,7 @@ impl BitmapIndexPlugin {
&mut old_pos,
&mut emitted_null,
&mut writer,
old_data_filter,
)
.await?;
}
Expand All @@ -1428,14 +1445,21 @@ impl BitmapIndexPlugin {
&mut old_pos,
&mut emitted_null,
&mut writer,
old_data_filter,
)
.await?;
}

// Emit any remaining old-only entries.
if let Some(idx) = old_index {
while old_pos < old_keys.len() {
let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?;
let old_bitmap = retain_valid(
idx.load_bitmap(&old_keys[old_pos], None)
.await?
.as_ref()
.clone(),
old_data_filter,
);
writer
.emit(old_keys[old_pos].0.clone(), &old_bitmap)
.await?;
Expand All @@ -1450,7 +1474,8 @@ impl BitmapIndexPlugin {
{
let null_key = new_null_array(&value_type, 1);
let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?;
writer.emit(null_key, &idx.null_map).await?;
let null_bitmap = retain_valid((*idx.null_map).clone(), old_data_filter);
writer.emit(null_key, &null_bitmap).await?;
}

writer.finish().await
Expand All @@ -1459,6 +1484,7 @@ impl BitmapIndexPlugin {
/// Flush a completed value-run from the new data stream, emitting any
/// old-only entries that sort before it and merging the old bitmap if the
/// key exists in both old and new.
#[allow(clippy::too_many_arguments)]
async fn finish_run(
key: ScalarValue,
bitmap: &mut RowAddrTreeMap,
Expand All @@ -1467,13 +1493,14 @@ impl BitmapIndexPlugin {
old_pos: &mut usize,
emitted_null: &mut bool,
writer: &mut BitmapBatchWriter,
old_data_filter: Option<&super::OldIndexDataFilter>,
) -> Result<()> {
if key.is_null() {
// Null values are stored separately in the old index's null_map.
if let Some(idx) = old_index
&& !idx.null_map.is_empty()
{
*bitmap |= &*idx.null_map;
*bitmap |= &retain_valid((*idx.null_map).clone(), old_data_filter);
}
*emitted_null = true;
writer.emit(key, bitmap).await?;
Expand All @@ -1482,7 +1509,13 @@ impl BitmapIndexPlugin {

// Emit old-only entries that sort before this key.
while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable {
let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
let old_bitmap = retain_valid(
idx.load_bitmap(&old_keys[*old_pos], None)
.await?
.as_ref()
.clone(),
old_data_filter,
);
writer
.emit(old_keys[*old_pos].0.clone(), &old_bitmap)
.await?;
Expand All @@ -1491,8 +1524,13 @@ impl BitmapIndexPlugin {

// If the old index also has this key, merge its bitmap.
if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable {
let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?;
*bitmap |= &*old_bitmap;
*bitmap |= &retain_valid(
idx.load_bitmap(&old_keys[*old_pos], None)
.await?
.as_ref()
.clone(),
old_data_filter,
);
*old_pos += 1;
}

Expand Down
Loading
Loading