diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs
index 93932f35332..1a100b94c4e 100644
--- a/rust/lance-index/src/scalar/inverted/builder.rs
+++ b/rust/lance-index/src/scalar/inverted/builder.rs
@@ -181,6 +181,62 @@ fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers:
         .unwrap_or(default_worker_memory_limit_bytes)
 }
 
+/// Minimum [`get_num_compute_intensive_cpus`] for the pipelined FTS posting write path.
+///
+/// `write_posting_lists` runs batch encoding on `spawn_cpu` while `FileWriter::write_batch`
+/// also submits column page encoding via `spawn_cpu`. With only one `lance-cpu` blocking
+/// thread the producer blocks on the bounded channel while the writer waits for encoding,
+/// which deadlocks with no further log output.
+const MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE: usize = 2;
+
+/// Builds the user-facing error text for hosts with too few `lance-cpu` threads, naming the
+/// `LANCE_CPU_THREADS` override and the last log line printed before the historical hang.
+fn fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads: usize) -> String {
+    format!(
+        "FTS inverted index build requires at least {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} \
+         lance-cpu blocking threads, but only {available_cpu_threads} is available. \
+         Posting-list batch encoding and Lance file page encoding both use the same global \
+         `spawn_cpu` pool; with fewer than \
+         {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} threads the pipelined writer deadlocks \
+         silently after logging \"writing N posting lists\". \
+         Set environment variable LANCE_CPU_THREADS to at least \
+         {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE}, or use a machine where \
+         num_cpus > LANCE_IO_CORE_RESERVATION (currently reserving {} cores for I/O).",
+        *IO_CORE_RESERVATION
+    )
+}
+
+/// Fails fast when the `spawn_cpu` pool cannot run a posting-list producer and page encoding
+/// concurrently: logs the explanation and returns it as an invalid-input error.
+fn check_fts_posting_pipeline_cpu_threads() -> Result<()> {
+    let available_cpu_threads = get_num_compute_intensive_cpus();
+    if available_cpu_threads < MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE {
+        let message = fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads);
+        log::error!("{message}");
+        return Err(Error::invalid_input(message));
+    }
+    Ok(())
+}
+
+/// Limits concurrent [`InnerBuilder::write_posting_lists_pipelined`] calls so that
+/// at least one [`spawn_cpu`] thread remains free for nested page encoding inside
+/// `FileWriter::write_batch`.
+///
+/// Each pipelined writer holds one `spawn_cpu` thread for its producer. Without
+/// a free thread, the consumer deadlocks while waiting for page encoding.
+/// The semaphore permits `available - 1` concurrent writers regardless of how
+/// many workers exist or how many flush simultaneously.
+fn fts_posting_write_semaphore() -> &'static tokio::sync::Semaphore {
+    static SEMAPHORE: LazyLock<tokio::sync::Semaphore> = LazyLock::new(|| {
+        let available = get_num_compute_intensive_cpus();
+        // `saturating_sub(1)` alone yields 0 permits when `available < 2`; `max(1)` keeps the
+        // semaphore usable, and the guard in `write_posting_lists` rejects that case earlier.
+        let permits = available.saturating_sub(1).max(1);
+        tokio::sync::Semaphore::new(permits)
+    });
+    &SEMAPHORE
+}
+
 fn merge_all_tail_partitions(tails: Vec) -> Result> {
     if tails.is_empty() {
         return Ok(None);
@@ -1039,6 +1095,25 @@ impl InnerBuilder {
         store: &dyn IndexStore,
         docs: Arc,
         path: &str,
+    ) -> Result {
+        check_fts_posting_pipeline_cpu_threads()?;
+        // Hold a permit for the whole write so a `spawn_cpu` thread stays free for page encoding.
+        let _permit = fts_posting_write_semaphore()
+            .acquire()
+            .await
+            .expect("fts posting write semaphore is never closed");
+        self.write_posting_lists_pipelined(store, docs, path).await
+    }
+
+    /// Pipelined posting-list writer used by [`Self::write_posting_lists`].
+    ///
+    /// Exposed for deadlock regression tests that must bypass the CPU-thread guard.
+    #[instrument(level = "debug", skip_all)]
+    async fn write_posting_lists_pipelined(
+        &mut self,
+        store: &dyn IndexStore,
+        docs: Arc,
+        path: &str,
     ) -> Result {
         let id = self.id;
         let mut writer = store
@@ -2162,6 +2237,251 @@ mod tests {
     use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
     use std::time::Duration;
 
+    /// Re-runs `test_name` from this test binary in a child process with `child_env` set to `1`
+    /// and `LANCE_CPU_THREADS` set to `cpu_threads`, asserting that the child ran it and passed.
+    ///
+    /// A fresh process lets process-wide state such as the posting-write semaphore (a
+    /// `LazyLock`) be initialised under the requested thread count.
+    fn run_test_in_child(test_name: &str, child_env: &str, cpu_threads: usize) {
+        let output = std::process::Command::new(std::env::current_exe().expect("test executable"))
+            .env(child_env, "1")
+            .env("LANCE_CPU_THREADS", cpu_threads.to_string())
+            .args(["--exact", test_name, "--nocapture"])
+            .output()
+            .expect("spawn test child process");
+
+        let stdout = String::from_utf8_lossy(&output.stdout);
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        assert!(
+            output.status.success(),
+            "child test failed:\nstdout: {stdout}\nstderr: {stderr}"
+        );
+        // `--exact` with a stale path matches nothing yet exits 0, so require that one test ran.
+        assert!(
+            stdout.contains("running 1 test"),
+            "child ran no test named {test_name}:\nstdout: {stdout}\nstderr: {stderr}"
+        );
+    }
+
+    #[test]
+    fn test_fts_posting_pipeline_cpu_threads_error_message() {
+        let message = fts_posting_pipeline_insufficient_cpu_threads_message(1);
+        assert!(
+            message.contains("LANCE_CPU_THREADS"),
+            "message should mention LANCE_CPU_THREADS: {message}"
+        );
+        assert!(
+            message.contains("deadlock"),
+            "message should mention deadlock: {message}"
+        );
+        assert!(
+            message.contains("writing N posting lists"),
+            "message should reference the last log line users see: {message}"
+        );
+    }
+
+    async fn fts_posting_pipeline_deadlock_reproducer() -> Result<()> {
+        assert_eq!(
+            get_num_compute_intensive_cpus(),
+            1,
+            "deadlock reproducer child must run with LANCE_CPU_THREADS=1"
+        );
+
+        let (tx, rx) = async_channel::bounded(1usize);
+        let producer = spawn_cpu(move || -> Result<()> {
+            tx.send_blocking(1u32)
+                .map_err(|err| Error::execution(format!("send batch 1: {err}")))?;
+            tx.send_blocking(2u32)
+                .map_err(|err| Error::execution(format!("send batch 2: {err}")))?;
+            // Third send blocks on the full queue while the consumer awaits nested `spawn_cpu`.
+            tx.send_blocking(3u32)
+                .map_err(|err| Error::execution(format!("send batch 3: {err}")))?;
+            Ok(())
+        });
+
+        while let Ok(_batch) = rx.recv().await {
+            // Stand in for Lance `FileWriter::write_batch` page encoding (`spawn_cpu`).
+            spawn_cpu(|| Ok::<(), Error>(())).await?;
+        }
+
+        producer.await?;
+        Ok(())
+    }
+
+    /// Reproduces the historical FTS posting-list write deadlock in a fresh child process
+    /// with `LANCE_CPU_THREADS=1` (single `lance-cpu` blocking thread).
+    ///
+    /// Mirrors [`InnerBuilder::write_posting_lists_pipelined`]: producer on `spawn_cpu` with a
+    /// depth-1 channel, consumer awaiting nested `spawn_cpu` (as in Lance page encoding).
+    /// The guarded [`InnerBuilder::write_posting_lists`] fails fast instead.
+    #[test]
+    fn test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread() {
+        if std::env::var("LANCE_FTS_DEADLOCK_CHILD").as_deref() == Ok("1") {
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .worker_threads(2)
+                .build()
+                .expect("build tokio runtime for deadlock reproducer");
+
+            let timed_out = runtime.block_on(async {
+                tokio::time::timeout(
+                    Duration::from_secs(10),
+                    fts_posting_pipeline_deadlock_reproducer(),
+                )
+                .await
+            });
+
+            assert!(
+                timed_out.is_err(),
+                "unguarded posting-list pipeline should deadlock with LANCE_CPU_THREADS=1"
+            );
+            return;
+        }
+
+        run_test_in_child(
+            "scalar::inverted::builder::tests::test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread",
+            "LANCE_FTS_DEADLOCK_CHILD",
+            1,
+        );
+    }
+
+    #[test]
+    fn test_empty_update_with_one_cpu_thread_records_deleted_fragments() {
+        if std::env::var("LANCE_FTS_EMPTY_UPDATE_CHILD").as_deref() == Ok("1") {
+            assert_eq!(
+                get_num_compute_intensive_cpus(),
+                1,
+                "empty-update child must run with LANCE_CPU_THREADS=1"
+            );
+
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .worker_threads(2)
+                .build()
+                .expect("build tokio runtime for empty update");
+            runtime.block_on(async {
+                let index_dir = TempDir::default();
+                let store = Arc::new(LanceIndexStore::new(
+                    ObjectStore::local().into(),
+                    index_dir.obj_path(),
+                    Arc::new(LanceCache::no_cache()),
+                ));
+                let schema = make_doc_batch("unused", 0).schema();
+                let stream = RecordBatchStreamAdapter::new(
+                    schema,
+                    stream::empty::>(),
+                );
+                let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
+                    to_keep: RoaringBitmap::new(),
+                    to_remove: RoaringBitmap::from_iter([3, 7]),
+                });
+
+                let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
+                builder
+                    .update(Box::pin(stream), store.as_ref(), old_data_filter)
+                    .await
+                    .expect("empty update should not require the posting-list writer");
+
+                let index = InvertedIndex::load(store, None, &LanceCache::no_cache())
+                    .await
+                    .expect("load updated index");
+                assert_eq!(index.deleted_fragments(), &RoaringBitmap::from_iter([3, 7]));
+            });
+            return;
+        }
+
+        run_test_in_child(
+            "scalar::inverted::builder::tests::test_empty_update_with_one_cpu_thread_records_deleted_fragments",
+            "LANCE_FTS_EMPTY_UPDATE_CHILD",
+            1,
+        );
+    }
+
+    /// With LANCE_CPU_THREADS=2 the semaphore grants only 1 permit. Two concurrent
+    /// pipelined writers should serialize without deadlock.
+    #[test]
+    fn test_fts_posting_semaphore_serializes_with_two_cpu_threads() {
+        if std::env::var("LANCE_FTS_SEMAPHORE_CHILD").as_deref() == Ok("1") {
+            assert_eq!(
+                get_num_compute_intensive_cpus(),
+                2,
+                "semaphore child must run with LANCE_CPU_THREADS=2"
+            );
+
+            let semaphore = fts_posting_write_semaphore();
+            assert_eq!(
+                semaphore.available_permits(),
+                1,
+                "with 2 CPU threads the semaphore should have 1 permit"
+            );
+
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .worker_threads(2)
+                .build()
+                .expect("build tokio runtime for semaphore test");
+
+            runtime.block_on(async {
+                // Two tasks both acquire the semaphore.
+                let task1 = tokio::spawn(async {
+                    let _permit = semaphore.acquire().await.expect("acquire permit");
+                    spawn_cpu(|| Ok::<_, Error>(()))
+                        .await
+                        .expect("spawn_cpu ok");
+                });
+                let task2 = tokio::spawn(async {
+                    let _permit = semaphore.acquire().await.expect("acquire permit");
+                    spawn_cpu(|| Ok::<_, Error>(()))
+                        .await
+                        .expect("spawn_cpu ok");
+                });
+
+                let result = tokio::time::timeout(Duration::from_secs(10), async {
+                    task1.await.expect("task1 join");
+                    task2.await.expect("task2 join");
+                })
+                .await;
+                assert!(
+                    result.is_ok(),
+                    "two semaphore-controlled tasks should complete without deadlock"
+                );
+            });
+            return;
+        }
+
+        run_test_in_child(
+            "scalar::inverted::builder::tests::test_fts_posting_semaphore_serializes_with_two_cpu_threads",
+            "LANCE_FTS_SEMAPHORE_CHILD",
+            2,
+        );
+    }
+
+    /// With LANCE_CPU_THREADS=4 the semaphore should have 3 permits.
+    #[test]
+    fn test_fts_posting_semaphore_permits_scale_with_threads() {
+        if std::env::var("LANCE_FTS_SEMAPHORE_PERMITS_CHILD").as_deref() == Ok("1") {
+            assert_eq!(
+                get_num_compute_intensive_cpus(),
+                4,
+                "semaphore permits child must run with LANCE_CPU_THREADS=4"
+            );
+
+            let semaphore = fts_posting_write_semaphore();
+            assert_eq!(
+                semaphore.available_permits(),
+                3,
+                "with 4 CPU threads the semaphore should have 3 permits (4 - 1)"
+            );
+            return;
+        }
+
+        run_test_in_child(
+            "scalar::inverted::builder::tests::test_fts_posting_semaphore_permits_scale_with_threads",
+            "LANCE_FTS_SEMAPHORE_PERMITS_CHILD",
+            4,
+        );
+    }
+
     fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
         let schema = Arc::new(Schema::new(vec![
             Field::new("doc", DataType::Utf8, true),