From 0db98bbbce4ee3ecc7152c0090ee630e3f0033d4 Mon Sep 17 00:00:00 2001 From: Prom3theu5 Date: Tue, 2 Jun 2026 11:49:45 +0100 Subject: [PATCH] fix(index): live progress during parallel parse and resolve phases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The parallel-parse refactor broke the progress bar: the parse ran inside one `par_iter().collect()` with no per-file callback, so the bar jumped straight from 0 to N, and each resolve phase ("resolving references", etc.) reported once and then sat on a static label for seconds, looking hung. - Parse: a scoped observer thread polls an atomic counter the rayon workers bump and drives the progress callback (which stays off the worker threads), so the bar climbs smoothly 0 -> N during the parse — where the per-file wall-clock now is. `ProgressFn` gains a `Sync` bound so the observer can share it; the binary's closure already qualifies. - Resolve/write phases: `IndexProgress` gains `phase_progress: (done, total)`. `resolve_supertypes`/`resolve_references` report per-file progress (coarsely, ~every 1%), and a "writing graph" phase is shown around the batched node write. The bar's bottom line now reads e.g. "resolving references 1203/1559…" and advances. Cosmetic only — indexed output (counts, edges, deterministic `related`) is unchanged. --- src/indexer/mod.rs | 200 ++++++++++++++++++++++++++++++--------------- src/main.rs | 13 ++- 2 files changed, 143 insertions(+), 70 deletions(-) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 60432c2..d227377 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -69,14 +69,20 @@ pub struct IndexProgress { /// scan into edge resolution (e.g. "resolving references"). `None` during /// the file scan. Lets the UI show what the otherwise-frozen bar is doing. pub phase: Option<&'static str>, + /// Progress *within* the current `phase`, as `(done, total)` — e.g. files + /// resolved so far in the references pass. 
`None` when a phase has no + /// meaningful sub-count (or during the file scan). Lets the UI show that a + /// long phase is advancing rather than hung. + pub phase_progress: Option<(usize, usize)>, } -/// A progress observer invoked once per candidate file as indexing proceeds. +/// A progress observer invoked as indexing proceeds. /// /// Receives the current file path and a live [`IndexProgress`] snapshot. Kept as /// a plain callback so the indexer stays UI-agnostic — the binary wires this to -/// a progress bar; tests can ignore it. -pub type ProgressFn<'a> = dyn Fn(&str, &IndexProgress) + 'a; +/// a progress bar; tests can ignore it. `Sync` so a scoped observer thread can +/// drive it while the parallel parse runs (see `index_repo`). +pub type ProgressFn<'a> = dyn Fn(&str, &IndexProgress) + Sync + 'a; /// The result of parsing one candidate file off the main thread — everything /// needed to write it to the store, with zero store access. The parallel parse @@ -288,20 +294,12 @@ pub fn index_repo( // input order, so the resulting `Vec` is index-aligned with `candidates` — // the sequential drain below then writes in candidate order, making symbol // insertion and `pending_*` ordering byte-identical to the old loop. - if let Some(cb) = progress { - cb( - "", - &IndexProgress { - processed: 0, - total, - files_indexed: 0, - symbols: 0, - projects: 0, - packages: 0, - phase: Some("parsing files"), - }, - ); - } + // + // The parse is where the real per-file wall-clock now is, so progress is + // reported from there: each rayon worker bumps a shared atomic, and a scoped + // observer thread on the side polls that counter and drives the progress + // callback (`ProgressFn` is `Sync`, so the observer can share it, and the + // callback stays off the worker threads) — so the bar climbs, not jumps 0 -> N. 
let parse_ctx = ParseContext { repo, config, @@ -313,10 +311,46 @@ pub fn index_repo( central: &central, now, }; - let parsed: Vec> = candidates - .par_iter() - .map(|rel| parse_file(&parse_ctx, rel)) - .collect(); + let parsed_count = std::sync::atomic::AtomicUsize::new(0); + let done = std::sync::atomic::AtomicBool::new(false); + let parsed: Vec> = std::thread::scope(|scope| { + // Observer: report parse progress until the parse finishes. Only spawned + // when there's a progress sink; it borrows `progress`/`parsed_count` and + // is joined before the scope ends (so the borrows are sound). + if let Some(cb) = progress { + let parsed_count = &parsed_count; + let done = &done; + scope.spawn(move || { + loop { + let n = parsed_count.load(std::sync::atomic::Ordering::Relaxed); + cb( + "", + &IndexProgress { + processed: n, + total, + phase: Some("parsing files"), + phase_progress: Some((n, total)), + ..IndexProgress::default() + }, + ); + if done.load(std::sync::atomic::Ordering::Acquire) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(80)); + } + }); + } + let out: Vec> = candidates + .par_iter() + .map(|rel| { + let w = parse_file(&parse_ctx, rel); + parsed_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + w + }) + .collect(); + done.store(true, std::sync::atomic::Ordering::Release); + out + }); // Stage 2 — sequential drain. Walk the parsed results in candidate order, // accumulating each changed file's node payload into `file_writes` and its @@ -327,21 +361,8 @@ pub fn index_repo( // worker, and write order stays candidate-ordered so output is unchanged. 
let mut file_writes: Vec = Vec::new(); let mut manifest_writes: Vec = Vec::new(); - for (i, (rel, work)) in candidates.iter().zip(parsed).enumerate() { + for (_rel, work) in candidates.iter().zip(parsed) { let work = work?; - if let Some(cb) = progress { - let snap = IndexProgress { - processed: i + 1, - total, - files_indexed: outcome.files_indexed, - symbols: outcome.symbols, - projects: projects_seen, - packages: 0, - phase: None, - }; - cb(rel, &snap); - } - let work = match work { FileWork::SkipCounted => { outcome.files_skipped_unchanged += 1; @@ -373,40 +394,52 @@ pub fn index_repo( } } - // Write every file + its symbols + DECLARES edges in one transaction. + // Post-loop store work runs after the per-file scan (so link targets all + // exist) and can be the bulk of wall-clock on large repos, so each step + // reports a phase to the progress UI — otherwise the bar sits frozen at N/N + // and looks hung. `report_phase` emits a snapshot tagged with the phase and + // an optional `(done, total)` sub-count so a long phase visibly advances. + let report_phase = + |phase: &'static str, files: usize, symbols: usize, sub: Option<(usize, usize)>| { + if let Some(cb) = progress { + cb( + "", + &IndexProgress { + processed: total, + total, + files_indexed: files, + symbols, + projects: projects_seen, + packages: 0, + phase: Some(phase), + phase_progress: sub, + }, + ); + } + }; + + // Write every file + its symbols + DECLARES edges in one transaction, then + // the manifests (projects/packages + their edges), in candidate order. + report_phase( + "writing graph", + outcome.files_indexed, + outcome.symbols, + None, + ); store.write_files_batch(&file_writes)?; - // Then the manifests (projects/packages + their edges), in candidate order. for manifest in &manifest_writes { outcome.edges += write_manifest(store, manifest)?; } - // Post-loop edge-resolution passes. 
These run after the per-file scan (so - // their link targets all exist) and can be the bulk of wall-clock on large - // repos, so each reports a phase to the progress UI — otherwise the bar - // sits frozen at N/N and looks hung. `report_phase` reuses the same - // callback as the file scan, emitting a snapshot tagged with the phase. - // It's a fn (not a closure) taking the live counts so it doesn't hold a - // borrow of `outcome` across the mutations below. - let report_phase = |phase: &'static str, files: usize, symbols: usize| { - if let Some(cb) = progress { - cb( - "", - &IndexProgress { - processed: total, - total, - files_indexed: files, - symbols, - projects: projects_seen, - packages: 0, - phase: Some(phase), - }, - ); - } - }; // Resolve collected imports to known packages -> IMPORTS_PACKAGE edges. // Done after the main loop so every manifest has registered its packages. if !pending_imports.is_empty() { - report_phase("resolving imports", outcome.files_indexed, outcome.symbols); + report_phase( + "resolving imports", + outcome.files_indexed, + outcome.symbols, + None, + ); outcome.edges += resolve_imports(store, &pending_imports)?; } @@ -416,27 +449,47 @@ pub fn index_repo( // bottleneck on large repos), build one in-memory index from a single full // scan and resolve against it. Done after the main loop so every symbol // (the link targets) exists. Skip building it when there's nothing to - // resolve. + // resolve. Each pass reports its per-file progress so the bar advances. 
if !pending_supertypes.is_empty() || !pending_references.is_empty() { let symbol_index = SymbolIndex::build(store)?; if !pending_supertypes.is_empty() { + let n = pending_supertypes.len(); report_phase( "resolving type relationships", outcome.files_indexed, outcome.symbols, + Some((0, n)), ); - outcome.edges += resolve_supertypes(store, &symbol_index, &pending_supertypes)?; + outcome.edges += + resolve_supertypes(store, &symbol_index, &pending_supertypes, &|done| { + report_phase( + "resolving type relationships", + outcome.files_indexed, + outcome.symbols, + Some((done, n)), + ); + })?; } // References are cross-file, so all declarations must already exist. if !pending_references.is_empty() { + let n = pending_references.len(); report_phase( "resolving references", outcome.files_indexed, outcome.symbols, + Some((0, n)), ); - outcome.edges += resolve_references(store, &symbol_index, &pending_references)?; + outcome.edges += + resolve_references(store, &symbol_index, &pending_references, &|done| { + report_phase( + "resolving references", + outcome.files_indexed, + outcome.symbols, + Some((done, n)), + ); + })?; } } @@ -448,6 +501,7 @@ pub fn index_repo( "linking project membership", outcome.files_indexed, outcome.symbols, + None, ); let contains_edges: Vec = candidates .iter() @@ -607,6 +661,16 @@ impl SymbolIndex { } } +/// Report resolve progress at a coarse cadence — at the start and roughly every +/// 1% of `total` — so the progress bar advances without the callback dominating +/// the (now cheap) per-file work on large repos. +fn maybe_report(report: &dyn Fn(usize), i: usize, total: usize) { + let step = (total / 100).max(1); + if i.is_multiple_of(step) { + report(i); + } +} + /// Whether `candidate_path` lives in the same project directory as a file whose /// parent directory is `project_dir`. Segment-safe: `src` does not match /// `src2/foo` — the match must fall on a `/` boundary (or be the dir itself). 
@@ -632,11 +696,13 @@ fn resolve_supertypes( store: &dyn GraphStore, index: &SymbolIndex, pending: &[(String, Vec)], + report: &dyn Fn(usize), ) -> Result { use crate::graph::model::{GraphEdge, SymbolKind}; let mut batch: Vec = Vec::new(); - for (file, supers) in pending { + for (i, (file, supers)) in pending.iter().enumerate() { + maybe_report(report, i, pending.len()); let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); for st in supers { // The child symbol must be declared in this file. @@ -720,13 +786,15 @@ fn resolve_references( store: &dyn GraphStore, index: &SymbolIndex, pending: &[(String, Vec)], + report: &dyn Fn(usize), ) -> Result { use crate::graph::model::IndexedSymbol; // Accumulate edges and write them in one batch (one transaction) at the end // rather than one DB statement per edge — this is what removes the stall. let mut batch: Vec = Vec::new(); - for (file, refs) in pending { + for (i, (file, refs)) in pending.iter().enumerate() { + maybe_report(report, i, pending.len()); let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); for r in refs { diff --git a/src/main.rs b/src/main.rs index 93802b6..cde79d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -204,11 +204,16 @@ fn cmd_index(cwd: &Path, args: cli::IndexArgs) -> Result<()> { pb.set_length(p.total as u64); pb.set_position(p.processed as u64); // Bottom line: the current file during the scan, or the active - // post-loop phase (e.g. "resolving references…") once the per-file - // scan is done — so the bar at N/N shows work rather than looking - // hung while edges are written. + // post-loop phase (e.g. "resolving references 412/1559…") once the + // per-file scan is done — so the bar shows work rather than looking + // hung while the graph is written and edges resolved. 
let bottom = match p.phase { - Some(phase) => format!("\x1b[2m{phase}…\x1b[0m"), + Some(phase) => match p.phase_progress { + Some((done, total)) => { + format!("\x1b[2m{phase} {done}/{total}…\x1b[0m") + } + None => format!("\x1b[2m{phase}…\x1b[0m"), + }, None => format!("\x1b[2m{}\x1b[0m", truncate_middle(current, 64)), }; pb.set_message(format!(