Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "synapse"
version = "0.2.0"
version = "0.2.1"
edition = "2024"

[[bin]]
Expand All @@ -27,6 +27,7 @@ chrono = { version = "0.4.44", features = ["serde"] }
clap = { version = "4.6.1", features = ["derive"] }
globset = "0.4.18"
ignore = "0.4.25"
rayon = "1.10"
quick-xml = "0.40.1"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.150"
Expand Down
221 changes: 190 additions & 31 deletions src/graph/ladybug_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,26 +362,31 @@ impl GraphStore for LadybugGraphStore {
let _guard = self.lock.lock().unwrap();
let conn = self.conn()?;

// The Cypher for each edge kind. Same MERGE shape as the per-edge
// `link_*` methods; here the statement is prepared ONCE per kind and
// re-executed per row, all inside one transaction — so we pay a single
// commit instead of one per edge (the source of the end-of-index stall).
// One `UNWIND $rows` statement per edge kind: all edges of a kind are
// passed as a single list-of-structs parameter and merged in one
// `execute`, so a 61k-edge batch is ~5 FFI calls instead of 61k. Same
// MERGE shape as the per-edge `link_*` methods; idempotent.
let cypher = |e: &GraphEdge| -> &'static str {
match e {
GraphEdge::SymbolReferences { .. } => {
"MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:REFERENCES]->(b)"
"UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \
MERGE (a)-[:REFERENCES]->(b)"
}
GraphEdge::SymbolInherits { .. } => {
"MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:INHERITS]->(b)"
"UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \
MERGE (a)-[:INHERITS]->(b)"
}
GraphEdge::SymbolImplements { .. } => {
"MATCH (a:Symbol {id: $a}), (b:Symbol {id: $b}) MERGE (a)-[:IMPLEMENTS]->(b)"
"UNWIND $rows AS r MATCH (a:Symbol {id: r.a}), (b:Symbol {id: r.b}) \
MERGE (a)-[:IMPLEMENTS]->(b)"
}
GraphEdge::FileImportsPackage { .. } => {
"MATCH (f:File {id: $a}), (k:Package {id: $b}) MERGE (f)-[:IMPORTS_PACKAGE]->(k)"
"UNWIND $rows AS r MATCH (f:File {id: r.a}), (k:Package {id: r.b}) \
MERGE (f)-[:IMPORTS_PACKAGE]->(k)"
}
GraphEdge::ProjectContainsFile { .. } => {
"MATCH (p:Project {id: $a}), (f:File {id: $b}) MERGE (p)-[:CONTAINS_FILE]->(f)"
"UNWIND $rows AS r MATCH (p:Project {id: r.a}), (f:File {id: r.b}) \
MERGE (p)-[:CONTAINS_FILE]->(f)"
}
}
};
Expand All @@ -395,35 +400,48 @@ impl GraphStore for LadybugGraphStore {
GraphEdge::ProjectContainsFile { project, file } => (project, file),
}
}
// Maps each edge kind to a fixed small integer. The value is used as the key
// when edges are bucketed by kind, so every bucket holds exactly one kind (its
// Cypher is chosen from the bucket's first edge) and keeps the input order.
fn kind_ix(edge: &GraphEdge) -> u8 {
    match *edge {
        GraphEdge::SymbolReferences { .. } => 0,
        GraphEdge::SymbolInherits { .. } => 1,
        GraphEdge::SymbolImplements { .. } => 2,
        GraphEdge::FileImportsPackage { .. } => 3,
        GraphEdge::ProjectContainsFile { .. } => 4,
    }
}

// One prepared statement per distinct edge-kind Cypher, reused across
// all rows of that kind.
let mut prepared: std::collections::HashMap<&'static str, lbug::PreparedStatement> =
std::collections::HashMap::new();
// Group edges by kind, preserving order within each kind.
let mut by_kind: std::collections::BTreeMap<u8, Vec<&GraphEdge>> =
std::collections::BTreeMap::new();
for e in edges {
by_kind.entry(kind_ix(e)).or_default().push(e);
}

