diff --git a/Cargo.lock b/Cargo.lock index f9f49a8..e553509 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -809,7 +809,7 @@ dependencies = [ [[package]] name = "synapse" -version = "0.1.3" +version = "0.1.4" dependencies = [ "anyhow", "blake3", diff --git a/Cargo.toml b/Cargo.toml index 112da6a..e013bbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "synapse" -version = "0.1.3" +version = "0.1.4" edition = "2024" [[bin]] diff --git a/src/graph/ladybug_store.rs b/src/graph/ladybug_store.rs index b3008fd..0e5ee51 100644 --- a/src/graph/ladybug_store.rs +++ b/src/graph/ladybug_store.rs @@ -354,6 +354,92 @@ impl GraphStore for LadybugGraphStore { ) } + fn link_edges(&self, edges: &[crate::graph::model::GraphEdge]) -> Result<()> { + use crate::graph::model::GraphEdge; + if edges.is_empty() { + return Ok(()); + } + let _guard = self.lock.lock().unwrap(); + let conn = self.conn()?; + + // The Cypher for each edge kind. Same MERGE shape as the per-edge + // `link_*` methods; here the statement is prepared ONCE per kind and + // re-executed per row, all inside one transaction — so we pay a single + // commit instead of one per edge (the source of the end-of-index stall). + let cypher = |e: &GraphEdge| -> &'static str { + match e { + GraphEdge::SymbolReferences { .. } => { + "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:REFERENCES]->(b)" + } + GraphEdge::SymbolInherits { .. } => { + "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:INHERITS]->(b)" + } + GraphEdge::SymbolImplements { .. } => { + "MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:IMPLEMENTS]->(b)" + } + GraphEdge::FileImportsPackage { .. } => { + "MATCH (f:File {id: $a}), (k:Package {id: $b}) MERGE (f)-[:IMPORTS_PACKAGE]->(k)" + } + GraphEdge::ProjectContainsFile { .. } => { + "MATCH (p:Project {id: $a}), (f:File {id: $b}) MERGE (p)-[:CONTAINS_FILE]->(f)" + } + } + }; + // The two endpoint ids for an edge, in (a, b) order matching the Cypher. 
+ fn endpoints(e: &GraphEdge) -> (&str, &str) { + match e { + GraphEdge::SymbolReferences { from, to } + | GraphEdge::SymbolInherits { from, to } + | GraphEdge::SymbolImplements { from, to } => (from, to), + GraphEdge::FileImportsPackage { file, package } => (file, package), + GraphEdge::ProjectContainsFile { project, file } => (project, file), + } + } + + // One prepared statement per distinct edge-kind Cypher, reused across + // all rows of that kind. + let mut prepared: std::collections::HashMap<&'static str, lbug::PreparedStatement> = + std::collections::HashMap::new(); + + conn.query("BEGIN TRANSACTION") + .map_err(|e| anyhow!("begin transaction: {e}"))?; + // Run the batch; on any error, roll back so a partial batch isn't left + // half-committed, then surface the error. + let result = (|| -> Result<()> { + for edge in edges { + let q = cypher(edge); + if !prepared.contains_key(q) { + let stmt = conn + .prepare(q) + .map_err(|e| anyhow!("preparing `{q}`: {e}"))?; + prepared.insert(q, stmt); + } + let stmt = prepared.get_mut(q).expect("just inserted"); + let (a, b) = endpoints(edge); + conn.execute( + stmt, + vec![ + ("a", Value::String(a.to_string())), + ("b", Value::String(b.to_string())), + ], + ) + .map_err(|e| anyhow!("executing batch edge: {e}"))?; + } + Ok(()) + })(); + match result { + Ok(()) => conn + .query("COMMIT") + .map(|_| ()) + .map_err(|e| anyhow!("commit transaction: {e}")), + Err(err) => { + // Best-effort rollback (its own failure is ignored); report the original error. 
+ let _ = conn.query("ROLLBACK"); + Err(err) + } + } + } + fn symbol_type_relations(&self, symbol_name: &str) -> Result> { let _guard = self.lock.lock().unwrap(); let conn = self.conn()?; diff --git a/src/graph/model.rs b/src/graph/model.rs index 5bbb270..6067fe0 100644 --- a/src/graph/model.rs +++ b/src/graph/model.rs @@ -248,6 +248,23 @@ pub struct RelatedItem { pub depth: usize, } +/// A relationship edge to create between two already-upserted nodes, used by +/// the indexer's post-pass to write many edges in one batch. Both ids must +/// already exist as nodes; writing uses `MERGE` so re-runs are idempotent. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum GraphEdge { + /// `Symbol -[:REFERENCES]-> Symbol` (referrer -> referenced). + SymbolReferences { from: String, to: String }, + /// `Symbol -[:INHERITS]-> Symbol` (subtype -> base). + SymbolInherits { from: String, to: String }, + /// `Symbol -[:IMPLEMENTS]-> Symbol` (implementor -> interface/trait). + SymbolImplements { from: String, to: String }, + /// `File -[:IMPORTS_PACKAGE]-> Package`. + FileImportsPackage { file: String, package: String }, + /// `Project -[:CONTAINS_FILE]-> File`. + ProjectContainsFile { project: String, file: String }, +} + /// Aggregate counts for `status`/`index --stats`. #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct IndexStats { diff --git a/src/graph/store.rs b/src/graph/store.rs index 84a3062..f0cf3d5 100644 --- a/src/graph/store.rs +++ b/src/graph/store.rs @@ -41,6 +41,33 @@ pub trait GraphStore { /// type use (the `REFERENCES` edge). Direction is referrer -> referenced. fn link_symbol_references(&self, from_symbol_id: &str, to_symbol_id: &str) -> Result<()>; + /// Create many relationship edges in a single batch. Backends that support + /// transactions should write the whole batch in one transaction with reused + /// prepared statements — far faster than one `link_*` call per edge during + /// the indexer's post-pass. 
Idempotent (`MERGE`); endpoints must already + /// exist as nodes. The default implementation falls back to per-edge writes (not atomic). + fn link_edges(&self, edges: &[crate::graph::model::GraphEdge]) -> Result<()> { + use crate::graph::model::GraphEdge; + for e in edges { + match e { + GraphEdge::SymbolReferences { from, to } => { + self.link_symbol_references(from, to)? + } + GraphEdge::SymbolInherits { from, to } => self.link_symbol_inherits(from, to)?, + GraphEdge::SymbolImplements { from, to } => { + self.link_symbol_implements(from, to)? + } + GraphEdge::FileImportsPackage { file, package } => { + self.link_file_imports_package(file, package)? + } + GraphEdge::ProjectContainsFile { project, file } => { + self.link_project_contains_file(project, file)? + } + } + } + Ok(()) + } + fn symbols_matching(&self, query: &SymbolSearchQuery) -> Result>; fn files_matching(&self, query: &FileSearchQuery) -> Result>; fn related_to_symbol(&self, symbol: &str, depth: usize) -> Result>; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index d97e2dd..3b131a6 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -64,6 +64,10 @@ pub struct IndexProgress { pub projects: usize, /// Packages discovered so far. pub packages: usize, + /// The current post-loop phase, when indexing has moved past the per-file + /// scan into edge resolution (e.g. "resolving references"). `None` during + /// the file scan. Lets the UI show what the otherwise-frozen bar is doing. + pub phase: Option<&'static str>, } /// A progress observer invoked once per candidate file as indexing proceeds. @@ -148,6 +152,7 @@ pub fn index_repo( symbols: outcome.symbols, projects: projects_seen, packages: 0, + phase: None, }; cb(rel, &snap); } @@ -235,32 +240,80 @@ pub fn index_repo( } } + // Post-loop edge-resolution passes. 
These run after the per-file scan (so + // their link targets all exist) and can be the bulk of wall-clock on large + // repos, so each reports a phase to the progress UI — otherwise the bar + // sits frozen at N/N and looks hung. `report_phase` reuses the same + // callback as the file scan, emitting a snapshot tagged with the phase. + // It takes the live counts as arguments rather than capturing `outcome`, + // so the closure holds no borrow of it across the mutations below. + let report_phase = |phase: &'static str, files: usize, symbols: usize| { + if let Some(cb) = progress { + cb( + "", + &IndexProgress { + processed: total, + total, + files_indexed: files, + symbols, + projects: projects_seen, + packages: 0, + phase: Some(phase), + }, + ); + } + }; + // Resolve collected imports to known packages -> IMPORTS_PACKAGE edges. // Done after the main loop so every manifest has registered its packages. if !pending_imports.is_empty() { + report_phase("resolving imports", outcome.files_indexed, outcome.symbols); outcome.edges += resolve_imports(store, &pending_imports)?; } // Resolve supertype relationships -> INHERITS/IMPLEMENTS edges. Done after // the main loop so all symbols (the link targets) exist. if !pending_supertypes.is_empty() { + report_phase( + "resolving type relationships", + outcome.files_indexed, + outcome.symbols, + ); outcome.edges += resolve_supertypes(store, &pending_supertypes)?; } // Resolve usage references -> REFERENCES edges. Done after the main loop so // all declarations (the link targets) exist — references are cross-file. if !pending_references.is_empty() { + report_phase( + "resolving references", + outcome.files_indexed, + outcome.symbols, + ); outcome.edges += resolve_references(store, &pending_references)?; } // Associate every indexed file with its nearest owning project manifest, // creating CONTAINS_FILE edges. We link against the full candidate set (not // just files touched this run) so ownership is complete after any index. 
- for rel in &candidates { - if let Some(manifest) = owning_manifest(rel, &manifests) { - store.link_project_contains_file(&project_id(manifest), &file_id(rel))?; - } - } + // Batched through `link_edges`, like the resolve passes above. + report_phase( + "linking project membership", + outcome.files_indexed, + outcome.symbols, + ); + let contains_edges: Vec = candidates + .iter() + .filter_map(|rel| { + owning_manifest(rel, &manifests).map(|manifest| { + crate::graph::model::GraphEdge::ProjectContainsFile { + project: project_id(manifest), + file: file_id(rel), + } + }) + }) + .collect(); + store.link_edges(&contains_edges)?; // Remove files that no longer exist as candidates (deleted/now-ignored). if !changed_only { @@ -299,7 +352,7 @@ fn resolve_imports( .map(|p| p.name.as_str()) .collect(); - let mut edges = 0; + let mut batch: Vec = Vec::new(); for (fid, imports, lang) in pending { // De-dup the resolved package ids per file. let mut linked: HashSet = HashSet::new(); @@ -314,11 +367,15 @@ fn resolve_imports( if let Some(pkg_id) = resolved && linked.insert(pkg_id.clone()) { - store.link_file_imports_package(fid, &pkg_id)?; - edges += 1; + batch.push(crate::graph::model::GraphEdge::FileImportsPackage { + file: fid.clone(), + package: pkg_id, + }); } } } + let edges = batch.len(); + store.link_edges(&batch)?; Ok(edges) } @@ -347,9 +404,9 @@ fn resolve_supertypes( store: &dyn GraphStore, pending: &[(String, Vec)], ) -> Result { - use crate::graph::model::{SymbolKind, SymbolSearchQuery}; + use crate::graph::model::{GraphEdge, SymbolKind, SymbolSearchQuery}; - let mut edges = 0; + let mut batch: Vec = Vec::new(); for (file, supers) in pending { let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); for st in supers { @@ -405,15 +462,22 @@ fn resolve_supertypes( matches!(target.kind, SymbolKind::Interface | SymbolKind::Trait) } }; - if implements { - store.link_symbol_implements(&child.id, &target.id)?; + batch.push(if implements { 
+ GraphEdge::SymbolImplements { + from: child.id.clone(), + to: target.id.clone(), + } } else { - store.link_symbol_inherits(&child.id, &target.id)?; - } - edges += 1; + GraphEdge::SymbolInherits { + from: child.id.clone(), + to: target.id.clone(), + } + }); } } } + let edges = batch.len(); + store.link_edges(&batch)?; Ok(edges) } @@ -445,7 +509,9 @@ fn resolve_references( // applied per reference below. let mut to_cache: HashMap> = HashMap::new(); - let mut edges = 0; + // Accumulate edges and write them with a single `link_edges` call at the end + // rather than one store call per edge — this is what removes the stall. + let mut batch: Vec = Vec::new(); for (file, refs) in pending { let project_prefix = file.rsplit_once('/').map(|(d, _)| d).unwrap_or(""); @@ -522,11 +588,15 @@ fn resolve_references( // Deterministic order when an ambiguous name produces multiple edges. targets.sort_by(|a, b| a.file_path.cmp(&b.file_path).then(a.id.cmp(&b.id))); for target in targets { - store.link_symbol_references(&from.id, &target.id)?; - edges += 1; + batch.push(crate::graph::model::GraphEdge::SymbolReferences { + from: from.id.clone(), + to: target.id.clone(), + }); } } } + let edges = batch.len(); + store.link_edges(&batch)?; Ok(edges) } diff --git a/src/main.rs b/src/main.rs index 7ad1796..d6633ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -139,13 +139,17 @@ fn cmd_index(cwd: &Path, args: cli::IndexArgs) -> Result<()> { move |current: &str, p: &indexer::IndexProgress| { pb.set_length(p.total as u64); pb.set_position(p.processed as u64); - // Colored live stats line + the current file, both below the bar. + // Bottom line: the current file during the scan, or the active + // post-loop phase (e.g. "resolving references…") once the per-file + // scan is done — so the bar at N/N shows work rather than looking + // hung while edges are written. 
+ let bottom = match p.phase { + Some(phase) => format!("\x1b[2m{phase}…\x1b[0m"), + None => format!("\x1b[2m{}\x1b[0m", truncate_middle(current, 64)), + }; pb.set_message(format!( - "\x1b[36mfiles\x1b[0m {} \x1b[36msymbols\x1b[0m {} \x1b[36mprojects\x1b[0m {}\n \x1b[2m{}\x1b[0m", - p.files_indexed, - p.symbols, - p.projects, - truncate_middle(current, 64), + "\x1b[36mfiles\x1b[0m {} \x1b[36msymbols\x1b[0m {} \x1b[36mprojects\x1b[0m {}\n {}", + p.files_indexed, p.symbols, p.projects, bottom, )); } }); diff --git a/tests/ladybug_smoke.rs b/tests/ladybug_smoke.rs index 22f2a43..ace2e63 100644 --- a/tests/ladybug_smoke.rs +++ b/tests/ladybug_smoke.rs @@ -128,3 +128,82 @@ fn ladybug_reference_edge_roundtrips() { let _ = std::fs::remove_dir_all(&path); let _ = std::fs::remove_file(&path); } + +/// The batched `link_edges` path (one transaction, prepared statements reused) +/// writes edges of several kinds that are queryable afterwards, idempotently. +#[test] +fn ladybug_link_edges_batch_roundtrips() { + use synapse::graph::model::GraphEdge; + + let path = temp_db_path("batch"); + let _ = std::fs::remove_dir_all(&path); + let _ = std::fs::remove_file(&path); + + let store = LadybugGraphStore::open(&path).expect("open"); + store.initialize_schema().expect("schema"); + + let mk = |id: &str, name: &str, file: &str| IndexedSymbol { + id: id.into(), + name: name.into(), + full_name: name.into(), + kind: SymbolKind::Class, + language: Language::CSharp, + file_path: file.into(), + start_line: 1, + end_line: 2, + visibility: "public".into(), + exported: true, + }; + store + .upsert_symbol(mk("sym:base.cs#class#Base#1", "Base", "base.cs")) + .unwrap(); + store + .upsert_symbol(mk("sym:impl.cs#class#Impl#1", "Impl", "impl.cs")) + .unwrap(); + store + .upsert_symbol(mk("sym:user.cs#class#User#1", "User", "user.cs")) + .unwrap(); + + // Two edge kinds written in one batch (one transaction). 
+ store + .link_edges(&[ + GraphEdge::SymbolInherits { + from: "sym:impl.cs#class#Impl#1".into(), + to: "sym:base.cs#class#Base#1".into(), + }, + GraphEdge::SymbolReferences { + from: "sym:user.cs#class#User#1".into(), + to: "sym:base.cs#class#Base#1".into(), + }, + ]) + .expect("link_edges batch"); + + // Both edges written by the batch must be queryable. + assert_eq!(store.stats().unwrap().reference_edges, 1); + let refs = store.symbol_references("Base").unwrap(); + assert!( + refs.iter().any(|r| r.path == "user.cs"), + "batched REFERENCES edge must be queryable: {refs:?}" + ); + let rels = store.symbol_type_relations("Impl").unwrap(); + assert!( + rels.iter().any(|r| r.reason.contains("inherits")), + "batched INHERITS edge must be queryable: {rels:?}" + ); + + // Idempotency: re-linking the existing REFERENCES edge must not double-count. + store + .link_edges(&[GraphEdge::SymbolReferences { + from: "sym:user.cs#class#User#1".into(), + to: "sym:base.cs#class#Base#1".into(), + }]) + .expect("re-link"); + assert_eq!( + store.stats().unwrap().reference_edges, + 1, + "MERGE must keep the batch idempotent" + ); + + let _ = std::fs::remove_dir_all(&path); + let _ = std::fs::remove_file(&path); +}