Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 134 additions & 66 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,20 @@ pub struct IndexProgress {
/// scan into edge resolution (e.g. "resolving references"). `None` during
/// the file scan. Lets the UI show what the otherwise-frozen bar is doing.
pub phase: Option<&'static str>,
/// Progress *within* the current `phase`, as `(done, total)` — e.g. files
/// resolved so far in the references pass. `None` when a phase has no
/// meaningful sub-count (or during the file scan). Lets the UI show that a
/// long phase is advancing rather than hung.
pub phase_progress: Option<(usize, usize)>,
}

/// A progress observer invoked once per candidate file as indexing proceeds.
/// A progress observer invoked as indexing proceeds.
///
/// Receives the current file path and a live [`IndexProgress`] snapshot. Kept as
/// a plain callback so the indexer stays UI-agnostic — the binary wires this to
/// a progress bar; tests can ignore it.
pub type ProgressFn<'a> = dyn Fn(&str, &IndexProgress) + 'a;
/// a progress bar; tests can ignore it. `Sync` so a scoped observer thread can
/// drive it while the parallel parse runs (see `index_repo`).
pub type ProgressFn<'a> = dyn Fn(&str, &IndexProgress) + Sync + 'a;

/// The result of parsing one candidate file off the main thread — everything
/// needed to write it to the store, with zero store access. The parallel parse
Expand Down Expand Up @@ -288,20 +294,12 @@ pub fn index_repo(
// input order, so the resulting `Vec` is index-aligned with `candidates` —
// the sequential drain below then writes in candidate order, making symbol
// insertion and `pending_*` ordering byte-identical to the old loop.
if let Some(cb) = progress {
cb(
"",
&IndexProgress {
processed: 0,
total,
files_indexed: 0,
symbols: 0,
projects: 0,
packages: 0,
phase: Some("parsing files"),
},
);
}
//
// The parse is where the real per-file wall-clock now is. Rather than having
// every rayon worker invoke the progress callback, each worker just bumps a
// shared atomic, and a scoped observer thread on the side polls that counter
// and drives the callback (`ProgressFn` is `Sync`, so it may be called from
// that thread) — so the bar climbs smoothly during the parse instead of
// jumping 0 -> N.
let parse_ctx = ParseContext {
repo,
config,
Expand All @@ -313,10 +311,46 @@ pub fn index_repo(
central: &central,
now,
};
let parsed: Vec<Result<FileWork>> = candidates
.par_iter()
.map(|rel| parse_file(&parse_ctx, rel))
.collect();
let parsed_count = std::sync::atomic::AtomicUsize::new(0);
let done = std::sync::atomic::AtomicBool::new(false);
let parsed: Vec<Result<FileWork>> = std::thread::scope(|scope| {
// Observer: report parse progress until the parse finishes. Only spawned
// when there's a progress sink; it borrows `progress`/`parsed_count` and
// is joined before the scope ends (so the borrows are sound).
if let Some(cb) = progress {
let parsed_count = &parsed_count;
let done = &done;
scope.spawn(move || {
loop {
let n = parsed_count.load(std::sync::atomic::Ordering::Relaxed);
cb(
"",
&IndexProgress {
processed: n,
total,
phase: Some("parsing files"),
phase_progress: Some((n, total)),
..IndexProgress::default()
},
);
if done.load(std::sync::atomic::Ordering::Acquire) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(80));
}
});
}
let out: Vec<Result<FileWork>> = candidates
.par_iter()
.map(|rel| {
let w = parse_file(&parse_ctx, rel);
parsed_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
w
})
.collect();
done.store(true, std::sync::atomic::Ordering::Release);
out
});

// Stage 2 — sequential drain. Walk the parsed results in candidate order,
// accumulating each changed file's node payload into `file_writes` and its
Expand All @@ -327,21 +361,8 @@ pub fn index_repo(
// worker, and write order stays candidate-ordered so output is unchanged.
let mut file_writes: Vec<crate::graph::model::FileWrite> = Vec::new();
let mut manifest_writes: Vec<ManifestWrite> = Vec::new();
for (i, (rel, work)) in candidates.iter().zip(parsed).enumerate() {
for (_rel, work) in candidates.iter().zip(parsed) {
let work = work?;
if let Some(cb) = progress {
let snap = IndexProgress {
processed: i + 1,
total,
files_indexed: outcome.files_indexed,
symbols: outcome.symbols,
projects: projects_seen,
packages: 0,
phase: None,
};
cb(rel, &snap);
}

let work = match work {
FileWork::SkipCounted => {
outcome.files_skipped_unchanged += 1;
Expand Down Expand Up @@ -373,40 +394,52 @@ pub fn index_repo(
}
}

// Write every file + its symbols + DECLARES edges in one transaction.
// Post-loop store work runs after the per-file scan (so link targets all
// exist) and can be the bulk of wall-clock on large repos, so each step
// reports a phase to the progress UI — otherwise the bar sits frozen at N/N
// and looks hung. `report_phase` emits a snapshot tagged with the phase and
// an optional `(done, total)` sub-count so a long phase visibly advances.
let report_phase =
|phase: &'static str, files: usize, symbols: usize, sub: Option<(usize, usize)>| {
if let Some(cb) = progress {
cb(
"",
&IndexProgress {
processed: total,
total,
files_indexed: files,
symbols,
projects: projects_seen,
packages: 0,
phase: Some(phase),
phase_progress: sub,
},
);
}
};

// Write every file + its symbols + DECLARES edges in one transaction, then
// the manifests (projects/packages + their edges), in candidate order.
report_phase(
"writing graph",
outcome.files_indexed,
outcome.symbols,
None,
);
store.write_files_batch(&file_writes)?;
// Then the manifests (projects/packages + their edges), in candidate order.
for manifest in &manifest_writes {
outcome.edges += write_manifest(store, manifest)?;
}
// Post-loop edge-resolution passes. These run after the per-file scan (so
// their link targets all exist) and can be the bulk of wall-clock on large
// repos, so each reports a phase to the progress UI — otherwise the bar
// sits frozen at N/N and looks hung. `report_phase` reuses the same
// callback as the file scan, emitting a snapshot tagged with the phase.
// It's a fn (not a closure) taking the live counts so it doesn't hold a
// borrow of `outcome` across the mutations below.
let report_phase = |phase: &'static str, files: usize, symbols: usize| {
if let Some(cb) = progress {
cb(
"",
&IndexProgress {
processed: total,
total,
files_indexed: files,
symbols,
projects: projects_seen,
packages: 0,
phase: Some(phase),
},
);
}
};

// Resolve collected imports to known packages -> IMPORTS_PACKAGE edges.
// Done after the main loop so every manifest has registered its packages.
if !pending_imports.is_empty() {
report_phase("resolving imports", outcome.files_indexed, outcome.symbols);
report_phase(
"resolving imports",
outcome.files_indexed,
outcome.symbols,
None,
);
outcome.edges += resolve_imports(store, &pending_imports)?;
}

Expand All @@ -416,27 +449,47 @@ pub fn index_repo(
// bottleneck on large repos), build one in-memory index from a single full
// scan and resolve against it. Done after the main loop so every symbol
// (the link targets) exists. Skip building it when there's nothing to
// resolve.
// resolve. Each pass reports its per-file progress so the bar advances.
if !pending_supertypes.is_empty() || !pending_references.is_empty() {
let symbol_index = SymbolIndex::build(store)?;

if !pending_supertypes.is_empty() {
let n = pending_supertypes.len();
report_phase(
"resolving type relationships",
outcome.files_indexed,
outcome.symbols,
Some((0, n)),
);
outcome.edges += resolve_supertypes(store, &symbol_index, &pending_supertypes)?;
outcome.edges +=
resolve_supertypes(store, &symbol_index, &pending_supertypes, &|done| {
report_phase(
"resolving type relationships",
outcome.files_indexed,
outcome.symbols,
Some((done, n)),
);
})?;
}

// References are cross-file, so all declarations must already exist.
if !pending_references.is_empty() {
let n = pending_references.len();
report_phase(
"resolving references",
outcome.files_indexed,
outcome.symbols,
Some((0, n)),
);
outcome.edges += resolve_references(store, &symbol_index, &pending_references)?;
outcome.edges +=
resolve_references(store, &symbol_index, &pending_references, &|done| {
report_phase(
"resolving references",
outcome.files_indexed,
outcome.symbols,
Some((done, n)),
);
})?;
}
}

Expand All @@ -448,6 +501,7 @@ pub fn index_repo(
"linking project membership",
outcome.files_indexed,
outcome.symbols,
None,
);
let contains_edges: Vec<crate::graph::model::GraphEdge> = candidates
.iter()
Expand Down Expand Up @@ -607,6 +661,16 @@ impl SymbolIndex {
}
}

/// Throttled progress reporting for the resolve passes.
///
/// Forwards `i` to `report` only when `i` lands on a step boundary, where the
/// step is one hundredth of `total` (never less than 1). Index 0 always
/// qualifies, so the start of a pass is reported, and totals under 200 report
/// every index. This keeps the callback from dominating the (now cheap)
/// per-file work on large repos while the progress bar still advances.
fn maybe_report(report: &dyn Fn(usize), i: usize, total: usize) {
    // Clamp to 1 so the modulo below is always well-defined.
    let stride = (total / 100).max(1);
    if i % stride != 0 {
        return;
    }
    report(i);
}

/// Whether `candidate_path` lives in the same project directory as a file whose
/// parent directory is `project_dir`. Segment-safe: `src` does not match
/// `src2/foo` — the match must fall on a `/` boundary (or be the dir itself).
Expand All @@ -632,11 +696,13 @@ fn resolve_supertypes(
store: &dyn GraphStore,
index: &SymbolIndex,
pending: &[(String, Vec<tree_sitter::Supertype>)],
report: &dyn Fn(usize),
) -> Result<usize> {
use crate::graph::model::{GraphEdge, SymbolKind};

let mut batch: Vec<GraphEdge> = Vec::new();
for (file, supers) in pending {
for (i, (file, supers)) in pending.iter().enumerate() {
maybe_report(report, i, pending.len());
let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or("");
for st in supers {
// The child symbol must be declared in this file.
Expand Down Expand Up @@ -720,13 +786,15 @@ fn resolve_references(
store: &dyn GraphStore,
index: &SymbolIndex,
pending: &[(String, Vec<tree_sitter::Reference>)],
report: &dyn Fn(usize),
) -> Result<usize> {
use crate::graph::model::IndexedSymbol;

// Accumulate edges and write them in one batch (one transaction) at the end
// rather than one DB statement per edge — this is what removes the stall.
let mut batch: Vec<crate::graph::model::GraphEdge> = Vec::new();
for (file, refs) in pending {
for (i, (file, refs)) in pending.iter().enumerate() {
maybe_report(report, i, pending.len());
let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or("");

for r in refs {
Expand Down
13 changes: 9 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,16 @@ fn cmd_index(cwd: &Path, args: cli::IndexArgs) -> Result<()> {
pb.set_length(p.total as u64);
pb.set_position(p.processed as u64);
// Bottom line: the current file during the scan, or the active
// post-loop phase (e.g. "resolving references…") once the per-file
// scan is done — so the bar at N/N shows work rather than looking
// hung while edges are written.
// post-loop phase (e.g. "resolving references 412/1559…") once the
// per-file scan is done — so the bar shows work rather than looking
// hung while the graph is written and edges resolved.
let bottom = match p.phase {
Some(phase) => format!("\x1b[2m{phase}…\x1b[0m"),
Some(phase) => match p.phase_progress {
Some((done, total)) => {
format!("\x1b[2m{phase} {done}/{total}…\x1b[0m")
}
None => format!("\x1b[2m{phase}…\x1b[0m"),
},
None => format!("\x1b[2m{}\x1b[0m", truncate_middle(current, 64)),
};
pb.set_message(format!(
Expand Down
Loading