conn.query("BEGIN TRANSACTION")
.map_err(|e| anyhow!("begin transaction: {e}"))?;
// Run the batch; on any error, roll back so a partial batch isn't left
// half-committed, then surface the error.
let result = (|| -> Result<()> {
for edge in edges {
let q = cypher(edge);
if !prepared.contains_key(q) {
let stmt = conn
.prepare(q)
.map_err(|e| anyhow!("preparing `{q}`: {e}"))?;
prepared.insert(q, stmt);
}
let stmt = prepared.get_mut(q).expect("just inserted");
let (a, b) = endpoints(edge);
conn.execute(
stmt,
vec![
("a", Value::String(a.to_string())),
("b", Value::String(b.to_string())),
],
)
.map_err(|e| anyhow!("executing batch edge: {e}"))?;
for group in by_kind.values() {
let q = cypher(group[0]);
let rows: Vec<Value> = group
.iter()
.map(|e| {
let (a, b) = endpoints(e);
Value::Struct(vec![
("a".to_string(), Value::String(a.to_string())),
("b".to_string(), Value::String(b.to_string())),
])
})
.collect();
let child_ty: lbug::LogicalType = (&rows[0]).into();
let mut stmt = conn
.prepare(q)
.map_err(|e| anyhow!("preparing `{q}`: {e}"))?;
conn.execute(&mut stmt, vec![("rows", Value::List(child_ty, rows))])
.map_err(|e| anyhow!("executing batch edges: {e}"))?;
}
Ok(())
})();
Expand All @@ -446,6 +464,147 @@ impl GraphStore for LadybugGraphStore {
}
}

