diff --git a/rust/lance/src/dataset/mem_wal.rs b/rust/lance/src/dataset/mem_wal.rs index 5f3bc2ed483..7eaa8ffb83f 100644 --- a/rust/lance/src/dataset/mem_wal.rs +++ b/rust/lance/src/dataset/mem_wal.rs @@ -51,6 +51,7 @@ pub use sharding::{ evaluate_sharding_spec, evaluate_sharding_spec_with_embedded_columns, evaluate_sharding_spec_with_source_columns, }; -pub use wal::{WalAppendResult, WalAppender, WalReadEntry, WalTailer}; +pub use wal::{BatchDurableWatcher, WalAppendResult, WalAppender, WalReadEntry, WalTailer}; pub use write::ShardWriter; pub use write::ShardWriterConfig; +pub use write::WriteResult; diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 491bb68aec5..53a2e3f2210 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -49,7 +49,8 @@ pub use super::wal::{WalEntry, WalEntryData, WalFlushResult, WalFlusher}; use super::memtable::flush::TriggerMemTableFlush; use super::scanner::GenerationWarmer; use super::wal::{ - TriggerWalFlush, WalAppender, WalFlushSource, WalOnlyState, WalTailer, empty_flush_result, + BatchDurableWatcher, TriggerWalFlush, WalAppender, WalFlushSource, WalOnlyState, WalTailer, + empty_flush_result, }; use super::manifest::ShardManifestStore; @@ -1528,14 +1529,7 @@ impl ShardWriter { /// `AlreadyExists`, indicating this writer has been fenced. #[instrument(name = "sw_put", level = "info", skip_all, fields(batch_count = batches.len(), shard_id = %self.config.shard_id))] pub async fn put(&self, batches: Vec) -> Result { - if batches.is_empty() { - return Err(Error::invalid_input("Cannot write empty batch list")); - } - for (i, batch) in batches.iter().enumerate() { - if batch.num_rows() == 0 { - return Err(Error::invalid_input(format!("Batch {} is empty", i))); - } - } + Self::validate_non_empty(&batches)?; match &self.mode { WriterMode::MemTable { @@ -1558,6 +1552,55 @@ impl ShardWriter { } } + /// Like [`Self::put`], but performs only the visible in-memory insert (and + /// triggers the durable flush) and returns the durability watcher *without* + /// awaiting it. The row is visible to subsequent reads on this writer the + /// instant this returns; durability is the caller's to await via the + /// returned watcher (`None` when `durable_write` is off). + /// + /// This lets a caller hold an *external* serialization lock across only the + /// in-memory critical section (read-merge-insert) and await durability + /// after releasing it, so concurrent durable flushes still coalesce. The + /// in-memory insert remains guarded by the writer's internal `state_lock` + /// exactly as [`Self::put`], so `BatchStore`'s single-writer invariant is + /// upheld without relying on the external lock. + /// + /// MemTable mode only; WAL-only mode has no in-memory tier to read back and + /// returns an error. + #[instrument(name = "sw_put_no_wait", level = "info", skip_all, fields(batch_count = batches.len(), shard_id = %self.config.shard_id))] + pub async fn put_no_wait( + &self, + batches: Vec, + ) -> Result<(WriteResult, Option)> { + Self::validate_non_empty(&batches)?; + + match &self.mode { + WriterMode::MemTable { + state, + writer_state, + backpressure, + } => { + self.put_memtable_no_wait(batches, state, writer_state, backpressure) + .await + } + WriterMode::WalOnly { .. } => Err(Error::invalid_input( + "put_no_wait is only supported in MemTable mode", + )), + } + } + + fn validate_non_empty(batches: &[RecordBatch]) -> Result<()> { + if batches.is_empty() { + return Err(Error::invalid_input("Cannot write empty batch list")); + } + for (i, batch) in batches.iter().enumerate() { + if batch.num_rows() == 0 { + return Err(Error::invalid_input(format!("Batch {} is empty", i))); + } + } + Ok(()) + } + async fn put_memtable( &self, batches: Vec, @@ -1565,6 +1608,28 @@ impl ShardWriter { writer_state: &Arc, backpressure: &BackpressureController, ) -> Result { + let (result, watcher) = self + .put_memtable_no_wait(batches, state_lock, writer_state, backpressure) + .await?; + // Wait for durability if configured (outside the lock). + if let Some(mut watcher) = watcher { + watcher.wait().await?; + } + Ok(result) + } + + /// In-memory half of [`Self::put_memtable`]: insert under `state_lock`, + /// trigger the WAL flush, and return the durability watcher *without* + /// awaiting it. The caller decides when (and outside which locks) to await + /// durability. The returned watcher is `None` when `durable_write` is off + /// (nothing to wait on). See [`Self::put_no_wait`]. + async fn put_memtable_no_wait( + &self, + batches: Vec, + state_lock: &Arc>, + writer_state: &Arc, + backpressure: &BackpressureController, + ) -> Result<(WriteResult, Option)> { // Apply backpressure if needed (before acquiring main lock) backpressure .maybe_apply_backpressure(|| { @@ -1578,7 +1643,7 @@ impl ShardWriter { let start = std::time::Instant::now(); // Acquire write lock for entire operation (atomic approach) - let (batch_positions, mut durable_watcher, batch_store, indexes) = { + let (batch_positions, durable_watcher, batch_store, indexes) = { let mut state = state_lock.write().await; // 1. Insert all batches into memtable atomically @@ -1609,8 +1674,10 @@ impl ShardWriter { self.stats.record_put(start.elapsed()); - // Wait for durability if configured (outside the lock) - if self.config.durable_write { + // Trigger the durable flush (outside the lock) and hand the watcher back + // un-awaited. The flush must be triggered here so the watcher can ever + // resolve; only the `wait()` is the caller's to schedule. + let watcher = if self.config.durable_write { self.wal_flusher.trigger_flush( WalFlushSource::BatchStore { batch_store, @@ -1619,10 +1686,12 @@ impl ShardWriter { batch_positions.end, None, )?; - durable_watcher.wait().await?; - } + Some(durable_watcher) + } else { + None + }; - Ok(WriteResult { batch_positions }) + Ok((WriteResult { batch_positions }, watcher)) } async fn put_wal_only( @@ -2814,6 +2883,75 @@ mod tests { writer.close().await.unwrap(); } + #[tokio::test] + async fn test_put_no_wait_durable_visible_then_durable() { + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = create_test_schema(); + + let config = ShardWriterConfig { + shard_id: Uuid::new_v4(), + shard_spec_id: 0, + durable_write: true, + sync_indexed_write: false, + max_wal_buffer_size: 1024 * 1024, + max_wal_flush_interval: None, + max_memtable_size: 64 * 1024 * 1024, + manifest_scan_batch_size: 2, + ..Default::default() + }; + + let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![]) + .await + .unwrap(); + + let batch = create_test_batch(&schema, 0, 10); + let (result, watcher) = writer.put_no_wait(vec![batch]).await.unwrap(); + assert_eq!(result.batch_positions, 0..1); + + // Row is visible in memory before durability is awaited. + let stats = writer.memtable_stats().await.unwrap(); + assert_eq!(stats.row_count, 10); + + // durable_write is on, so a watcher is returned and resolves once the + // triggered flush lands. + let mut watcher = watcher.expect("durable_write returns a watcher"); + watcher.wait().await.unwrap(); + + writer.close().await.unwrap(); + } + + #[tokio::test] + async fn test_put_no_wait_non_durable_returns_no_watcher() { + let (store, base_path, base_uri, _temp_dir) = create_local_store().await; + let schema = create_test_schema(); + + let config = ShardWriterConfig { + shard_id: Uuid::new_v4(), + shard_spec_id: 0, + durable_write: false, + sync_indexed_write: false, + max_wal_buffer_size: 1024 * 1024, + max_wal_flush_interval: None, + max_memtable_size: 64 * 1024 * 1024, + manifest_scan_batch_size: 2, + ..Default::default() + }; + + let writer = ShardWriter::open(store, base_path, base_uri, config, schema.clone(), vec![]) + .await + .unwrap(); + + let batch = create_test_batch(&schema, 0, 10); + let (result, watcher) = writer.put_no_wait(vec![batch]).await.unwrap(); + assert_eq!(result.batch_positions, 0..1); + assert!(watcher.is_none(), "non-durable put has nothing to await"); + + let stats = writer.memtable_stats().await.unwrap(); + assert_eq!(stats.row_count, 10); + + writer.close().await.unwrap(); + } + #[tokio::test] async fn test_shard_writer_multiple_writes() { let (store, base_path, base_uri, _temp_dir) = create_local_store().await;