diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 57acaf42ccd..c286ba42cc2 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -1915,6 +1915,37 @@ impl ShardWriter { } } + /// Abort the writer without flushing. + /// + /// Shuts down the background flush tasks and leaves all buffered + /// memtable state to be dropped with the writer. Unlike + /// [`Self::close`], no WAL/MemTable flush is issued: pending in-memory + /// rows are discarded, not made durable, and no object-store IO is + /// performed. Used on drop-table, where the dataset directory is about + /// to be removed and a flush would only race fresh files back into a + /// doomed path. + /// + /// Caller-quiesce contract: `abort` takes `&self` (so it can be called + /// through the `Arc` callers hold) and therefore cannot + /// structurally bar a concurrent or subsequent `put` the way consuming + /// `close(self)` does. After abort the dispatchers are gone, so a later + /// `put` would buffer data that never flushes. Callers MUST stop + /// issuing writes before calling abort. + /// + /// Blocks until any flush already mid-`handle()` settles — + /// cancellation only fires between messages — so no flush task lingers + /// after abort returns. Idempotent: a second call re-cancels an + /// already-cancelled token and joins an already-emptied task set. + #[instrument(name = "sw_abort", level = "info", skip_all, fields(shard_id = %self.config.shard_id, epoch = self.epoch))] + pub async fn abort(&self) -> Result<()> { + info!( + "Aborting ShardWriter for shard {} (no flush)", + self.config.shard_id + ); + self.task_executor.shutdown_all().await?; + Ok(()) + } + /// Close the writer gracefully. /// /// Flushes pending data and shuts down background tasks. @@ -4220,6 +4251,63 @@ mod tests { writer.close().await.unwrap(); } + /// `abort` tears down the background flush tasks WITHOUT flushing — + /// buffered memtable rows are discarded, not sealed into an L0 + /// generation the way `close` would. Idempotent on a second call. + #[tokio::test] + async fn test_abort_discards_without_flushing_and_is_idempotent() { + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = create_test_schema(); + + // Thresholds high enough that nothing auto-flushes; the rows stay + // in the active memtable until abort discards them. + let config = ShardWriterConfig { + shard_id: Uuid::new_v4(), + shard_spec_id: 0, + durable_write: false, + sync_indexed_write: false, + max_wal_buffer_size: 64 * 1024 * 1024, + max_wal_flush_interval: None, + max_memtable_size: 64 * 1024 * 1024, + manifest_scan_batch_size: 2, + ..Default::default() + }; + + let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![]) + .await + .unwrap(); + + writer + .put(vec![create_test_batch(&schema, 0, 10)]) + .await + .unwrap(); + let flushed_before = writer + .manifest() + .await + .unwrap() + .map(|m| m.flushed_generations.len()) + .unwrap_or(0); + + writer.abort().await.unwrap(); + + // No generation was sealed — contrast with `close`, which flushes + // the 10 buffered rows into a new L0 generation. + let flushed_after = writer + .manifest() + .await + .unwrap() + .map(|m| m.flushed_generations.len()) + .unwrap_or(0); + assert_eq!( + flushed_after, flushed_before, + "abort must not flush a new L0 generation" + ); + + // Idempotent: re-cancels the already-cancelled token, joins an + // already-emptied task set. + writer.abort().await.unwrap(); + } + /// On a successful flush commit the sealed generation is dropped from /// the queryable set (no leak), and its rows land in the manifest. #[tokio::test] diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 8d5e9717570..e6940ebc035 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -200,6 +200,29 @@ impl Session { &self.metadata_cache.0 } + /// Invalidate every cached entry belonging to the dataset at `uri`. + /// + /// Clears both Session caches scoped to the dataset: the metadata cache + /// (manifests, deletion vectors, row-id maps, per-file metadata) and the + /// index cache (every vector/scalar/FTS index nested under the URI). Both + /// cache families namespace their keys by the dataset URI via + /// [`LanceCache::with_key_prefix`], which appends a trailing `/`, so the + /// `"{uri}/"` prefix targets exactly this dataset and never bleeds into a + /// sibling whose URI shares this one as a prefix (e.g. `t.lance` vs + /// `t.lance2`). + /// + /// Used when a dataset at a reusable URI is dropped so a later + /// same-URI recreate cold-reads its own fresh state instead of serving + /// the dropped incarnation's cached manifests/indices. The `uri` MUST be + /// byte-identical to the URI the dataset was opened/cached under — the + /// match is a raw prefix comparison, so any normalization drift silently + /// clears nothing. + pub async fn invalidate_dataset(&self, uri: &str) { + let scoped = format!("{uri}/"); + self.metadata_cache.0.invalidate_prefix(&scoped).await; + self.index_cache.0.invalidate_prefix(&scoped).await; + } + /// Fetch statistics for the metadata cache pub async fn metadata_cache_stats(&self) -> lance_core::cache::CacheStats { self.metadata_cache.0.stats().await @@ -339,4 +362,80 @@ mod tests { assert_ne!(index_keys, metadata_keys); } + + #[tokio::test] + async fn test_invalidate_dataset_clears_only_that_uri() { + let session = Session::new(10_000, 10_000, Default::default()); + + // Two datasets whose URIs share a prefix (`t.lance` is a prefix of + // `t.lance2`) — the trailing-slash scoping must clear the first + // without touching the second. + let uri = "memory://db/t.lance"; + let sibling = "memory://db/t.lance2"; + + // Per-dataset caches namespace their keys under `{uri}/`, exactly + // as the read path does via `for_dataset(uri)`. + session + .metadata_cache + .for_dataset(uri) + .insert_with_key(&TestKey("manifest/1"), Arc::new(vec![1])) + .await; + session + .index_cache + .for_dataset(uri) + .insert_with_key(&TestKey("idx/1"), Arc::new(vec![1])) + .await; + session + .metadata_cache + .for_dataset(sibling) + .insert_with_key(&TestKey("manifest/1"), Arc::new(vec![2])) + .await; + session + .index_cache + .for_dataset(sibling) + .insert_with_key(&TestKey("idx/1"), Arc::new(vec![2])) + .await; + + session.invalidate_dataset(uri).await; + + // The dropped dataset's metadata + index entries are gone... + assert!( + session + .metadata_cache + .for_dataset(uri) + .get_with_key(&TestKey("manifest/1")) + .await + .is_none(), + "metadata entry for the invalidated dataset must be cleared" + ); + assert!( + session + .index_cache + .for_dataset(uri) + .get_with_key(&TestKey("idx/1")) + .await + .is_none(), + "index entry for the invalidated dataset must be cleared" + ); + + // ...while the prefix-sharing sibling is untouched. + assert!( + session + .metadata_cache + .for_dataset(sibling) + .get_with_key(&TestKey("manifest/1")) + .await + .is_some(), + "sibling dataset whose URI shares a prefix must NOT be cleared" + ); + assert!( + session + .index_cache + .for_dataset(sibling) + .get_with_key(&TestKey("idx/1")) + .await + .is_some(), + "sibling index entry must NOT be cleared" + ); + } }