/// Replaces the graph nodes for every file in `files` inside one transaction.
///
/// Six `UNWIND $rows` statements run in a fixed order: three removals keyed by
/// file path (symbols reached through `DECLARES`, symbols whose `filePath`
/// matches, then the `File` nodes themselves), followed by the `File` upsert,
/// the `Symbol` upsert and the `DECLARES` links. A statement whose row list is
/// empty is skipped.
///
/// Returns `Ok(())` immediately when `files` is empty.
///
/// # Errors
///
/// Returns an error if opening the connection, `BEGIN TRANSACTION`, preparing
/// or executing any statement, or `COMMIT` fails. After a failure inside the
/// transaction a `ROLLBACK` is attempted (its own result is ignored) and the
/// original error is returned.
///
/// # Panics
///
/// Panics if acquiring `self.lock` fails (the `unwrap` on `lock()`).
fn write_files_batch(&self, files: &[crate::graph::model::FileWrite]) -> Result<()> {
    if files.is_empty() {
        return Ok(());
    }
    let _guard = self.lock.lock().unwrap();
    let conn = self.conn()?;

    // The whole batch is a handful of `UNWIND $rows` statements (one list
    // parameter each) inside one transaction, instead of ~2 `execute` calls
    // per symbol. Removes run first (clearing stale nodes), then file and
    // symbol upserts, then DECLARES edges. Original author's note: MERGE is
    // idempotent and all ids are deterministic, so the final graph is intended
    // to be identical regardless of the intra-batch ordering this collapses.
    const REMOVE_DECLARED: &str = "UNWIND $rows AS r MATCH (f:File {path: r.path})-[:DECLARES]->(s:Symbol) DETACH DELETE s";
    const REMOVE_BY_FILEPATH: &str =
        "UNWIND $rows AS r MATCH (s:Symbol {filePath: r.path}) DETACH DELETE s";
    const REMOVE_FILE: &str = "UNWIND $rows AS r MATCH (f:File {path: r.path}) DETACH DELETE f";
    const UPSERT_FILE: &str = "UNWIND $rows AS r MERGE (f:File {id: r.id}) \
        SET f.path = r.path, f.language = r.language, f.hash = r.hash, \
        f.sizeBytes = r.size, f.tracked = r.tracked, f.lastIndexedAt = r.indexed";
    const UPSERT_SYMBOL: &str = "UNWIND $rows AS r MERGE (s:Symbol {id: r.id}) \
        SET s.name = r.name, s.fullName = r.full, s.kind = r.kind, s.language = r.language, \
        s.filePath = r.file, s.startLine = r.startln, s.endLine = r.endln, \
        s.visibility = r.vis, s.exported = r.exported";
    const LINK_DECLARES: &str = "UNWIND $rows AS r MATCH (f:File {id: r.f}), (s:Symbol {id: r.s}) \
        MERGE (f)-[:DECLARES]->(s)";

    // Build the row lists up front (owned), so the transaction body is just a
    // sequence of single `execute` calls.

    // One `{path}` struct per file; shared by the three removal statements.
    let path_rows: Vec<Value> = files
        .iter()
        .map(|fw| {
            Value::Struct(vec![(
                "path".to_string(),
                Value::String(fw.file.path.clone()),
            )])
        })
        .collect();

    // One struct per file carrying every property `UPSERT_FILE` sets.
    let file_rows: Vec<Value> = files
        .iter()
        .map(|fw| {
            let f = &fw.file;
            Value::Struct(vec![
                ("id".to_string(), Value::String(f.id.clone())),
                ("path".to_string(), Value::String(f.path.clone())),
                (
                    "language".to_string(),
                    Value::String(lang_to_str(f.language)),
                ),
                ("hash".to_string(), Value::String(f.hash.clone())),
                ("size".to_string(), Value::Int64(f.size_bytes as i64)),
                ("tracked".to_string(), Value::Bool(f.tracked)),
                (
                    "indexed".to_string(),
                    Value::String(f.last_indexed_at.clone()),
                ),
            ])
        })
        .collect();

    // One struct per symbol, flattened across all files in input order.
    let symbol_rows: Vec<Value> = files
        .iter()
        .flat_map(|fw| {
            fw.symbols.iter().map(|sym| {
                Value::Struct(vec![
                    ("id".to_string(), Value::String(sym.id.clone())),
                    ("name".to_string(), Value::String(sym.name.clone())),
                    ("full".to_string(), Value::String(sym.full_name.clone())),
                    (
                        "kind".to_string(),
                        Value::String(sym.kind.as_str().to_string()),
                    ),
                    (
                        "language".to_string(),
                        Value::String(lang_to_str(sym.language)),
                    ),
                    ("file".to_string(), Value::String(sym.file_path.clone())),
                    ("startln".to_string(), Value::Int64(sym.start_line as i64)),
                    ("endln".to_string(), Value::Int64(sym.end_line as i64)),
                    ("vis".to_string(), Value::String(sym.visibility.clone())),
                    ("exported".to_string(), Value::Bool(sym.exported)),
                ])
            })
        })
        .collect();

    // One `{f, s}` id pair per (file, declared symbol).
    let declares_rows: Vec<Value> = files
        .iter()
        .flat_map(|fw| {
            let fid = fw.file.id.clone();
            fw.symbols.iter().map(move |sym| {
                Value::Struct(vec![
                    ("f".to_string(), Value::String(fid.clone())),
                    ("s".to_string(), Value::String(sym.id.clone())),
                ])
            })
        })
        .collect();

    // (cypher, rows): runs in order. Each stage owns its row list so it can be
    // moved straight into the `Value::List` parameter. Only `path_rows` is
    // duplicated, because three removal statements consume it; the per-symbol
    // lists are moved rather than deep-copied.
    let stages: [(&str, Vec<Value>); 6] = [
        (REMOVE_DECLARED, path_rows.clone()),
        (REMOVE_BY_FILEPATH, path_rows.clone()),
        (REMOVE_FILE, path_rows),
        (UPSERT_FILE, file_rows),
        (UPSERT_SYMBOL, symbol_rows),
        (LINK_DECLARES, declares_rows),
    ];

    conn.query("BEGIN TRANSACTION")
        .map_err(|e| anyhow!("begin transaction: {e}"))?;
    // Run every stage; the first error aborts the loop so the caller below
    // can roll the transaction back.
    let result = (|| -> Result<()> {
        for (q, rows) in stages {
            // Empty row lists are skipped; the list's element type is derived
            // from its first row.
            let Some(first) = rows.first() else {
                continue;
            };
            let child_ty: lbug::LogicalType = first.into();
            let mut stmt = conn
                .prepare(q)
                .map_err(|e| anyhow!("preparing `{q}`: {e}"))?;
            conn.execute(&mut stmt, vec![("rows", Value::List(child_ty, rows))])
                .map_err(|e| anyhow!("executing `{q}`: {e}"))?;
        }
        Ok(())
    })();
    match result {
        Ok(()) => match conn.query("COMMIT") {
            Ok(_) => Ok(()),
            Err(e) => {
                // A failed commit still gets a best-effort rollback.
                let _ = conn.query("ROLLBACK");
                Err(anyhow!("commit transaction: {e}"))
            }
        },
        Err(err) => {
            let _ = conn.query("ROLLBACK");
            Err(err)
        }
    }
}

