Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 25 additions & 24 deletions src/graph/ladybug_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ fn parse_kind(s: &str) -> SymbolKind {
SymbolKind::from_str_opt(s).unwrap_or(SymbolKind::Function)
}

/// Runs a single batched statement whose Cypher text consumes a `$rows` list
/// parameter (the `UNWIND $rows` pattern).
///
/// `rows` is taken by value and moved straight into the list parameter, so the
/// caller's vector is never copied. When `rows` is empty nothing is prepared
/// or executed and the call succeeds immediately.
///
/// # Errors
///
/// Returns an error that names the Cypher text if either preparing or
/// executing the statement fails.
fn run_stage(conn: &Connection<'_>, cypher: &str, rows: Vec<Value>) -> Result<()> {
    // The list's element type is derived from the first row; having no first
    // row means the batch is empty and there is nothing to run.
    let Some(first_row) = rows.first() else {
        return Ok(());
    };
    let element_type: lbug::LogicalType = first_row.into();

    let mut prepared = conn
        .prepare(cypher)
        .map_err(|e| anyhow!("preparing `{cypher}`: {e}"))?;

    // `rows` is moved (not cloned) into the list value bound to `$rows`.
    let params = vec![("rows", Value::List(element_type, rows))];
    conn.execute(&mut prepared, params)
        .map_err(|e| anyhow!("executing `{cypher}`: {e}"))?;
    Ok(())
}

impl GraphStore for LadybugGraphStore {
fn initialize_schema(&self) -> Result<()> {
let _guard = self.lock.lock().unwrap();
Expand Down Expand Up @@ -561,33 +576,19 @@ impl GraphStore for LadybugGraphStore {
})
.collect();

// (cypher, rows): runs in order. Empty row lists are skipped.
let stages: [(&str, &Vec<Value>); 6] = [
(REMOVE_DECLARED, &path_rows),
(REMOVE_BY_FILEPATH, &path_rows),
(REMOVE_FILE, &path_rows),
(UPSERT_FILE, &file_rows),
(UPSERT_SYMBOL, &symbol_rows),
(LINK_DECLARES, &declares_rows),
];

conn.query("BEGIN TRANSACTION")
.map_err(|e| anyhow!("begin transaction: {e}"))?;
let result = (|| -> Result<()> {
for (q, rows) in stages {
if rows.is_empty() {
continue;
}
let child_ty: lbug::LogicalType = (&rows[0]).into();
let mut stmt = conn
.prepare(q)
.map_err(|e| anyhow!("preparing `{q}`: {e}"))?;
conn.execute(
&mut stmt,
vec![("rows", Value::List(child_ty, rows.clone()))],
)
.map_err(|e| anyhow!("executing `{q}`: {e}"))?;
}
// Run each stage in order; empty row lists are skipped. The three
// `path_rows` stages share one list, so it's cloned per use (it's the
// small per-file list). The large per-symbol lists (`symbol_rows`,
// `declares_rows`) are single-use and moved in by value — no clone.
run_stage(&conn, REMOVE_DECLARED, path_rows.clone())?;
run_stage(&conn, REMOVE_BY_FILEPATH, path_rows.clone())?;
run_stage(&conn, REMOVE_FILE, path_rows)?;
run_stage(&conn, UPSERT_FILE, file_rows)?;
run_stage(&conn, UPSERT_SYMBOL, symbol_rows)?;
run_stage(&conn, LINK_DECLARES, declares_rows)?;
Ok(())
})();
match result {
Expand Down
23 changes: 18 additions & 5 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ fn parse_file(ctx: &ParseContext<'_>, rel: &str) -> Result<FileWork> {
}

// Manifest parsing -> projects/packages/edges (pure parse only). A parse
// error aborts the whole index, exactly as the previous inline `?` did.
// error propagates out of `parse_file`; the sequential drain surfaces it via
// `work?` and aborts the run *before* the batched write stage executes. This
// is a more atomic failure mode than the old write-as-you-go loop, which
// could leave earlier files already committed when a later manifest failed.
let manifest = if rel.ends_with(".csproj") {
Some(parse_csproj_manifest(rel, &abs, ctx.central)?)
} else if rel.ends_with("package.json") {
Expand Down Expand Up @@ -321,8 +324,7 @@ pub fn index_repo(
let parsed_count = &parsed_count;
let done = &done;
scope.spawn(move || {
loop {
let n = parsed_count.load(std::sync::atomic::Ordering::Relaxed);
let emit = |n: usize| {
cb(
"",
&IndexProgress {
Expand All @@ -333,7 +335,18 @@ pub fn index_repo(
..IndexProgress::default()
},
);
if done.load(std::sync::atomic::Ordering::Acquire) {
};
loop {
// Observe `done` first, then read the counter. `done` is
// stored (Release) only after `par_iter().collect()` has
// joined every worker, so all the `fetch_add(Release)`
// increments happen-before it. Reading `done` via Acquire
// therefore guarantees the following Acquire load sees the
// final count — emit it and stop, so the last update always
// reflects the true total rather than a stale lower value.
let finished = done.load(std::sync::atomic::Ordering::Acquire);
emit(parsed_count.load(std::sync::atomic::Ordering::Acquire));
if finished {
break;
}
std::thread::sleep(std::time::Duration::from_millis(80));
Expand All @@ -344,7 +357,7 @@ pub fn index_repo(
.par_iter()
.map(|rel| {
let w = parse_file(&parse_ctx, rel);
parsed_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
parsed_count.fetch_add(1, std::sync::atomic::Ordering::Release);
w
})
.collect();
Expand Down
Loading