Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/lance/src/dataset/mem_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub use sharding::{
evaluate_sharding_spec, evaluate_sharding_spec_with_embedded_columns,
evaluate_sharding_spec_with_source_columns,
};
pub use wal::{WalAppendResult, WalAppender, WalReadEntry, WalTailer};
pub use wal::{BatchDurableWatcher, WalAppendResult, WalAppender, WalReadEntry, WalTailer};
pub use write::ShardWriter;
pub use write::ShardWriterConfig;
pub use write::WriteResult;
168 changes: 153 additions & 15 deletions rust/lance/src/dataset/mem_wal/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ pub use super::wal::{WalEntry, WalEntryData, WalFlushResult, WalFlusher};
use super::memtable::flush::TriggerMemTableFlush;
use super::scanner::GenerationWarmer;
use super::wal::{
TriggerWalFlush, WalAppender, WalFlushSource, WalOnlyState, WalTailer, empty_flush_result,
BatchDurableWatcher, TriggerWalFlush, WalAppender, WalFlushSource, WalOnlyState, WalTailer,
empty_flush_result,
};

use super::manifest::ShardManifestStore;
Expand Down Expand Up @@ -1528,14 +1529,7 @@ impl ShardWriter {
/// `AlreadyExists`, indicating this writer has been fenced.
#[instrument(name = "sw_put", level = "info", skip_all, fields(batch_count = batches.len(), shard_id = %self.config.shard_id))]
pub async fn put(&self, batches: Vec<RecordBatch>) -> Result<WriteResult> {
if batches.is_empty() {
return Err(Error::invalid_input("Cannot write empty batch list"));
}
for (i, batch) in batches.iter().enumerate() {
if batch.num_rows() == 0 {
return Err(Error::invalid_input(format!("Batch {} is empty", i)));
}
}
Self::validate_non_empty(&batches)?;

match &self.mode {
WriterMode::MemTable {
Expand All @@ -1558,13 +1552,84 @@ impl ShardWriter {
}
}

/// Like [`Self::put`], but performs only the visible in-memory insert (and
/// triggers the durable flush) and returns the durability watcher *without*
/// awaiting it. The row is visible to subsequent reads on this writer the
/// instant this returns; durability is the caller's to await via the
/// returned watcher (`None` when `durable_write` is off).
///
/// This lets a caller hold an *external* serialization lock across only the
/// in-memory critical section (read-merge-insert) and await durability
/// after releasing it, so concurrent durable flushes still coalesce. The
/// in-memory insert remains guarded by the writer's internal `state_lock`
/// exactly as [`Self::put`], so `BatchStore`'s single-writer invariant is
/// upheld without relying on the external lock.
///
/// MemTable mode only; WAL-only mode has no in-memory tier to read back and
/// returns an error.
#[instrument(name = "sw_put_no_wait", level = "info", skip_all, fields(batch_count = batches.len(), shard_id = %self.config.shard_id))]
pub async fn put_no_wait(
&self,
batches: Vec<RecordBatch>,
) -> Result<(WriteResult, Option<BatchDurableWatcher>)> {
Self::validate_non_empty(&batches)?;

match &self.mode {
WriterMode::MemTable {
state,
writer_state,
backpressure,
} => {
self.put_memtable_no_wait(batches, state, writer_state, backpressure)
.await
}
WriterMode::WalOnly { .. } => Err(Error::invalid_input(
"put_no_wait is only supported in MemTable mode",
)),
}
}

fn validate_non_empty(batches: &[RecordBatch]) -> Result<()> {
if batches.is_empty() {
return Err(Error::invalid_input("Cannot write empty batch list"));
}
for (i, batch) in batches.iter().enumerate() {
if batch.num_rows() == 0 {
return Err(Error::invalid_input(format!("Batch {} is empty", i)));
}
}
Ok(())
}

/// MemTable-mode write that also waits for durability.
///
/// Delegates the in-memory insert to [`Self::put_memtable_no_wait`] and
/// then, if that call returned a durability watcher, awaits it before
/// reporting success. When no watcher is returned the write result is
/// handed back immediately.
///
/// # Errors
///
/// Propagates any error from the in-memory insert or from waiting on the
/// durability watcher.
async fn put_memtable(
    &self,
    batches: Vec<RecordBatch>,
    state_lock: &Arc<RwLock<WriterState>>,
    writer_state: &Arc<SharedWriterState>,
    backpressure: &BackpressureController,
) -> Result<WriteResult> {
    let (write_result, pending_durability) = self
        .put_memtable_no_wait(batches, state_lock, writer_state, backpressure)
        .await?;

    // Nothing to wait on: return the result as soon as the insert is done.
    let Some(mut durability) = pending_durability else {
        return Ok(write_result);
    };

    // Wait for durability here, after the insert call has returned, so the
    // wait happens outside the lock.
    durability.wait().await?;
    Ok(write_result)
}

/// In-memory half of [`Self::put_memtable`]: insert under `state_lock`,
/// trigger the WAL flush, and return the durability watcher *without*
/// awaiting it. The caller decides when (and outside which locks) to await
/// durability. The returned watcher is `None` when `durable_write` is off
/// (nothing to wait on). See [`Self::put_no_wait`].
async fn put_memtable_no_wait(
&self,
batches: Vec<RecordBatch>,
state_lock: &Arc<RwLock<WriterState>>,
writer_state: &Arc<SharedWriterState>,
backpressure: &BackpressureController,
) -> Result<(WriteResult, Option<BatchDurableWatcher>)> {
// Apply backpressure if needed (before acquiring main lock)
backpressure
.maybe_apply_backpressure(|| {
Expand All @@ -1578,7 +1643,7 @@ impl ShardWriter {
let start = std::time::Instant::now();

// Acquire write lock for entire operation (atomic approach)
let (batch_positions, mut durable_watcher, batch_store, indexes) = {
let (batch_positions, durable_watcher, batch_store, indexes) = {
let mut state = state_lock.write().await;

// 1. Insert all batches into memtable atomically
Expand Down Expand Up @@ -1609,8 +1674,10 @@ impl ShardWriter {

self.stats.record_put(start.elapsed());

// Wait for durability if configured (outside the lock)
if self.config.durable_write {
// Trigger the durable flush (outside the lock) and hand the watcher back
// un-awaited. The flush must be triggered here so the watcher can ever
// resolve; only the `wait()` is the caller's to schedule.
let watcher = if self.config.durable_write {
self.wal_flusher.trigger_flush(
WalFlushSource::BatchStore {
batch_store,
Expand All @@ -1619,10 +1686,12 @@ impl ShardWriter {
batch_positions.end,
None,
)?;
durable_watcher.wait().await?;
}
Some(durable_watcher)
} else {
None
};

Ok(WriteResult { batch_positions })
Ok((WriteResult { batch_positions }, watcher))
}

async fn put_wal_only(
Expand Down Expand Up @@ -2814,6 +2883,75 @@ mod tests {
writer.close().await.unwrap();
}

/// With `durable_write` on, `put_no_wait` must make the rows visible in the
/// memtable right away and return a watcher that resolves afterwards.
#[tokio::test]
async fn test_put_no_wait_durable_visible_then_durable() {
    let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
    let schema = create_test_schema();

    let writer_config = ShardWriterConfig {
        shard_id: Uuid::new_v4(),
        shard_spec_id: 0,
        durable_write: true,
        sync_indexed_write: false,
        max_wal_buffer_size: 1024 * 1024,
        max_wal_flush_interval: None,
        max_memtable_size: 64 * 1024 * 1024,
        manifest_scan_batch_size: 2,
        ..Default::default()
    };

    let writer = ShardWriter::open(
        store,
        base_path,
        base_uri,
        writer_config,
        schema.clone(),
        vec![],
    )
    .await
    .unwrap();

    let (write_result, durability) = writer
        .put_no_wait(vec![create_test_batch(&schema, 0, 10)])
        .await
        .unwrap();
    assert_eq!(write_result.batch_positions, 0..1);

    // The rows must already be counted in the memtable, before anything has
    // awaited durability.
    let visible_rows = writer.memtable_stats().await.unwrap().row_count;
    assert_eq!(visible_rows, 10);

    // A durable write hands back a watcher; awaiting it succeeds once the
    // triggered flush has landed.
    let mut durability = durability.expect("durable_write returns a watcher");
    durability.wait().await.unwrap();

    writer.close().await.unwrap();
}

/// With `durable_write` off, `put_no_wait` must still insert the rows but
/// return no watcher.
#[tokio::test]
async fn test_put_no_wait_non_durable_returns_no_watcher() {
    let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
    let schema = create_test_schema();

    let writer_config = ShardWriterConfig {
        shard_id: Uuid::new_v4(),
        shard_spec_id: 0,
        durable_write: false,
        sync_indexed_write: false,
        max_wal_buffer_size: 1024 * 1024,
        max_wal_flush_interval: None,
        max_memtable_size: 64 * 1024 * 1024,
        manifest_scan_batch_size: 2,
        ..Default::default()
    };

    let writer = ShardWriter::open(
        store,
        base_path,
        base_uri,
        writer_config,
        schema.clone(),
        vec![],
    )
    .await
    .unwrap();

    let (write_result, durability) = writer
        .put_no_wait(vec![create_test_batch(&schema, 0, 10)])
        .await
        .unwrap();
    assert_eq!(write_result.batch_positions, 0..1);
    assert!(durability.is_none(), "non-durable put has nothing to await");

    // The insert itself is unaffected by the durability setting.
    let visible_rows = writer.memtable_stats().await.unwrap().row_count;
    assert_eq!(visible_rows, 10);

    writer.close().await.unwrap();
}

#[tokio::test]
async fn test_shard_writer_multiple_writes() {
let (store, base_path, base_uri, _temp_dir) = create_local_store().await;
Expand Down
Loading