fn symbol_type_relations(&self, symbol_name: &str) -> Result<Vec<RelatedItem>> {
let _guard = self.lock.lock().unwrap();
let conn = self.conn()?;
Expand Down
17 changes: 17 additions & 0 deletions src/graph/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,23 @@ pub enum GraphEdge {
ProjectContainsFile { project: String, file: String },
}

/// One file's complete (re)index payload, for the batched node-write path.
///
/// Carries everything needed to replace a file's nodes in the graph: the file
/// node and every symbol it declares. The indexer collects these during the
/// drain and writes them in one transaction via
/// [`GraphStore::write_files_batch`](crate::graph::GraphStore::write_files_batch),
/// rather than one auto-committed statement per symbol — which is the dominant
/// cost of indexing a large repo.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileWrite {
/// The file node to upsert. Its declared symbols are removed first (clearing
/// stale ones from a previous index) then re-inserted from `symbols`.
pub file: IndexedFile,
/// Every symbol declared in the file, in deterministic insertion order.
/// Each one is upserted and linked to `file` with a `DECLARES` edge. An
/// empty list is valid: the file node is still written, just without
/// symbols or `DECLARES` edges.
pub symbols: Vec<IndexedSymbol>,
}

/// Aggregate counts for `status`/`index --stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndexStats {
Expand Down
26 changes: 24 additions & 2 deletions src/graph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
//! replaceable. No backend-specific type ever appears in this signature.

use crate::graph::model::{
FileSearchQuery, IndexStats, IndexedFile, IndexedPackage, IndexedProject, IndexedSymbol,
RelatedItem, SymbolSearchQuery,
FileSearchQuery, FileWrite, IndexStats, IndexedFile, IndexedPackage, IndexedProject,
IndexedSymbol, RelatedItem, SymbolSearchQuery,
};
use anyhow::Result;

Expand Down Expand Up @@ -68,6 +68,28 @@ pub trait GraphStore {
Ok(())
}

/// Apply many files' (re)index writes in one batch. For each [`FileWrite`]:
/// remove the file's existing nodes, upsert the file, upsert every declared
/// symbol, and create the `DECLARES` edges. Backends that support
/// transactions should write the whole batch in one transaction with reused
/// prepared statements — far faster than one auto-committed `upsert_symbol`
/// per symbol, which dominates indexing time on large repos.
///
/// The default implementation falls back to the per-file/per-symbol methods,
/// preserving the original ordering (remove -> upsert file -> upsert+link
/// each symbol).
///
/// # Errors
///
/// The default implementation stops at the first failing call and returns its
/// error; writes already applied for earlier files or symbols are not undone
/// here.
fn write_files_batch(&self, files: &[FileWrite]) -> Result<()> {
    for fw in files {
        self.remove_file(&fw.file.path)?;
        self.upsert_file(fw.file.clone())?;
        for sym in &fw.symbols {
            self.upsert_symbol(sym.clone())?;
            // Borrow the ids in place: `fw` outlives both calls, so no
            // temporary copies of the id strings are needed for the link.
            self.link_file_declares_symbol(&fw.file.id, &sym.id)?;
        }
    }
    Ok(())
}

fn symbols_matching(&self, query: &SymbolSearchQuery) -> Result<Vec<IndexedSymbol>>;
fn files_matching(&self, query: &FileSearchQuery) -> Result<Vec<IndexedFile>>;
fn related_to_symbol(&self, symbol: &str, depth: usize) -> Result<Vec<RelatedItem>>;
Expand Down
Loading
Loading