Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 305 additions & 0 deletions rust/lance-index/src/scalar/inverted/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,58 @@ fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers:
.unwrap_or(default_worker_memory_limit_bytes)
}

/// Minimum [`get_num_compute_intensive_cpus`] for the pipelined FTS posting write path.
///
/// `write_posting_lists` runs batch encoding on `spawn_cpu` while `FileWriter::write_batch`
/// also submits column page encoding via `spawn_cpu`. With only one `lance-cpu` blocking
/// thread the producer blocks on the bounded channel while the writer waits for encoding,
/// which deadlocks with no further log output.
///
/// This threshold only covers a single pipelined writer. Several writers running at once
/// are bounded separately by `fts_posting_write_semaphore`, which keeps one `spawn_cpu`
/// thread free for the nested page encoding.
const MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE: usize = 2;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A fixed threshold of 2 threads does not cover concurrent worker flushes. resolve_num_workers can still choose all available CPU threads, so with LANCE_CPU_THREADS=2 two workers can each occupy a spawn_cpu producer and then both writers wait for nested page-encoding spawn_cpu work with no free thread. This should reserve capacity for nested encoding or limit the number of concurrent posting writers/workers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BubbleCal Thanks for the review. Addressed the threshold concern using a tokio::sync::Semaphore:

Fix: Added fts_posting_write_semaphore() with available_cpu_threads - 1 permits (at least 1). write_posting_lists now acquires a permit before calling write_posting_lists_pipelined, so at most available - 1 producers hold spawn_cpu threads concurrently, always leaving one free for the consumer's nested page encoding.

This handles the concurrent-worker case correctly:

  • LANCE_CPU_THREADS=2, num_workers=2: semaphore has 1 permit → only 1 writer runs at a time → no deadlock.
  • LANCE_CPU_THREADS=4, num_workers=4: semaphore has 3 permits → up to 3 writers concurrently, 1 thread remains free for consumers.

Two new tests verify the semaphore behavior in child processes with LANCE_CPU_THREADS=2 and LANCE_CPU_THREADS=4.


/// Renders the user-facing explanation for rejecting an FTS index build because the
/// `lance-cpu` pool is too small.
///
/// `available_cpu_threads` is the thread count that was observed; it is echoed in the text
/// next to the required minimum ([`MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE`]) and the
/// current `IO_CORE_RESERVATION` value so the reader can see how to raise the limit.
fn fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads: usize) -> String {
    // Borrowed rather than copied: only `Display` is needed for formatting.
    let io_reserved_cores = &*IO_CORE_RESERVATION;
    format!(
        "FTS inverted index build requires at least \
         {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} lance-cpu blocking threads, \
         but only {available_cpu_threads} is available. \
         Posting-list batch encoding and Lance file page encoding \
         both use the same global `spawn_cpu` pool; \
         with fewer than {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE} threads \
         the pipelined writer deadlocks silently after logging \
         \"writing N posting lists\". \
         Set environment variable LANCE_CPU_THREADS \
         to at least {MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE}, \
         or use a machine where num_cpus > LANCE_IO_CORE_RESERVATION \
         (currently reserving {io_reserved_cores} cores for I/O)."
    )
}

fn check_fts_posting_pipeline_cpu_threads() -> Result<()> {
let available_cpu_threads = get_num_compute_intensive_cpus();
if available_cpu_threads < MIN_CPU_THREADS_FOR_FTS_POSTING_PIPELINE {
let message = fts_posting_pipeline_insufficient_cpu_threads_message(available_cpu_threads);
log::error!("{message}");
return Err(Error::invalid_input(message));
}
Ok(())
}

