Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions rust/lance/src/dataset/mem_wal/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1915,6 +1915,37 @@ impl ShardWriter {
}
}

/// Tear the writer down immediately, skipping the final flush.
///
/// Only the background flush tasks are stopped; whatever is still
/// buffered in the memtable is left to be dropped together with the
/// writer. In contrast to [`Self::close`], nothing is flushed to the WAL
/// or sealed from the MemTable: in-memory rows are thrown away rather
/// than made durable, and no object-store IO is issued. The intended
/// caller is drop-table, where the dataset directory is about to be
/// deleted and a flush would merely race fresh files into a path that
/// is going away.
///
/// # Caller contract
///
/// Writes MUST be quiesced before calling `abort`. The method borrows
/// `&self` so that it is reachable through the `Arc<ShardWriter>` that
/// callers hold, which means — unlike the consuming `close(self)` — the
/// type system cannot rule out a concurrent or later `put`. Once the
/// dispatchers have been shut down, such a `put` would buffer data that
/// is never flushed.
///
/// # Completion and idempotency
///
/// Cancellation is only observed between messages, so `abort` waits for
/// a flush that is already inside `handle()` to settle; when it returns,
/// no flush task is left running. Calling it again is harmless: the
/// token is simply cancelled a second time and the (now empty) task set
/// is joined again.
///
/// # Errors
///
/// Propagates any error reported by the task executor's `shutdown_all`.
#[instrument(name = "sw_abort", level = "info", skip_all, fields(shard_id = %self.config.shard_id, epoch = self.epoch))]
pub async fn abort(&self) -> Result<()> {
    let shard_id = &self.config.shard_id;
    info!("Aborting ShardWriter for shard {} (no flush)", shard_id);

    // Stop and join the background tasks; deliberately no flush first.
    self.task_executor.shutdown_all().await?;
    Ok(())
}

/// Close the writer gracefully.
///
/// Flushes pending data and shuts down background tasks.
Expand Down Expand Up @@ -4220,6 +4251,63 @@ mod tests {
writer.close().await.unwrap();
}

/// `abort` tears down the background flush tasks WITHOUT flushing —
/// buffered memtable rows are discarded, not sealed into an L0
/// generation the way `close` would. Idempotent on a second call.
#[tokio::test]
async fn test_abort_discards_without_flushing_and_is_idempotent() {
let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
let schema = create_test_schema();

// Thresholds high enough that nothing auto-flushes; the rows stay
// in the active memtable until abort discards them.
let config = ShardWriterConfig {
shard_id: Uuid::new_v4(),
shard_spec_id: 0,
durable_write: false,
sync_indexed_write: false,
max_wal_buffer_size: 64 * 1024 * 1024,
max_wal_flush_interval: None,
max_memtable_size: 64 * 1024 * 1024,
manifest_scan_batch_size: 2,
..Default::default()
};

let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![])
.await
.unwrap();

writer
.put(vec![create_test_batch(&schema, 0, 10)])
.await
.unwrap();
let flushed_before = writer
.manifest()
.await
.unwrap()
.map(|m| m.flushed_generations.len())
.unwrap_or(0);

writer.abort().await.unwrap();

// No generation was sealed — contrast with `close`, which flushes
// the 10 buffered rows into a new L0 generation.
let flushed_after = writer
.manifest()
.await
.unwrap()
.map(|m| m.flushed_generations.len())
.unwrap_or(0);
assert_eq!(
flushed_after, flushed_before,
"abort must not flush a new L0 generation"
);

// Idempotent: re-cancels the already-cancelled token, joins an
// already-emptied task set.
writer.abort().await.unwrap();
}

/// On a successful flush commit the sealed generation is dropped from
/// the queryable set (no leak), and its rows land in the manifest.
#[tokio::test]
Expand Down
99 changes: 99 additions & 0 deletions rust/lance/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,29 @@ impl Session {
&self.metadata_cache.0
}

