diff --git a/Cargo.lock b/Cargo.lock index 9852a5f..f14bcfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,6 +570,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "either" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" + [[package]] name = "encode_unicode" version = "1.0.0" @@ -1521,6 +1527,26 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rayon" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex" version = "1.12.3" @@ -1963,7 +1989,7 @@ dependencies = [ [[package]] name = "synapse" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "blake3", @@ -1976,6 +2002,7 @@ dependencies = [ "lbug", "oci-client", "quick-xml", + "rayon", "serde", "serde_json", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 73f37ce..4937680 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "synapse" -version = "0.2.0" +version = "0.2.1" edition = "2024" [[bin]] @@ -27,6 +27,7 @@ chrono = { version = "0.4.44", features = ["serde"] } clap = { version = "4.6.1", features = ["derive"] } globset = "0.4.18" ignore = "0.4.25" +rayon = "1.10" quick-xml = "0.40.1" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.150" diff --git a/src/graph/ladybug_store.rs b/src/graph/ladybug_store.rs index af28055..4b4d123 100644 --- 
a/src/graph/ladybug_store.rs +++ b/src/graph/ladybug_store.rs @@ -362,26 +362,31 @@ impl GraphStore for LadybugGraphStore { let _guard = self.lock.lock().unwrap(); let conn = self.conn()?; - // The Cypher for each edge kind. Same MERGE shape as the per-edge - // `link_*` methods; here the statement is prepared ONCE per kind and - // re-executed per row, all inside one transaction — so we pay a single - // commit instead of one per edge (the source of the end-of-index stall). + // One `UNWIND $rows` statement per edge kind: all edges of a kind are + // passed as a single list-of-structs parameter and merged in one + // `execute`, so a 61k-edge batch is ~5 FFI calls instead of 61k. Same + // MERGE shape as the per-edge `link_*` methods; idempotent. let cypher = |e: &GraphEdge| -> &'static str { match e { GraphEdge::SymbolReferences { .. } => { - "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:REFERENCES]->(b)" + "UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \ + MERGE (a)-[:REFERENCES]->(b)" } GraphEdge::SymbolInherits { .. } => { - "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:INHERITS]->(b)" + "UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \ + MERGE (a)-[:INHERITS]->(b)" } GraphEdge::SymbolImplements { .. } => { - "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:IMPLEMENTS]->(b)" + "UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \ + MERGE (a)-[:IMPLEMENTS]->(b)" } GraphEdge::FileImportsPackage { .. } => { - "MATCH (f:File {id: $a}), (k:Package {id: $b}) MERGE (f)-[:IMPORTS_PACKAGE]->(k)" + "UNWIND $rows AS r MATCH (f:File {id: r.a}), (k:Package {id: r.b}) \ + MERGE (f)-[:IMPORTS_PACKAGE]->(k)" } GraphEdge::ProjectContainsFile { .. 
} => { - "MATCH (p:Project {id: $a}), (f:File {id: $b}) MERGE (p)-[:CONTAINS_FILE]->(f)" + "UNWIND $rows AS r MATCH (p:Project {id: r.a}), (f:File {id: r.b}) \ + MERGE (p)-[:CONTAINS_FILE]->(f)" } } }; @@ -395,35 +400,48 @@ impl GraphStore for LadybugGraphStore { GraphEdge::ProjectContainsFile { project, file } => (project, file), } } + // Discriminant index, so edges keep their kinds grouped while preserving + // per-kind order (the Cypher is selected from the first edge of a group). + fn kind_ix(e: &GraphEdge) -> u8 { + match e { + GraphEdge::SymbolReferences { .. } => 0, + GraphEdge::SymbolInherits { .. } => 1, + GraphEdge::SymbolImplements { .. } => 2, + GraphEdge::FileImportsPackage { .. } => 3, + GraphEdge::ProjectContainsFile { .. } => 4, + } + } - // One prepared statement per distinct edge-kind Cypher, reused across - // all rows of that kind. - let mut prepared: std::collections::HashMap<&'static str, lbug::PreparedStatement> = - std::collections::HashMap::new(); + // Group edges by kind, preserving order within each kind. + let mut by_kind: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + for e in edges { + by_kind.entry(kind_ix(e)).or_default().push(e); + } conn.query("BEGIN TRANSACTION") .map_err(|e| anyhow!("begin transaction: {e}"))?; // Run the batch; on any error, roll back so a partial batch isn't left // half-committed, then surface the error. 
let result = (|| -> Result<()> { - for edge in edges { - let q = cypher(edge); - if !prepared.contains_key(q) { - let stmt = conn - .prepare(q) - .map_err(|e| anyhow!("preparing `{q}`: {e}"))?; - prepared.insert(q, stmt); - } - let stmt = prepared.get_mut(q).expect("just inserted"); - let (a, b) = endpoints(edge); - conn.execute( - stmt, - vec![ - ("a", Value::String(a.to_string())), - ("b", Value::String(b.to_string())), - ], - ) - .map_err(|e| anyhow!("executing batch edge: {e}"))?; + for group in by_kind.values() { + let q = cypher(group[0]); + let rows: Vec = group + .iter() + .map(|e| { + let (a, b) = endpoints(e); + Value::Struct(vec![ + ("a".to_string(), Value::String(a.to_string())), + ("b".to_string(), Value::String(b.to_string())), + ]) + }) + .collect(); + let child_ty: lbug::LogicalType = (&rows[0]).into(); + let mut stmt = conn + .prepare(q) + .map_err(|e| anyhow!("preparing `{q}`: {e}"))?; + conn.execute(&mut stmt, vec![("rows", Value::List(child_ty, rows))]) + .map_err(|e| anyhow!("executing batch edges: {e}"))?; } Ok(()) })(); @@ -446,6 +464,147 @@ impl GraphStore for LadybugGraphStore { } } + fn write_files_batch(&self, files: &[crate::graph::model::FileWrite]) -> Result<()> { + if files.is_empty() { + return Ok(()); + } + let _guard = self.lock.lock().unwrap(); + let conn = self.conn()?; + + // The whole batch is a handful of `UNWIND $rows` statements (one list + // parameter each) inside one transaction, instead of ~2 `execute` calls + // per symbol. Removes run first (clearing stale nodes), then file and + // symbol upserts, then DECLARES edges. MERGE is idempotent and all ids + // are deterministic, so the final graph is identical regardless of the + // intra-batch ordering this collapses. 
+ const REMOVE_DECLARED: &str = "UNWIND $rows AS r MATCH (f:File {path: r.path})-[:DECLARES]->(s:Symbol) DETACH DELETE s"; + const REMOVE_BY_FILEPATH: &str = + "UNWIND $rows AS r MATCH (s:Symbol {filePath: r.path}) DETACH DELETE s"; + const REMOVE_FILE: &str = "UNWIND $rows AS r MATCH (f:File {path: r.path}) DETACH DELETE f"; + const UPSERT_FILE: &str = "UNWIND $rows AS r MERGE (f:File {id: r.id}) \ + SET f.path = r.path, f.language = r.language, f.hash = r.hash, \ + f.sizeBytes = r.size, f.tracked = r.tracked, f.lastIndexedAt = r.indexed"; + const UPSERT_SYMBOL: &str = "UNWIND $rows AS r MERGE (s:Symbol {id: r.id}) \ + SET s.name = r.name, s.fullName = r.full, s.kind = r.kind, s.language = r.language, \ + s.filePath = r.file, s.startLine = r.startln, s.endLine = r.endln, \ + s.visibility = r.vis, s.exported = r.exported"; + const LINK_DECLARES: &str = "UNWIND $rows AS r MATCH (f:File {id: r.f}), (s:Symbol {id: r.s}) \ + MERGE (f)-[:DECLARES]->(s)"; + + // Build the row lists up front (owned), so the transaction body is just + // a sequence of single `execute` calls. 
+ let path_rows: Vec = files + .iter() + .map(|fw| { + Value::Struct(vec![( + "path".to_string(), + Value::String(fw.file.path.clone()), + )]) + }) + .collect(); + let file_rows: Vec = files + .iter() + .map(|fw| { + let f = &fw.file; + Value::Struct(vec![ + ("id".to_string(), Value::String(f.id.clone())), + ("path".to_string(), Value::String(f.path.clone())), + ( + "language".to_string(), + Value::String(lang_to_str(f.language)), + ), + ("hash".to_string(), Value::String(f.hash.clone())), + ("size".to_string(), Value::Int64(f.size_bytes as i64)), + ("tracked".to_string(), Value::Bool(f.tracked)), + ( + "indexed".to_string(), + Value::String(f.last_indexed_at.clone()), + ), + ]) + }) + .collect(); + let symbol_rows: Vec = files + .iter() + .flat_map(|fw| { + fw.symbols.iter().map(|sym| { + Value::Struct(vec![ + ("id".to_string(), Value::String(sym.id.clone())), + ("name".to_string(), Value::String(sym.name.clone())), + ("full".to_string(), Value::String(sym.full_name.clone())), + ( + "kind".to_string(), + Value::String(sym.kind.as_str().to_string()), + ), + ( + "language".to_string(), + Value::String(lang_to_str(sym.language)), + ), + ("file".to_string(), Value::String(sym.file_path.clone())), + ("startln".to_string(), Value::Int64(sym.start_line as i64)), + ("endln".to_string(), Value::Int64(sym.end_line as i64)), + ("vis".to_string(), Value::String(sym.visibility.clone())), + ("exported".to_string(), Value::Bool(sym.exported)), + ]) + }) + }) + .collect(); + let declares_rows: Vec = files + .iter() + .flat_map(|fw| { + let fid = fw.file.id.clone(); + fw.symbols.iter().map(move |sym| { + Value::Struct(vec![ + ("f".to_string(), Value::String(fid.clone())), + ("s".to_string(), Value::String(sym.id.clone())), + ]) + }) + }) + .collect(); + + // (cypher, rows): runs in order. Empty row lists are skipped. 
+ let stages: [(&str, &Vec); 6] = [ + (REMOVE_DECLARED, &path_rows), + (REMOVE_BY_FILEPATH, &path_rows), + (REMOVE_FILE, &path_rows), + (UPSERT_FILE, &file_rows), + (UPSERT_SYMBOL, &symbol_rows), + (LINK_DECLARES, &declares_rows), + ]; + + conn.query("BEGIN TRANSACTION") + .map_err(|e| anyhow!("begin transaction: {e}"))?; + let result = (|| -> Result<()> { + for (q, rows) in stages { + if rows.is_empty() { + continue; + } + let child_ty: lbug::LogicalType = (&rows[0]).into(); + let mut stmt = conn + .prepare(q) + .map_err(|e| anyhow!("preparing `{q}`: {e}"))?; + conn.execute( + &mut stmt, + vec![("rows", Value::List(child_ty, rows.clone()))], + ) + .map_err(|e| anyhow!("executing `{q}`: {e}"))?; + } + Ok(()) + })(); + match result { + Ok(()) => match conn.query("COMMIT") { + Ok(_) => Ok(()), + Err(e) => { + let _ = conn.query("ROLLBACK"); + Err(anyhow!("commit transaction: {e}")) + } + }, + Err(err) => { + let _ = conn.query("ROLLBACK"); + Err(err) + } + } + } + fn symbol_type_relations(&self, symbol_name: &str) -> Result> { let _guard = self.lock.lock().unwrap(); let conn = self.conn()?; diff --git a/src/graph/model.rs b/src/graph/model.rs index 6067fe0..20fa74b 100644 --- a/src/graph/model.rs +++ b/src/graph/model.rs @@ -265,6 +265,23 @@ pub enum GraphEdge { ProjectContainsFile { project: String, file: String }, } +/// One file's complete (re)index payload, for the batched node-write path. +/// +/// Carries everything needed to replace a file's nodes in the graph: the file +/// node and every symbol it declares. The indexer collects these during the +/// drain and writes them in one transaction via +/// [`GraphStore::write_files_batch`](crate::graph::GraphStore::write_files_batch), +/// rather than one auto-committed statement per symbol — which is the dominant +/// cost of indexing a large repo. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileWrite { + /// The file node to upsert. 
Its declared symbols are removed first (clearing + /// stale ones from a previous index) then re-inserted from `symbols`. + pub file: IndexedFile, + /// Every symbol declared in the file, in deterministic insertion order. + pub symbols: Vec, +} + /// Aggregate counts for `status`/`index --stats`. #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct IndexStats { diff --git a/src/graph/store.rs b/src/graph/store.rs index f0cf3d5..3cbb894 100644 --- a/src/graph/store.rs +++ b/src/graph/store.rs @@ -5,8 +5,8 @@ //! replaceable. No backend-specific type ever appears in this signature. use crate::graph::model::{ - FileSearchQuery, IndexStats, IndexedFile, IndexedPackage, IndexedProject, IndexedSymbol, - RelatedItem, SymbolSearchQuery, + FileSearchQuery, FileWrite, IndexStats, IndexedFile, IndexedPackage, IndexedProject, + IndexedSymbol, RelatedItem, SymbolSearchQuery, }; use anyhow::Result; @@ -68,6 +68,28 @@ pub trait GraphStore { Ok(()) } + /// Apply many files' (re)index writes in one batch. For each [`FileWrite`]: + /// remove the file's existing nodes, upsert the file, upsert every declared + /// symbol, and create the `DECLARES` edges. Backends that support + /// transactions should write the whole batch in one transaction with reused + /// prepared statements — far faster than one auto-committed `upsert_symbol` + /// per symbol, which dominates indexing time on large repos. The default + /// implementation falls back to the per-file/per-symbol methods, preserving + /// the original ordering (remove -> upsert file -> upsert+link each symbol). 
+ fn write_files_batch(&self, files: &[FileWrite]) -> Result<()> { + for fw in files { + self.remove_file(&fw.file.path)?; + let fid = fw.file.id.clone(); + self.upsert_file(fw.file.clone())?; + for sym in &fw.symbols { + let sid = sym.id.clone(); + self.upsert_symbol(sym.clone())?; + self.link_file_declares_symbol(&fid, &sid)?; + } + } + Ok(()) + } + fn symbols_matching(&self, query: &SymbolSearchQuery) -> Result>; fn files_matching(&self, query: &FileSearchQuery) -> Result>; fn related_to_symbol(&self, symbol: &str, depth: usize) -> Result>; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 3b131a6..60432c2 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -16,6 +16,7 @@ use crate::graph::GraphStore; use crate::graph::model::{IndexedFile, IndexedPackage, IndexedProject, Language}; use crate::repo::Repo; use anyhow::{Context, Result}; +use rayon::prelude::*; use std::collections::HashSet; use std::path::Path; @@ -77,6 +78,143 @@ pub struct IndexProgress { /// a progress bar; tests can ignore it. pub type ProgressFn<'a> = dyn Fn(&str, &IndexProgress) + 'a; +/// The result of parsing one candidate file off the main thread — everything +/// needed to write it to the store, with zero store access. The parallel parse +/// stage produces one of these per candidate (order-aligned with `candidates`); +/// the sequential drain consumes them in order, so symbol-insertion and +/// pending-vec ordering are byte-identical to the old single-threaded loop. +enum FileWork { + /// File was filtered (`--changed`) or unchanged — counted as "skipped + /// unchanged" in the outcome, matching the old loop's two `continue`s that + /// bumped `files_skipped_unchanged`. + SkipCounted, + /// File was unreadable — silently skipped with no counter bump, matching the + /// old loop's bare `continue` on a read error. + SkipUnreadable, + /// A changed/new file to (re)index. + Indexed(Box), +} + +/// Parsed facts for one changed file, ready to drain into the store in order. 
+struct IndexedFileWork { + rel: String, + fid: String, + file: IndexedFile, + /// Extracted symbols (empty for unsupported/disabled languages). + symbols: Vec, + /// `(imports, language)` for later package resolution, when non-empty. + imports: Option<(Vec, Language)>, + /// Supertype relationships discovered in this file, when non-empty. + supers: Vec, + /// Usage references discovered in this file, when non-empty. + references: Vec, + /// Parsed manifest (`.csproj`/`package.json`) ready to write, when this file + /// is a manifest that parsed successfully. + manifest: Option, +} + +/// Read-only inputs shared by every [`parse_file`] call, gathered once before +/// the parallel parse so each rayon worker borrows them immutably. None of +/// these are touched by the store, so sharing across threads is safe. +struct ParseContext<'a> { + repo: &'a Repo, + config: &'a SynapseConfig, + force: bool, + changed_only: bool, + changed: &'a HashSet, + existing: &'a [IndexedFile], + tracked: &'a HashSet, + central: &'a CentralVersions, + now: &'a str, +} + +/// Parse a single candidate file into a [`FileWork`], doing only pure work: +/// read + hash + skip-decision + language detection + tree-sitter extraction + +/// manifest parse. No store access — safe to call from a rayon worker thread. +fn parse_file(ctx: &ParseContext<'_>, rel: &str) -> Result { + if ctx.changed_only && !ctx.changed.contains(rel) { + // Skip files git didn't flag as changed. + return Ok(FileWork::SkipCounted); + } + + let abs = ctx.repo.root.join(rel); + let bytes = match std::fs::read(&abs) { + Ok(b) => b, + Err(_) => return Ok(FileWork::SkipUnreadable), + }; + let hash = blake3::hash(&bytes).to_hex().to_string(); + let size = bytes.len() as u64; + + // Skip unchanged files (unless forced). 
+ if !ctx.force + && let Some(prev) = ctx.existing.iter().find(|f| f.path == rel) + && prev.hash == hash + { + return Ok(FileWork::SkipCounted); + } + + let language = languages::detect(rel); + let fid = file_id(rel); + let file = IndexedFile { + id: fid.clone(), + path: rel.to_string(), + language, + hash, + size_bytes: size, + tracked: ctx.tracked.contains(rel), + last_indexed_at: ctx.now.to_string(), + }; + + let mut symbols = Vec::new(); + let mut imports = None; + let mut supers = Vec::new(); + let mut references = Vec::new(); + + // Symbol extraction for supported languages. + if language != Language::Other && language_enabled(ctx.config, language) { + let text = String::from_utf8_lossy(&bytes); + symbols = tree_sitter::extract(rel, language, &text).unwrap_or_default(); + + // Collect imports (JS/TS, C#) for later file -> package resolution. + if matches!( + language, + Language::JavaScript | Language::TypeScript | Language::CSharp + ) { + let imps = tree_sitter::extract_imports(rel, language, &text); + if !imps.is_empty() { + imports = Some((imps, language)); + } + } + + // Collect supertype relationships for later INHERITS/IMPLEMENTS edges. + supers = tree_sitter::extract_supertypes(rel, language, &text); + + // Collect usage references for later REFERENCES edges. + references = tree_sitter::extract_references(rel, language, &text); + } + + // Manifest parsing -> projects/packages/edges (pure parse only). A parse + // error aborts the whole index, exactly as the previous inline `?` did. + let manifest = if rel.ends_with(".csproj") { + Some(parse_csproj_manifest(rel, &abs, ctx.central)?) + } else if rel.ends_with("package.json") { + Some(parse_package_json_manifest(rel, &abs)?) + } else { + None + }; + + Ok(FileWork::Indexed(Box::new(IndexedFileWork { + rel: rel.to_string(), + fid, + file, + symbols, + imports, + supers, + references, + manifest, + }))) +} + /// Index the repository into `store`. /// /// * `force` re-indexes every file regardless of hash. 
@@ -143,7 +281,54 @@ pub fn index_repo( let mut projects_seen = 0usize; let total = candidates.len(); - for (i, rel) in candidates.iter().enumerate() { + + // Stage 1 — parallel parse. Read + hash + detect + tree-sitter extract + + // manifest parse is pure, `Send`, owned-data work (no store access), so it + // runs across rayon's thread pool. `par_iter().map().collect()` preserves + // input order, so the resulting `Vec` is index-aligned with `candidates` — + // the sequential drain below then writes in candidate order, making symbol + // insertion and `pending_*` ordering byte-identical to the old loop. + if let Some(cb) = progress { + cb( + "", + &IndexProgress { + processed: 0, + total, + files_indexed: 0, + symbols: 0, + projects: 0, + packages: 0, + phase: Some("parsing files"), + }, + ); + } + let parse_ctx = ParseContext { + repo, + config, + force, + changed_only, + changed: &changed, + existing: &existing, + tracked: &tracked, + central: &central, + now, + }; + let parsed: Vec> = candidates + .par_iter() + .map(|rel| parse_file(&parse_ctx, rel)) + .collect(); + + // Stage 2 — sequential drain. Walk the parsed results in candidate order, + // accumulating each changed file's node payload into `file_writes` and its + // manifest into `manifest_writes`; the pending_* vecs are filled in order + // too. No per-symbol store call here — the file/symbol nodes are written in + // ONE batched transaction below (the dominant cost otherwise), and manifests + // (few, cheap) right after. The store mutex is never touched from a rayon + // worker, and write order stays candidate-ordered so output is unchanged. 
+ let mut file_writes: Vec = Vec::new(); + let mut manifest_writes: Vec = Vec::new(); + for (i, (rel, work)) in candidates.iter().zip(parsed).enumerate() { + let work = work?; if let Some(cb) = progress { let snap = IndexProgress { processed: i + 1, @@ -156,90 +341,44 @@ pub fn index_repo( }; cb(rel, &snap); } - if changed_only && !changed.contains(rel) { - // Skip files git didn't flag as changed. - outcome.files_skipped_unchanged += 1; - continue; - } - let abs = repo.root.join(rel); - let bytes = match std::fs::read(&abs) { - Ok(b) => b, - Err(_) => continue, + let work = match work { + FileWork::SkipCounted => { + outcome.files_skipped_unchanged += 1; + continue; + } + FileWork::SkipUnreadable => continue, + FileWork::Indexed(w) => w, }; - let hash = blake3::hash(&bytes).to_hex().to_string(); - let size = bytes.len() as u64; - - // Skip unchanged files (unless forced). - if !force - && let Some(prev) = existing.iter().find(|f| &f.path == rel) - && prev.hash == hash - { - outcome.files_skipped_unchanged += 1; - continue; - } - // Changed/new: clear old symbols then re-upsert. - store.remove_file(rel)?; - - let language = languages::detect(rel); - let fid = file_id(rel); - let file = IndexedFile { - id: fid.clone(), - path: rel.clone(), - language, - hash, - size_bytes: size, - tracked: tracked.contains(rel), - last_indexed_at: now.to_string(), - }; - store.upsert_file(file)?; outcome.files_indexed += 1; + outcome.symbols += work.symbols.len(); + file_writes.push(crate::graph::model::FileWrite { + file: work.file, + symbols: work.symbols, + }); - // Symbol extraction for supported languages. 
- if language != Language::Other && language_enabled(config, language) { - let text = String::from_utf8_lossy(&bytes); - let symbols = tree_sitter::extract(rel, language, &text).unwrap_or_default(); - for sym in symbols { - let sid = sym.id.clone(); - store.upsert_symbol(sym)?; - store.link_file_declares_symbol(&fid, &sid)?; - outcome.symbols += 1; - } - // Collect imports (JS/TS, C#) for later file -> package resolution. - if matches!( - language, - Language::JavaScript | Language::TypeScript | Language::CSharp - ) { - let imports = tree_sitter::extract_imports(rel, language, &text); - if !imports.is_empty() { - pending_imports.push((fid.clone(), imports, language)); - } - } - - // Collect supertype relationships for later INHERITS/IMPLEMENTS edges. - let supers = tree_sitter::extract_supertypes(rel, language, &text); - if !supers.is_empty() { - pending_supertypes.push((rel.clone(), supers)); - } - - // Collect usage references for later REFERENCES edges. - let refs = tree_sitter::extract_references(rel, language, &text); - if !refs.is_empty() { - pending_references.push((rel.clone(), refs)); - } + if let Some((imports, language)) = work.imports { + pending_imports.push((work.fid.clone(), imports, language)); } - - // Manifest parsing -> projects/packages/edges. - if rel.ends_with(".csproj") { - outcome.edges += index_csproj(rel, &abs, store, &central)?; - projects_seen += 1; - } else if rel.ends_with("package.json") { - outcome.edges += index_package_json(rel, &abs, store)?; + if !work.supers.is_empty() { + pending_supertypes.push((work.rel.clone(), work.supers)); + } + if !work.references.is_empty() { + pending_references.push((work.rel.clone(), work.references)); + } + if let Some(manifest) = work.manifest { + manifest_writes.push(manifest); projects_seen += 1; } } + // Write every file + its symbols + DECLARES edges in one transaction. + store.write_files_batch(&file_writes)?; + // Then the manifests (projects/packages + their edges), in candidate order. 
+ for manifest in &manifest_writes { + outcome.edges += write_manifest(store, manifest)?; + } // Post-loop edge-resolution passes. These run after the per-file scan (so // their link targets all exist) and can be the bulk of wall-clock on large // repos, so each reports a phase to the progress UI — otherwise the bar @@ -271,26 +410,34 @@ pub fn index_repo( outcome.edges += resolve_imports(store, &pending_imports)?; } - // Resolve supertype relationships -> INHERITS/IMPLEMENTS edges. Done after - // the main loop so all symbols (the link targets) exist. - if !pending_supertypes.is_empty() { - report_phase( - "resolving type relationships", - outcome.files_indexed, - outcome.symbols, - ); - outcome.edges += resolve_supertypes(store, &pending_supertypes)?; - } + // Resolve supertype/reference relationships -> INHERITS/IMPLEMENTS/REFERENCES + // edges. Both passes look symbols up by name and file; rather than issuing + // one unindexed `symbols_matching` scan per lookup (the resolve-phase + // bottleneck on large repos), build one in-memory index from a single full + // scan and resolve against it. Done after the main loop so every symbol + // (the link targets) exists. Skip building it when there's nothing to + // resolve. + if !pending_supertypes.is_empty() || !pending_references.is_empty() { + let symbol_index = SymbolIndex::build(store)?; + + if !pending_supertypes.is_empty() { + report_phase( + "resolving type relationships", + outcome.files_indexed, + outcome.symbols, + ); + outcome.edges += resolve_supertypes(store, &symbol_index, &pending_supertypes)?; + } - // Resolve usage references -> REFERENCES edges. Done after the main loop so - // all declarations (the link targets) exist — references are cross-file. 
- if !pending_references.is_empty() { - report_phase( - "resolving references", - outcome.files_indexed, - outcome.symbols, - ); - outcome.edges += resolve_references(store, &pending_references)?; + // References are cross-file, so all declarations must already exist. + if !pending_references.is_empty() { + report_phase( + "resolving references", + outcome.files_indexed, + outcome.symbols, + ); + outcome.edges += resolve_references(store, &symbol_index, &pending_references)?; + } } // Associate every indexed file with its nearest owning project manifest, @@ -379,6 +526,87 @@ fn resolve_imports( Ok(edges) } +/// In-memory index over every symbol in the graph, built once from a single +/// full scan so the resolve passes can look symbols up by name/file in O(1) +/// instead of issuing one unindexed `symbols_matching` scan per lookup. +/// +/// Buckets are sorted deterministically on build (by `start_line`, `end_line`, +/// `id`) so `from`/`child` selection — which takes the first element — is +/// stable when a file declares several same-named symbols (overloads, etc.). +struct SymbolIndex { + /// All symbols sharing a case-insensitive name, keyed by lowercased name. + by_name_ci: std::collections::HashMap>, + /// Declarations per file, keyed by file path then exact symbol name. + by_file: std::collections::HashMap< + String, + std::collections::HashMap>, + >, +} + +impl SymbolIndex { + /// Build the index from one full-table symbol scan (empty query => no + /// `WHERE`, so a single pass over the Symbol table). 
+ fn build(store: &dyn GraphStore) -> Result { + use crate::graph::model::SymbolSearchQuery; + use std::collections::HashMap; + + let all = store.symbols_matching(&SymbolSearchQuery::default())?; + let mut by_name_ci: HashMap> = HashMap::new(); + let mut by_file: HashMap>> = HashMap::new(); + for sym in all { + by_name_ci + .entry(sym.name.to_ascii_lowercase()) + .or_default() + .push(sym.clone()); + by_file + .entry(sym.file_path.clone()) + .or_default() + .entry(sym.name.clone()) + .or_default() + .push(sym); + } + let sort = |list: &mut Vec| { + list.sort_by(|a, b| { + a.start_line + .cmp(&b.start_line) + .then(a.end_line.cmp(&b.end_line)) + .then(a.id.cmp(&b.id)) + }); + }; + for list in by_name_ci.values_mut() { + sort(list); + } + for names in by_file.values_mut() { + for list in names.values_mut() { + sort(list); + } + } + Ok(SymbolIndex { + by_name_ci, + by_file, + }) + } + + /// The first declaration named `name` in `file` (deterministic by build + /// sort), or `None` if the file declares no such symbol. + fn decl_in_file(&self, file: &str, name: &str) -> Option<&crate::graph::model::IndexedSymbol> { + self.by_file.get(file)?.get(name)?.first() + } + + /// All symbols whose name equals `name` (case-insensitive lookup, then an + /// exact-name filter to match the prior `name == …` post-filter semantics). + fn by_name<'a>( + &'a self, + name: &'a str, + ) -> impl Iterator + 'a { + self.by_name_ci + .get(&name.to_ascii_lowercase()) + .into_iter() + .flat_map(|v| v.iter()) + .filter(move |s| s.name == name) + } +} + /// Whether `candidate_path` lives in the same project directory as a file whose /// parent directory is `project_dir`. Segment-safe: `src` does not match /// `src2/foo` — the match must fall on a `/` boundary (or be the dir itself). @@ -402,32 +630,25 @@ fn same_project_dir(candidate_path: &str, project_dir: &str) -> bool { /// else INHERITS). 
fn resolve_supertypes( store: &dyn GraphStore, + index: &SymbolIndex, pending: &[(String, Vec)], ) -> Result { - use crate::graph::model::{GraphEdge, SymbolKind, SymbolSearchQuery}; + use crate::graph::model::{GraphEdge, SymbolKind}; let mut batch: Vec = Vec::new(); for (file, supers) in pending { let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); for st in supers { // The child symbol must be declared in this file. - let child_candidates = store.symbols_matching(&SymbolSearchQuery { - name: Some(st.child.clone()), - file: Some(file.clone()), - ..Default::default() - })?; - let Some(child) = child_candidates.into_iter().find(|s| s.name == st.child) else { + let Some(child) = index.decl_in_file(file, &st.child) else { continue; }; // Candidate supertype symbols (exact name match, any file). - let mut targets: Vec<_> = store - .symbols_matching(&SymbolSearchQuery { - name: Some(st.supertype.clone()), - ..Default::default() - })? - .into_iter() - .filter(|s| s.name == st.supertype && s.id != child.id) + let mut targets: Vec<_> = index + .by_name(&st.supertype) + .filter(|s| s.id != child.id) + .cloned() .collect(); if targets.is_empty() { continue; @@ -497,17 +718,10 @@ fn resolve_supertypes( /// against local variables shadowing a global name. fn resolve_references( store: &dyn GraphStore, + index: &SymbolIndex, pending: &[(String, Vec)], ) -> Result { - use crate::graph::model::{IndexedSymbol, SymbolSearchQuery}; - use std::collections::HashMap; - - // Cache of target candidates keyed by referenced name, shared across every - // file — references to the same type recur constantly, so this turns an - // O(#refs) query pattern into O(#distinct names). Each entry holds the full - // exact-name candidate set (any file); same-file/same-project narrowing is - // applied per reference below. 
- let mut to_cache: HashMap> = HashMap::new(); + use crate::graph::model::IndexedSymbol; // Accumulate edges and write them in one batch (one transaction) at the end // rather than one DB statement per edge — this is what removes the stall. @@ -515,52 +729,23 @@ fn resolve_references( for (file, refs) in pending { let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); - // Preload every declaration in this file once, indexed by name, for the - // `from` (enclosing-declaration) lookup — one query per file instead of - // one per reference. Candidates are sorted so selection is deterministic - // when a file has multiple same-named declarations (overloads, etc.). - let mut from_by_name: HashMap<&str, Vec<&IndexedSymbol>> = HashMap::new(); - let file_symbols = store.symbols_matching(&SymbolSearchQuery { - file: Some(file.clone()), - ..Default::default() - })?; - for sym in &file_symbols { - from_by_name.entry(sym.name.as_str()).or_default().push(sym); - } - for list in from_by_name.values_mut() { - list.sort_by(|a, b| { - a.start_line - .cmp(&b.start_line) - .then(a.end_line.cmp(&b.end_line)) - .then(a.id.cmp(&b.id)) - }); - } - for r in refs { // The referring symbol must be a declaration in this file. if r.from.is_empty() { continue; } - let Some(from) = from_by_name.get(r.from.as_str()).and_then(|v| v.first()) else { + // The `from` (enclosing-declaration) lookup: the first declaration + // in this file named `r.from` (deterministic by the index's build + // sort when a file has multiple same-named declarations). + let Some(from) = index.decl_in_file(file, &r.from) else { continue; }; - // Candidate target symbols (exact name match, any file), cached - // across files. An empty set means the name isn't a declared symbol - // (e.g. a local var) — no edge, the false-positive guard. - if !to_cache.contains_key(&r.to) { - let candidates = store - .symbols_matching(&SymbolSearchQuery { - name: Some(r.to.clone()), - ..Default::default() - })? 
- .into_iter() - .filter(|s| s.name == r.to) - .collect(); - to_cache.insert(r.to.clone(), candidates); - } + // Candidate target symbols (exact name match, any file). An empty + // set means the name isn't a declared symbol (e.g. a local var) — + // no edge, the false-positive guard. let mut targets: Vec<&IndexedSymbol> = - to_cache[&r.to].iter().filter(|s| s.id != from.id).collect(); + index.by_name(&r.to).filter(|s| s.id != from.id).collect(); if targets.is_empty() { continue; } @@ -751,15 +936,35 @@ fn language_enabled(config: &SynapseConfig, lang: Language) -> bool { } } -/// Parse a `.csproj` and upsert the project + its references/packages. -/// Returns the number of edges created. Package versions missing from the -/// `.csproj` are resolved against Central Package Management via `central`. -fn index_csproj( +/// A manifest (`.csproj` / `package.json`) parsed into the exact, ordered set of +/// store operations needed to record it — the project node plus its reference +/// and package edges in document order. Produced by the pure `parse_*_manifest` +/// functions (safe to run off the main thread) and replayed against the store +/// by [`write_manifest`] in the sequential drain. Splitting parse from write +/// lets the CPU-heavy parse run in parallel while keeping every store write +/// (and thus edge ordering) on the single indexing thread. +struct ManifestWrite { + project: IndexedProject, + ops: Vec, +} + +/// One ordered store operation contributed by a manifest, after its project +/// node is upserted. Each op corresponds to exactly one edge. +enum ManifestOp { + /// `link_project_references_project(project, target_project)`. + ProjectRef { target: String }, + /// `upsert_package` then `link_project_uses_package(project, package)`. + Package(IndexedPackage), +} + +/// Parse a `.csproj` into a [`ManifestWrite`] (pure: no store access). Package +/// versions missing from the `.csproj` are resolved against Central Package +/// Management via `central`. 
+fn parse_csproj_manifest( rel: &str, abs: &Path, - store: &dyn GraphStore, central: &CentralVersions, -) -> Result { +) -> Result { let text = std::fs::read_to_string(abs).with_context(|| format!("reading {}", abs.display()))?; let parsed = dotnet::parse_csproj(&text)?; @@ -776,21 +981,22 @@ fn index_csproj( } else { "dotnet" }; - store.upsert_project(IndexedProject { - id: pid.clone(), + let project = IndexedProject { + id: pid, name, path: rel.to_string(), language: Language::CSharp, kind: kind.to_string(), - })?; + }; - let mut edges = 0; + let mut ops = Vec::new(); // Resolve project references relative to this csproj's directory. let dir = Path::new(rel).parent(); for proj_ref in &parsed.project_references { let target = resolve_rel(dir, proj_ref); - store.link_project_references_project(&pid, &project_id(&target))?; - edges += 1; + ops.push(ManifestOp::ProjectRef { + target: project_id(&target), + }); } for pkg in &parsed.package_references { // Prefer the inline version; fall back to the central pin (CPM). @@ -799,23 +1005,19 @@ fn index_csproj( .clone() .or_else(|| central.version_for(rel, &pkg.name)) .unwrap_or_default(); - let pkg_id = package_id("nuget", &pkg.name); - store.upsert_package(IndexedPackage { - id: pkg_id.clone(), + ops.push(ManifestOp::Package(IndexedPackage { + id: package_id("nuget", &pkg.name), name: pkg.name.clone(), version, ecosystem: "nuget".to_string(), dependency_kind: "package".to_string(), - })?; - store.link_project_uses_package(&pid, &pkg_id)?; - edges += 1; + })); } - Ok(edges) + Ok(ManifestWrite { project, ops }) } -/// Parse a `package.json` and upsert the project + its dependencies. -/// Returns the number of edges created. -fn index_package_json(rel: &str, abs: &Path, store: &dyn GraphStore) -> Result { +/// Parse a `package.json` into a [`ManifestWrite`] (pure: no store access). 
+fn parse_package_json_manifest(rel: &str, abs: &Path) -> Result { let text = std::fs::read_to_string(abs).with_context(|| format!("reading {}", abs.display()))?; let parsed = node::parse_package_json(&text)?; @@ -828,29 +1030,45 @@ fn index_package_json(rel: &str, abs: &Path, store: &dyn GraphStore) -> Result Result { + let pid = manifest.project.id.clone(); + store.upsert_project(manifest.project.clone())?; + for op in &manifest.ops { + match op { + ManifestOp::ProjectRef { target } => { + store.link_project_references_project(&pid, target)?; + } + ManifestOp::Package(pkg) => { + store.upsert_package(pkg.clone())?; + store.link_project_uses_package(&pid, &pkg.id)?; + } + } + } + Ok(manifest.ops.len()) } /// Resolve a relative manifest reference (possibly using `\`) against a base dir