/// Process-wide gate on concurrent [`InnerBuilder::write_posting_lists_pipelined`] calls.
///
/// Every pipelined writer parks its producer on one [`spawn_cpu`] thread, and the
/// consumer side needs another `spawn_cpu` thread for the page encoding done inside
/// `FileWriter::write_batch`. Capping the number of writers at one less than the
/// thread count keeps a thread free for that nested work, no matter how many workers
/// exist or flush at the same moment.
///
/// The semaphore is created lazily on first use and sized from
/// [`get_num_compute_intensive_cpus`] at that time.
fn fts_posting_write_semaphore() -> &'static tokio::sync::Semaphore {
    static WRITER_PERMITS: LazyLock<tokio::sync::Semaphore> = LazyLock::new(|| {
        // `threads - 1` permits, clamped to a minimum of one. The clamp only matters
        // for thread counts below 2, which the guard in `write_posting_lists` rejects
        // before any permit is requested.
        let writer_slots = get_num_compute_intensive_cpus().max(2) - 1;
        tokio::sync::Semaphore::new(writer_slots)
    });
    &WRITER_PERMITS
}

fn merge_all_tail_partitions(tails: Vec<TailPartition>) -> Result<Option<InnerBuilder>> {
if tails.is_empty() {
return Ok(None);
Expand Down Expand Up @@ -1039,6 +1091,24 @@ impl InnerBuilder {
store: &dyn IndexStore,
docs: Arc<DocSet>,
path: &str,
) -> Result<IndexFile> {
check_fts_posting_pipeline_cpu_threads()?;
let _permit = fts_posting_write_semaphore()
.acquire()
.await
.expect("fts posting write semaphore is never closed");
self.write_posting_lists_pipelined(store, docs, path).await
}

/// Pipelined posting-list writer used by [`Self::write_posting_lists`].
///
/// Exposed for deadlock regression tests that must bypass the CPU-thread guard.
#[instrument(level = "debug", skip_all)]
async fn write_posting_lists_pipelined(
&mut self,
store: &dyn IndexStore,
docs: Arc<DocSet>,
path: &str,
) -> Result<IndexFile> {
let id = self.id;
let mut writer = store
Expand Down Expand Up @@ -2162,6 +2232,241 @@ mod tests {
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;

/// Re-runs one test of the current test binary in a fresh child process.
///
/// The child is the current executable invoked with `--exact <test_name> --nocapture`,
/// with `child_env` set to `"1"` (the marker the test body checks to take its child
/// branch) and `LANCE_CPU_THREADS` set to `cpu_threads`.
///
/// # Panics
///
/// Panics when the child cannot be spawned, when it exits unsuccessfully, or when it
/// does not report exactly one passed test. The last check matters because the test
/// harness exits successfully when `--exact` matches nothing, so a renamed or
/// mistyped `test_name` would otherwise make the parent test pass without running
/// anything.
fn run_test_in_child(test_name: &str, child_env: &str, cpu_threads: usize) {
    let test_binary = std::env::current_exe().expect("test executable");
    let output = std::process::Command::new(test_binary)
        .env(child_env, "1")
        .env("LANCE_CPU_THREADS", cpu_threads.to_string())
        .args(["--exact", test_name, "--nocapture"])
        .output()
        .expect("spawn test child process");

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);

    assert!(
        output.status.success(),
        "child test failed:\nstdout: {stdout}\nstderr: {stderr}",
    );
    // Exit status alone cannot distinguish "ran and passed" from "filter matched nothing".
    assert!(
        stdout.contains("test result: ok. 1 passed"),
        "child process did not run exactly one test named `{test_name}` \
         (was the test renamed or moved?):\nstdout: {stdout}\nstderr: {stderr}",
    );
}

/// The insufficient-threads message must name the env var to set, mention the deadlock,
/// and quote the last log line a user sees before the hang.
#[test]
fn test_fts_posting_pipeline_cpu_threads_error_message() {
    let message = fts_posting_pipeline_insufficient_cpu_threads_message(1);

    // (required fragment, what the failure should say if it is missing)
    let expectations = [
        (
            "LANCE_CPU_THREADS",
            "message should mention LANCE_CPU_THREADS",
        ),
        ("deadlock", "message should mention deadlock"),
        (
            "writing N posting lists",
            "message should reference the last log line users see",
        ),
    ];

    for (fragment, expectation) in expectations {
        assert!(message.contains(fragment), "{expectation}: {message}");
    }
}

async fn fts_posting_pipeline_deadlock_reproducer() -> Result<()> {
assert_eq!(
get_num_compute_intensive_cpus(),
1,
"deadlock reproducer child must run with LANCE_CPU_THREADS=1"
);

let (tx, rx) = async_channel::bounded(1usize);
let producer = spawn_cpu(move || -> Result<()> {
tx.send_blocking(1u32)
.map_err(|err| Error::execution(format!("send batch 1: {err}")))?;
tx.send_blocking(2u32)
.map_err(|err| Error::execution(format!("send batch 2: {err}")))?;
// Third send blocks on the full queue while the consumer awaits nested `spawn_cpu`.
tx.send_blocking(3u32)
.map_err(|err| Error::execution(format!("send batch 3: {err}")))?;
Ok(())
});

while let Ok(_batch) = rx.recv().await {
// Stand in for Lance `FileWriter::write_batch` page encoding (`spawn_cpu`).
spawn_cpu(|| Ok::<(), Error>(())).await?;
}

producer.await?;
Ok(())
}

/// Regression check for the historical FTS posting-list write deadlock.
///
/// The parent re-runs this test in a fresh child process with `LANCE_CPU_THREADS=1`
/// (a single `lance-cpu` blocking thread). The child drives a model of
/// [`InnerBuilder::write_posting_lists_pipelined`] (producer on `spawn_cpu` with a
/// depth-1 channel, consumer awaiting nested `spawn_cpu` as in Lance page encoding)
/// and expects it to hit the 10 second timeout. The guarded
/// [`InnerBuilder::write_posting_lists`] fails fast instead.
#[test]
fn test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread() {
    let is_child = matches!(
        std::env::var("LANCE_FTS_DEADLOCK_CHILD").as_deref(),
        Ok("1")
    );
    if !is_child {
        run_test_in_child(
            "scalar::inverted::builder::tests::test_fts_posting_pipeline_write_posting_lists_deadlocks_with_one_cpu_thread",
            "LANCE_FTS_DEADLOCK_CHILD",
            1,
        );
        return;
    }

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("build tokio runtime for deadlock reproducer");

    let deadline = Duration::from_secs(10);
    // The timeout is created inside the async block so it is built in the runtime context.
    let outcome = runtime.block_on(async {
        tokio::time::timeout(deadline, fts_posting_pipeline_deadlock_reproducer()).await
    });

    // `Err` here means the deadline elapsed, i.e. the pipeline never completed.
    assert!(
        outcome.is_err(),
        "unguarded posting-list pipeline should deadlock with LANCE_CPU_THREADS=1"
    );
}

/// An update with no input rows must still succeed with a single `lance-cpu` thread
/// and must record the fragments marked for removal.
///
/// The parent re-runs this test in a child process with `LANCE_CPU_THREADS=1`; the
/// child performs the update against a temporary local index store and reloads the
/// index to inspect its deleted fragments.
#[test]
fn test_empty_update_with_one_cpu_thread_records_deleted_fragments() {
    let is_child = matches!(
        std::env::var("LANCE_FTS_EMPTY_UPDATE_CHILD").as_deref(),
        Ok("1")
    );
    if !is_child {
        run_test_in_child(
            "scalar::inverted::builder::tests::test_empty_update_with_one_cpu_thread_records_deleted_fragments",
            "LANCE_FTS_EMPTY_UPDATE_CHILD",
            1,
        );
        return;
    }

    assert_eq!(
        get_num_compute_intensive_cpus(),
        1,
        "empty-update child must run with LANCE_CPU_THREADS=1"
    );

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("build tokio runtime for empty update");

    runtime.block_on(async {
        // Kept alive for the whole block so the store's directory outlives the index.
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // A stream with the document schema but no batches at all.
        let empty_docs = RecordBatchStreamAdapter::new(
            make_doc_batch("unused", 0).schema(),
            stream::empty::<lance_core::error::DataFusionResult<RecordBatch>>(),
        );

        let removed_fragments = RoaringBitmap::from_iter([3, 7]);
        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
            to_keep: RoaringBitmap::new(),
            to_remove: removed_fragments.clone(),
        });

        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
        builder
            .update(Box::pin(empty_docs), store.as_ref(), old_data_filter)
            .await
            .expect("empty update should not require the posting-list writer");

        let reloaded = InvertedIndex::load(store, None, &LanceCache::no_cache())
            .await
            .expect("load updated index");
        assert_eq!(reloaded.deleted_fragments(), &removed_fragments);
    });
}

/// With `LANCE_CPU_THREADS=2` the posting-write semaphore grants only one permit.
///
/// Two tasks standing in for pipelined writers each take the permit and run a
/// `spawn_cpu` job. The test checks both that they finish within 10 seconds (no
/// deadlock) and that they never hold the permit at the same time (serialization).
/// The parent re-runs this test in a child process so the thread count applies from
/// process start.
#[test]
fn test_fts_posting_semaphore_serializes_with_two_cpu_threads() {
    /// Number of stand-in writers currently holding a permit.
    static PERMIT_HOLDERS: AtomicUsize = AtomicUsize::new(0);

    /// One stand-in writer: take a permit, run a `spawn_cpu` job, release.
    async fn write_under_permit(semaphore: &'static tokio::sync::Semaphore) {
        let _permit = semaphore.acquire().await.expect("acquire permit");
        let holders = PERMIT_HOLDERS.fetch_add(1, Ordering::SeqCst) + 1;
        let outcome = spawn_cpu(|| Ok::<_, Error>(())).await;
        // Decrement before asserting so a failure does not leave the counter skewed.
        PERMIT_HOLDERS.fetch_sub(1, Ordering::SeqCst);
        assert_eq!(
            holders, 1,
            "a single-permit semaphore must never admit two writers at once"
        );
        outcome.expect("spawn_cpu ok");
    }

    if std::env::var("LANCE_FTS_SEMAPHORE_CHILD").as_deref() == Ok("1") {
        assert_eq!(
            get_num_compute_intensive_cpus(),
            2,
            "semaphore child must run with LANCE_CPU_THREADS=2"
        );

        let semaphore = fts_posting_write_semaphore();
        assert_eq!(
            semaphore.available_permits(),
            1,
            "with 2 CPU threads the semaphore should have 1 permit"
        );

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .expect("build tokio runtime for semaphore test");

        runtime.block_on(async {
            // Both tasks contend for the single permit.
            let task1 = tokio::spawn(write_under_permit(semaphore));
            let task2 = tokio::spawn(write_under_permit(semaphore));

            let result = tokio::time::timeout(Duration::from_secs(10), async {
                // A panic inside a task (e.g. the exclusivity assert) surfaces here.
                task1.await.expect("task1 join");
                task2.await.expect("task2 join");
            })
            .await;
            assert!(
                result.is_ok(),
                "two semaphore-controlled tasks should complete without deadlock"
            );
        });
        return;
    }

    run_test_in_child(
        "scalar::inverted::builder::tests::test_fts_posting_semaphore_serializes_with_two_cpu_threads",
        "LANCE_FTS_SEMAPHORE_CHILD",
        2,
    );
}

/// With `LANCE_CPU_THREADS=4` the posting-write semaphore should expose 3 permits.
///
/// The parent re-runs this test in a child process with that thread count; the child
/// only inspects the permit count.
#[test]
fn test_fts_posting_semaphore_permits_scale_with_threads() {
    let is_child = matches!(
        std::env::var("LANCE_FTS_SEMAPHORE_PERMITS_CHILD").as_deref(),
        Ok("1")
    );
    if !is_child {
        run_test_in_child(
            "scalar::inverted::builder::tests::test_fts_posting_semaphore_permits_scale_with_threads",
            "LANCE_FTS_SEMAPHORE_PERMITS_CHILD",
            4,
        );
        return;
    }

    assert_eq!(
        get_num_compute_intensive_cpus(),
        4,
        "semaphore permits child must run with LANCE_CPU_THREADS=4"
    );

    let permits = fts_posting_write_semaphore().available_permits();
    assert_eq!(
        permits, 3,
        "with 4 CPU threads the semaphore should have 3 permits (4 - 1)"
    );
}

fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("doc", DataType::Utf8, true),
Expand Down
Loading