/// Drop all cache entries that belong to the dataset located at `uri`.
///
/// Two Session caches are scoped per dataset and both are cleared here:
///
/// * the metadata cache (manifests, deletion vectors, row-id maps,
///   per-file metadata), and
/// * the index cache (every vector/scalar/FTS index nested under the
///   URI).
///
/// Keys in both cache families are namespaced by dataset URI through
/// [`LanceCache::with_key_prefix`], which appends a trailing `/`.
/// Invalidating the `"{uri}/"` prefix therefore hits exactly this
/// dataset and cannot spill over into a sibling whose URI merely starts
/// with the same characters (e.g. `t.lance` vs `t.lance2`).
///
/// Call this when a dataset at a reusable URI is dropped, so that a
/// later recreate at the same URI cold-reads its own fresh state rather
/// than being served manifests/indices cached for the dropped
/// incarnation.
///
/// `uri` MUST be byte-identical to the URI under which the dataset was
/// opened/cached. Matching is a raw prefix comparison; if the string has
/// drifted through any normalization, nothing is cleared and no error
/// is raised.
pub async fn invalidate_dataset(&self, uri: &str) {
    // Same `{uri}/` shape the per-dataset caches use for their keys.
    let dataset_prefix = [uri, "/"].concat();

    let metadata = &self.metadata_cache.0;
    metadata.invalidate_prefix(&dataset_prefix).await;

    let indices = &self.index_cache.0;
    indices.invalidate_prefix(&dataset_prefix).await;
}

/// Fetch statistics for the metadata cache
pub async fn metadata_cache_stats(&self) -> lance_core::cache::CacheStats {
self.metadata_cache.0.stats().await
Expand Down Expand Up @@ -339,4 +362,80 @@ mod tests {

assert_ne!(index_keys, metadata_keys);
}

/// `invalidate_dataset` must clear the metadata and index entries of the
/// targeted URI while leaving a sibling dataset, whose URI has the
/// target as a string prefix, fully intact.
#[tokio::test]
async fn test_invalidate_dataset_clears_only_that_uri() {
    let session = Session::new(10_000, 10_000, Default::default());

    // `t.lance` is a string prefix of `t.lance2`; only the trailing-slash
    // scoping keeps the second dataset safe when the first is cleared.
    let uri = "memory://db/t.lance";
    let sibling = "memory://db/t.lance2";

    let manifest_key = TestKey("manifest/1");
    let index_key = TestKey("idx/1");

    // Seed one metadata entry and one index entry per dataset. Keys are
    // namespaced under `{uri}/` by `for_dataset(uri)`, just like on the
    // read path.
    for (dataset_uri, marker) in [(uri, 1), (sibling, 2)] {
        let dataset_metadata = session.metadata_cache.for_dataset(dataset_uri);
        dataset_metadata
            .insert_with_key(&manifest_key, Arc::new(vec![marker]))
            .await;

        let dataset_indices = session.index_cache.for_dataset(dataset_uri);
        dataset_indices
            .insert_with_key(&index_key, Arc::new(vec![marker]))
            .await;
    }

    session.invalidate_dataset(uri).await;

    // Entries of the invalidated dataset have disappeared...
    let dropped_manifest = session
        .metadata_cache
        .for_dataset(uri)
        .get_with_key(&manifest_key)
        .await;
    assert!(
        dropped_manifest.is_none(),
        "metadata entry for the invalidated dataset must be cleared"
    );

    let dropped_index = session
        .index_cache
        .for_dataset(uri)
        .get_with_key(&index_key)
        .await;
    assert!(
        dropped_index.is_none(),
        "index entry for the invalidated dataset must be cleared"
    );

    // ...but the sibling sharing the URI prefix still has both of its own.
    let sibling_manifest = session
        .metadata_cache
        .for_dataset(sibling)
        .get_with_key(&manifest_key)
        .await;
    assert!(
        sibling_manifest.is_some(),
        "sibling dataset whose URI shares a prefix must NOT be cleared"
    );

    let sibling_index = session
        .index_cache
        .for_dataset(sibling)
        .get_with_key(&index_key)
        .await;
    assert!(
        sibling_index.is_some(),
        "sibling index entry must NOT be cleared"
    );
}
}
Loading