From 217577a2de37385df18ff3422113af81d46d5214 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Thu, 18 Jun 2026 10:27:38 +0800 Subject: [PATCH 1/3] fix(index): fail fast when FTS posting pipeline would deadlock When only one lance-cpu blocking thread is available, the pipelined FTS posting-list writer deadlocks silently after logging "writing N posting lists". Reject the build early with a descriptive error and add regression tests that reproduce the deadlock in a child process. Co-authored-by: Cursor --- .../src/scalar/inverted/builder.rs | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 93932f35332..df498b9e984 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -181,6 +181,38 @@ fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers: .unwrap_or(default_worker_memory_limit_bytes) } +/// Minimum [`get_num_compute_intensive_cpus`] for the pipelined FTS posting write path. +/// +/// `write_posting_lists` runs batch encoding on `spawn_cpu` while `FileWriter::write_batch` +/// also submits column page encoding via `spawn_cpu`. With only one `lance-cpu` blocking +/// thread the producer blocks on the bounded channel while the writer waits for encoding, +/// which deadlocks with no further log output. +const MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE: usize = 2; + +fn fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads: usize) -> String { + format!( + "FTS inverted index build requires at least {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} \ + lance-cpu blocking threads, but only {available_cpu_threads} is available. 
\ + Posting-list batch encoding and Lance file page encoding both use the same global \ + `spawn_cpu` pool; with a single thread the pipelined writer deadlocks silently \ + after logging \"writing N posting lists\". \ + Set environment variable LANCE_CPU_THREADS to at least \ + {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE}, or use a machine where \ + num_cpus > LANCE_IO_CORE_RESERVATION (currently reserving {} cores for I/O).", + *IO_CORE_RESERVATION + ) +} + +fn check_fts_posting_pipeline_cpu_threads() -> Result<()> { + let available_cpu_threads = get_num_compute_intensive_cpus(); + if available_cpu_threads < MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE { + let message = fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads); + log::error!("{message}"); + return Err(Error::invalid_input(message)); + } + Ok(()) +} + fn merge_all_tail_partitions(tails: Vec) -> Result> { if tails.is_empty() { return Ok(None); @@ -437,6 +469,7 @@ impl InvertedIndexBuilder { stream: SendableRecordBatchStream, dest_store: &dyn IndexStore, ) -> Result> { + check_fts_posting_pipeline_cpu_threads()?; let num_workers = resolve_num_workers(&self.params); let tokenizer = self.params.build()?; let with_position = self.params.with_position; @@ -1039,6 +1072,20 @@ impl InnerBuilder { store: &dyn IndexStore, docs: Arc, path: &str, + ) -> Result { + check_fts_posting_pipeline_cpu_threads()?; + self.write_posting_lists_pipelined(store, docs, path).await + } + + /// Pipelined posting-list writer used by [`Self::write_posting_lists`]. + /// + /// Exposed for deadlock regression tests that must bypass the CPU-thread guard. 
+ #[instrument(level = "debug", skip_all)] + async fn write_posting_lists_pipelined( + &mut self, + store: &dyn IndexStore, + docs: Arc, + path: &str, ) -> Result { let id = self.id; let mut writer = store @@ -2162,6 +2209,100 @@ mod tests { use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::time::Duration; + #[test] + fn test_fts_posting_pipeline_cpu_threads_error_message() { + let message = fts_posting_pipeline_insufficient_cpu_threads_message(1); + assert!( + message.contains("LANCE_CPU_THREADS"), + "message should mention LANCE_CPU_THREADS: {message}" + ); + assert!( + message.contains("deadlock"), + "message should mention deadlock: {message}" + ); + assert!( + message.contains("writing N posting lists"), + "message should reference the last log line users see: {message}" + ); + } + + async fn fts_posting_pipeline_deadlock_reproducer() -> Result<()> { + assert_eq!( + get_num_compute_intensive_cpus(), + 1, + "deadlock reproducer child must run with LANCE_CPU_THREADS=1" + ); + + let (tx, rx) = async_channel::bounded(1usize); + let producer = spawn_cpu(move || -> Result<()> { + tx.send_blocking(1u32) + .map_err(|err| Error::execution(format!("send batch 1: {err}")))?; + tx.send_blocking(2u32) + .map_err(|err| Error::execution(format!("send batch 2: {err}")))?; + // Third send blocks on the full queue while the consumer awaits nested `spawn_cpu`. + tx.send_blocking(3u32) + .map_err(|err| Error::execution(format!("send batch 3: {err}")))?; + Ok(()) + }); + + while let Ok(_batch) = rx.recv().await { + // Stand in for Lance `FileWriter::write_batch` page encoding (`spawn_cpu`). + spawn_cpu(|| Ok::<(), Error>(())).await?; + } + + producer.await?; + Ok(()) + } + + /// Reproduces the historical FTS posting-list write deadlock in a fresh child process + /// with `LANCE_CPU_THREADS=1` (single `lance-cpu` blocking thread). 
+ /// + /// Mirrors [`InnerBuilder::write_posting_lists_pipelined`]: producer on `spawn_cpu` with a + /// depth-1 channel, consumer awaiting nested `spawn_cpu` (as in Lance page encoding). + /// The guarded [`InnerBuilder::write_posting_lists`] fails fast instead. + #[test] + fn test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread() { + if std::env::var("LANCE_FTS_DEADLOCK_CHILD").as_deref() == Ok("1") { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .build() + .expect("build tokio runtime for deadlock reproducer"); + + let timed_out = runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(10), + fts_posting_pipeline_deadlock_reproducer(), + ) + .await + }); + + assert!( + timed_out.is_err(), + "unguarded posting-list pipeline should deadlock with LANCE_CPU_THREADS=1" + ); + return; + } + + let output = std::process::Command::new(std::env::current_exe().expect("test executable")) + .env("LANCE_FTS_DEADLOCK_CHILD", "1") + .env("LANCE_CPU_THREADS", "1") + .args([ + "--exact", + "scalar::inverted::builder::tests::test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread", + "--nocapture", + ]) + .output() + .expect("spawn FTS posting-list deadlock reproducer child"); + + assert!( + output.status.success(), + "child deadlock reproducer failed:\nstdout: {}\nstderr: {}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr), + ); + } + fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("doc", DataType::Utf8, true), From 0a3980d0574b9aaad7316cd1202c192c49ab2ed8 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Thu, 18 Jun 2026 16:06:49 +0800 Subject: [PATCH 2/3] fix(index): scope FTS CPU guard to posting writes --- .../src/scalar/inverted/builder.rs | 87 +++++++++++++++---- 1 file changed, 71 insertions(+), 16 deletions(-) diff --git 
a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index df498b9e984..027742beb64 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -469,7 +469,6 @@ impl InvertedIndexBuilder { stream: SendableRecordBatchStream, dest_store: &dyn IndexStore, ) -> Result> { - check_fts_posting_pipeline_cpu_threads()?; let num_workers = resolve_num_workers(&self.params); let tokenizer = self.params.build()?; let with_position = self.params.with_position; @@ -2209,6 +2208,22 @@ mod tests { use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::time::Duration; + fn run_test_in_child(test_name: &str, child_env: &str, cpu_threads: usize) { + let output = std::process::Command::new(std::env::current_exe().expect("test executable")) + .env(child_env, "1") + .env("LANCE_CPU_THREADS", cpu_threads.to_string()) + .args(["--exact", test_name, "--nocapture"]) + .output() + .expect("spawn test child process"); + + assert!( + output.status.success(), + "child test failed:\nstdout: {}\nstderr: {}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr), + ); + } + #[test] fn test_fts_posting_pipeline_cpu_threads_error_message() { let message = fts_posting_pipeline_insufficient_cpu_threads_message(1); @@ -2284,22 +2299,62 @@ mod tests { return; } - let output = std::process::Command::new(std::env::current_exe().expect("test executable")) - .env("LANCE_FTS_DEADLOCK_CHILD", "1") - .env("LANCE_CPU_THREADS", "1") - .args([ - "--exact", - "scalar::inverted::builder::tests::test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread", - "--nocapture", - ]) - .output() - .expect("spawn FTS posting-list deadlock reproducer child"); + run_test_in_child( + "scalar::inverted::builder::tests::test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread", + "LANCE_FTS_DEADLOCK_CHILD", + 1, + ); + } - assert!( - 
output.status.success(), - "child deadlock reproducer failed:\nstdout: {}\nstderr: {}", - String::from_utf8_lossy(&output.stdout), - String::from_utf8_lossy(&output.stderr), + #[test] + fn test_empty_update_with_one_cpu_thread_records_deleted_fragments() { + if std::env::var("LANCE_FTS_EMPTY_UPDATE_CHILD").as_deref() == Ok("1") { + assert_eq!( + get_num_compute_intensive_cpus(), + 1, + "empty-update child must run with LANCE_CPU_THREADS=1" + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .build() + .expect("build tokio runtime for empty update"); + runtime.block_on(async { + let index_dir = TempDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + index_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + let schema = make_doc_batch("unused", 0).schema(); + let stream = RecordBatchStreamAdapter::new( + schema, + stream::empty::>(), + ); + let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments { + to_keep: RoaringBitmap::new(), + to_remove: RoaringBitmap::from_iter([3, 7]), + }); + + let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()); + builder + .update(Box::pin(stream), store.as_ref(), old_data_filter) + .await + .expect("empty update should not require the posting-list writer"); + + let index = InvertedIndex::load(store, None, &LanceCache::no_cache()) + .await + .expect("load updated index"); + assert_eq!(index.deleted_fragments(), &RoaringBitmap::from_iter([3, 7])); + }); + return; + } + + run_test_in_child( + "scalar::inverted::builder::tests::test_empty_update_with_one_cpu_thread_records_deleted_fragments", + "LANCE_FTS_EMPTY_UPDATE_CHILD", + 1, ); } From a16e749a546a9b645156749f1aa641b2f6be0a49 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 19 Jun 2026 11:27:03 +0800 Subject: [PATCH 3/3] perf(index): limit concurrent FTS posting-list writers to prevent deadlock Add a tokio::sync::Semaphore that 
limits concurrent write_posting_lists_pipelined calls to available_cpu_threads - 1, ensuring at least one spawn_cpu thread is always free for the consumer's nested page encoding inside FileWriter::write_batch. Previously only an early guard rejected configurations with fewer than 2 lance-cpu threads, but with LANCE_CPU_THREADS=2 and num_workers=2, both workers could occupy both spawn_cpu threads as producers, deadlocking when consumers needed a thread for page encoding. The semaphore fixes the concurrent-worker deadlock case while preserving the early guard for hopeless single-thread configs. --- .../src/scalar/inverted/builder.rs | 113 +++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 027742beb64..1a100b94c4e 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -194,8 +194,9 @@ fn fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads: "FTS inverted index build requires at least {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} \ lance-cpu blocking threads, but only {available_cpu_threads} is available. \ Posting-list batch encoding and Lance file page encoding both use the same global \ - `spawn_cpu` pool; with a single thread the pipelined writer deadlocks silently \ - after logging \"writing N posting lists\". \ + `spawn_cpu` pool; with fewer than \ + {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} threads the pipelined writer deadlocks \ + silently after logging \"writing N posting lists\". 
\ Set environment variable LANCE_CPU_THREADS to at least \ {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE}, or use a machine where \ num_cpus > LANCE_IO_CORE_RESERVATION (currently reserving {} cores for I/O).", @@ -213,6 +214,25 @@ fn check_fts_posting_pipeline_cpu_threads() -> Result<()> { Ok(()) } +/// Limits concurrent [`InnerBuilder::write_posting_lists_pipelined`] calls so that +/// at least one [`spawn_cpu`] thread remains free for nested page encoding inside +/// `FileWriter::write_batch`. +/// +/// Each pipelined writer holds one `spawn_cpu` thread for its producer. Without +/// a free thread, the consumer deadlocks while waiting for page encoding. +/// The semaphore permits `available - 1` concurrent writers regardless of how +/// many workers exist or how many flush simultaneously. +fn fts_posting_write_semaphore() -> &'static tokio::sync::Semaphore { + static SEMAPHORE: LazyLock<tokio::sync::Semaphore> = LazyLock::new(|| { + let available = get_num_compute_intensive_cpus(); + // With available < 2 the permit count would be 0, but the early guard + // in write_posting_lists rejects before we reach acquire. + let permits = available.saturating_sub(1).max(1); + tokio::sync::Semaphore::new(permits) + }); + &SEMAPHORE +} + fn merge_all_tail_partitions(tails: Vec) -> Result> { if tails.is_empty() { return Ok(None); @@ -1073,6 +1093,10 @@ impl InnerBuilder { path: &str, ) -> Result { check_fts_posting_pipeline_cpu_threads()?; + let _permit = fts_posting_write_semaphore() + .acquire() + .await + .expect("fts posting write semaphore is never closed"); self.write_posting_lists_pipelined(store, docs, path).await } @@ -2358,6 +2382,91 @@ mod tests { ); } + /// With LANCE_CPU_THREADS=2 the semaphore grants only 1 permit. Two concurrent + /// pipelined writers should serialize without deadlock. 
+ #[test] + fn test_fts_posting_semaphore_serializes_with_two_cpu_threads() { + if std::env::var("LANCE_FTS_SEMAPHORE_CHILD").as_deref() == Ok("1") { + assert_eq!( + get_num_compute_intensive_cpus(), + 2, + "semaphore child must run with LANCE_CPU_THREADS=2" + ); + + let semaphore = fts_posting_write_semaphore(); + assert_eq!( + semaphore.available_permits(), + 1, + "with 2 CPU threads the semaphore should have 1 permit" + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .build() + .expect("build tokio runtime for semaphore test"); + + runtime.block_on(async { + // Two tasks both acquire the semaphore. + let task1 = tokio::spawn(async { + let _permit = semaphore.acquire().await.expect("acquire permit"); + spawn_cpu(|| Ok::<_, Error>(())) + .await + .expect("spawn_cpu ok"); + }); + let task2 = tokio::spawn(async { + let _permit = semaphore.acquire().await.expect("acquire permit"); + spawn_cpu(|| Ok::<_, Error>(())) + .await + .expect("spawn_cpu ok"); + }); + + let result = tokio::time::timeout(Duration::from_secs(10), async { + task1.await.expect("task1 join"); + task2.await.expect("task2 join"); + }) + .await; + assert!( + result.is_ok(), + "two semaphore-controlled tasks should complete without deadlock" + ); + }); + return; + } + + run_test_in_child( + "scalar::inverted::builder::tests::test_fts_posting_semaphore_serializes_with_two_cpu_threads", + "LANCE_FTS_SEMAPHORE_CHILD", + 2, + ); + } + + /// With LANCE_CPU_THREADS=4 the semaphore should have 3 permits. 
+ #[test] + fn test_fts_posting_semaphore_permits_scale_with_threads() { + if std::env::var("LANCE_FTS_SEMAPHORE_PERMITS_CHILD").as_deref() == Ok("1") { + assert_eq!( + get_num_compute_intensive_cpus(), + 4, + "semaphore permits child must run with LANCE_CPU_THREADS=4" + ); + + let semaphore = fts_posting_write_semaphore(); + assert_eq!( + semaphore.available_permits(), + 3, + "with 4 CPU threads the semaphore should have 3 permits (4 - 1)" + ); + return; + } + + run_test_in_child( + "scalar::inverted::builder::tests::test_fts_posting_semaphore_permits_scale_with_threads", + "LANCE_FTS_SEMAPHORE_PERMITS_CHILD", + 4, + ); + } + fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("doc", DataType::Utf8, true),