From 55c22427aed475294e500c254783e0575eb95236 Mon Sep 17 00:00:00 2001 From: Prom3theu5 Date: Tue, 2 Jun 2026 12:07:51 +0100 Subject: [PATCH] fix(index): address qodo review findings on batch write and parse progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ladybug_store: move single-use row lists into Value::List instead of cloning them per stage; only the small shared path_rows is cloned. Adds a run_stage helper so each stage executes directly. - indexer: correct the manifest-error comment — parse errors now abort the run before the batched write stage (a more atomic failure mode than the old write-as-you-go loop). - indexer: harden the parse progress observer against a termination race — read `done` before the counter and always emit the final count before breaking; strengthen the worker increment to Release for a real happens-before on the final count. - Cargo.lock: bump synapse entry 0.2.1 -> 0.2.2 to match Cargo.toml so --locked CI/release builds pass. Verified: cargo fmt --check, clippy --all-targets, test --locked (47 pass), and three independent re-index runs of a 1,593-file repo produce byte-identical symbol/package/edge output (determinism preserved). 
--- Cargo.lock | 2 +- src/graph/ladybug_store.rs | 49 +++++++++++++++++++------------------- src/indexer/mod.rs | 23 ++++++++++++++---- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f14bcfe..d8374f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1989,7 +1989,7 @@ dependencies = [ [[package]] name = "synapse" -version = "0.2.1" +version = "0.2.2" dependencies = [ "anyhow", "blake3", diff --git a/src/graph/ladybug_store.rs b/src/graph/ladybug_store.rs index 4b4d123..6410df0 100644 --- a/src/graph/ladybug_store.rs +++ b/src/graph/ladybug_store.rs @@ -99,6 +99,21 @@ fn parse_kind(s: &str) -> SymbolKind { SymbolKind::from_str_opt(s).unwrap_or(SymbolKind::Function) } +/// Execute one `UNWIND $rows`-style batch stage, moving `rows` into the list +/// parameter so no clone is needed. An empty list is a no-op. +fn run_stage(conn: &Connection<'_>, cypher: &str, rows: Vec<Value>) -> Result<()> { + if rows.is_empty() { + return Ok(()); + } + let child_ty: lbug::LogicalType = (&rows[0]).into(); + let mut stmt = conn + .prepare(cypher) + .map_err(|e| anyhow!("preparing `{cypher}`: {e}"))?; + conn.execute(&mut stmt, vec![("rows", Value::List(child_ty, rows))]) + .map_err(|e| anyhow!("executing `{cypher}`: {e}"))?; + Ok(()) +} + impl GraphStore for LadybugGraphStore { fn initialize_schema(&self) -> Result<()> { let _guard = self.lock.lock().unwrap(); @@ -561,33 +576,19 @@ impl GraphStore for LadybugGraphStore { }) .collect(); - // (cypher, rows): runs in order. Empty row lists are skipped. 
- let stages: [(&str, &Vec<Value>); 6] = [ - (REMOVE_DECLARED, &path_rows), - (REMOVE_BY_FILEPATH, &path_rows), - (REMOVE_FILE, &path_rows), - (UPSERT_FILE, &file_rows), - (UPSERT_SYMBOL, &symbol_rows), - (LINK_DECLARES, &declares_rows), - ]; - conn.query("BEGIN TRANSACTION") .map_err(|e| anyhow!("begin transaction: {e}"))?; let result = (|| -> Result<()> { - for (q, rows) in stages { - if rows.is_empty() { - continue; - } - let child_ty: lbug::LogicalType = (&rows[0]).into(); - let mut stmt = conn - .prepare(q) - .map_err(|e| anyhow!("preparing `{q}`: {e}"))?; - conn.execute( - &mut stmt, - vec![("rows", Value::List(child_ty, rows.clone()))], - ) - .map_err(|e| anyhow!("executing `{q}`: {e}"))?; - } + // Run each stage in order; empty row lists are skipped. The three + // `path_rows` stages share one list, so it's cloned per use (it's the + // small per-file list). The large per-symbol lists (`symbol_rows`, + // `declares_rows`) are single-use and moved in by value — no clone. + run_stage(&conn, REMOVE_DECLARED, path_rows.clone())?; + run_stage(&conn, REMOVE_BY_FILEPATH, path_rows.clone())?; + run_stage(&conn, REMOVE_FILE, path_rows)?; + run_stage(&conn, UPSERT_FILE, file_rows)?; + run_stage(&conn, UPSERT_SYMBOL, symbol_rows)?; + run_stage(&conn, LINK_DECLARES, declares_rows)?; Ok(()) })(); match result { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index d227377..ff57385 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -200,7 +200,10 @@ fn parse_file(ctx: &ParseContext<'_>, rel: &str) -> Result { } // Manifest parsing -> projects/packages/edges (pure parse only). A parse - // error aborts the whole index, exactly as the previous inline `?` did. + // error propagates out of `parse_file`; the sequential drain surfaces it via + // `work?` and aborts the run *before* the batched write stage executes. 
This + // is a more atomic failure mode than the old write-as-you-go loop, which + // could leave earlier files already committed when a later manifest failed. let manifest = if rel.ends_with(".csproj") { Some(parse_csproj_manifest(rel, &abs, ctx.central)?) } else if rel.ends_with("package.json") { @@ -321,8 +324,7 @@ pub fn index_repo( let parsed_count = &parsed_count; let done = &done; scope.spawn(move || { - loop { - let n = parsed_count.load(std::sync::atomic::Ordering::Relaxed); + let emit = |n: usize| { cb( "", &IndexProgress { @@ -333,7 +335,18 @@ pub fn index_repo( ..IndexProgress::default() }, ); - if done.load(std::sync::atomic::Ordering::Acquire) { + }; + loop { + // Observe `done` first, then read the counter. `done` is + // stored (Release) only after `par_iter().collect()` has + // joined every worker, so all the `fetch_add(Release)` + // increments happen-before it. Reading `done` via Acquire + // therefore guarantees the following Acquire load sees the + // final count — emit it and stop, so the last update always + // reflects the true total rather than a stale lower value. + let finished = done.load(std::sync::atomic::Ordering::Acquire); + emit(parsed_count.load(std::sync::atomic::Ordering::Acquire)); + if finished { break; } std::thread::sleep(std::time::Duration::from_millis(80)); @@ -344,7 +357,7 @@ pub fn index_repo( .par_iter() .map(|rel| { let w = parse_file(&parse_ctx, rel); - parsed_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + parsed_count.fetch_add(1, std::sync::atomic::Ordering::Release); w }) .collect();