Skip to content

Commit 8d136a6

Browse files
committed
feat: Implement watch mode integration test for incremental reindexing
1 parent 3d13358 commit 8d136a6

2 files changed

Lines changed: 196 additions & 16 deletions

File tree

crates/codegraph-mcp/src/indexer.rs

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use url::Url;
4444
use walkdir::WalkDir;
4545

4646
use std::sync::{Arc, Mutex};
47+
use std::sync::OnceLock;
4748

4849
use std::collections::HashMap;
4950

@@ -65,6 +66,17 @@ pub struct FileChange {
6566
pub previous_hash: Option<String>,
6667
}
6768

69+
static WATCH_TEST_NOTIFIER: OnceLock<Mutex<Option<tokio::sync::mpsc::UnboundedSender<PathBuf>>>> =
70+
OnceLock::new();
71+
72+
pub fn set_watch_test_notifier(sender: tokio::sync::mpsc::UnboundedSender<PathBuf>) {
73+
let mut guard = WATCH_TEST_NOTIFIER
74+
.get_or_init(|| Mutex::new(None))
75+
.lock()
76+
.unwrap();
77+
*guard = Some(sender);
78+
}
79+
6880
#[derive(Clone, Copy, Debug)]
6981
enum SurrealEmbeddingColumn {
7082
Embedding384,
@@ -2979,6 +2991,10 @@ impl ProjectIndexer {
29792991
}
29802992
}
29812993

2994+
pub async fn surreal_storage(&self) -> Arc<TokioMutex<SurrealDbStorage>> {
2995+
Arc::clone(&self.surreal)
2996+
}
2997+
29822998
#[cfg(feature = "embeddings")]
29832999
async fn log_surreal_chunk_count(&self, expected: usize) {
29843000
let db = {
@@ -3013,6 +3029,26 @@ impl ProjectIndexer {
30133029
}
30143030
}
30153031

3032+
#[cfg(test)]
3033+
pub async fn test_fetch_file_metadata(
3034+
&self,
3035+
file_path: &str,
3036+
) -> Result<Option<serde_json::Value>> {
3037+
let db = {
3038+
let storage = self.surreal.lock().await;
3039+
storage.db()
3040+
};
3041+
3042+
let mut resp = db
3043+
.query(
3044+
"SELECT file_path, last_indexed_at, node_count FROM file_metadata WHERE file_path = $file_path",
3045+
)
3046+
.bind(("file_path", file_path))
3047+
.await?;
3048+
let rows: Vec<serde_json::Value> = resp.take(0)?;
3049+
Ok(rows.into_iter().next())
3050+
}
3051+
30163052
async fn log_surreal_edge_count(&self, expected: usize) {
30173053
let db = {
30183054
let storage = self.surreal.lock().await;
@@ -3771,33 +3807,82 @@ impl ProjectIndexer {
37713807
watcher.watch(&path, RecursiveMode::Recursive)?;
37723808
info!("Watching for changes in: {:?}", path);
37733809

3774-
use std::collections::HashMap;
3775-
use std::time::{Duration, Instant};
3776-
let mut last_events: HashMap<PathBuf, Instant> = HashMap::new();
3810+
let mut last_events: std::collections::HashMap<PathBuf, std::time::Instant> =
3811+
std::collections::HashMap::new();
37773812

37783813
while let Some(event) = rx.recv().await {
3779-
match event.kind {
3780-
EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(_) => {
3781-
for path in event.paths {
3782-
if self.should_index(&path) {
3783-
let now = Instant::now();
3784-
let entry = last_events.entry(path.clone()).or_insert(now);
3785-
if now.duration_since(*entry).as_millis() as u64 >= debounce_ms {
3786-
*entry = now;
3787-
info!("File changed: {:?}, reindexing (debounced)...", path);
3814+
self.handle_file_event(event, &mut last_events, debounce_ms).await;
3815+
}
3816+
Ok(())
3817+
}
3818+
}
3819+
3820+
impl ProjectIndexer {
3821+
async fn handle_file_event(
3822+
&self,
3823+
event: notify::Event,
3824+
last_events: &mut std::collections::HashMap<PathBuf, std::time::Instant>,
3825+
debounce_ms: u64,
3826+
) {
3827+
use notify::event::{EventKind, ModifyKind};
3828+
use std::time::Instant;
3829+
3830+
match event.kind {
3831+
EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(_) => {
3832+
for path in event.paths {
3833+
if self.should_index(&path) {
3834+
let now = Instant::now();
3835+
match last_events.entry(path.clone()) {
3836+
std::collections::hash_map::Entry::Vacant(v) => {
3837+
v.insert(now);
3838+
info!("File changed: {:?}, reindexing...", path);
37883839
if let Err(e) = self.index_single_file(&path).await {
37893840
warn!("Incremental reindex failed for {:?}: {}", path, e);
37903841
}
3791-
} else {
3792-
debug!("Debounced change for {:?}", path);
3842+
if let Some(tx) = WATCH_TEST_NOTIFIER
3843+
.get_or_init(|| Mutex::new(None))
3844+
.lock()
3845+
.unwrap()
3846+
.as_ref()
3847+
{
3848+
let _ = tx.send(path.clone());
3849+
}
3850+
}
3851+
std::collections::hash_map::Entry::Occupied(mut entry) => {
3852+
if now.duration_since(*entry.get()).as_millis() as u64 >= debounce_ms
3853+
{
3854+
*entry.get_mut() = now;
3855+
info!("File changed: {:?}, reindexing (debounced)...", path);
3856+
if let Err(e) = self.index_single_file(&path).await {
3857+
warn!("Incremental reindex failed for {:?}: {}", path, e);
3858+
}
3859+
if let Some(tx) = WATCH_TEST_NOTIFIER
3860+
.get_or_init(|| Mutex::new(None))
3861+
.lock()
3862+
.unwrap()
3863+
.as_ref()
3864+
{
3865+
let _ = tx.send(path.clone());
3866+
}
3867+
} else {
3868+
debug!("Debounced change for {:?}", path);
3869+
}
37933870
}
37943871
}
37953872
}
37963873
}
3797-
_ => {}
37983874
}
3875+
_ => {}
37993876
}
3800-
Ok(())
3877+
}
3878+
3879+
pub async fn simulate_file_event(
3880+
&self,
3881+
event: notify::Event,
3882+
last_events: &mut std::collections::HashMap<PathBuf, std::time::Instant>,
3883+
debounce_ms: u64,
3884+
) {
3885+
self.handle_file_event(event, last_events, debounce_ms).await;
38013886
}
38023887
}
38033888

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// ABOUTME: Integration test verifying watch mode triggers incremental reindexing.
2+
// ABOUTME: Spawns watch loop, edits a file, and asserts file_metadata is updated.
3+
#![cfg(not(feature = "embeddings"))]
4+
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
8+
use anyhow::Result;
9+
use codegraph_core::config_manager::CodeGraphConfig;
10+
use codegraph_mcp::indexer::{set_watch_test_notifier, IndexerConfig, ProjectIndexer};
11+
use notify::event::{DataChange, EventKind, ModifyKind};
12+
use notify::Event;
13+
use indicatif::MultiProgress;
14+
use serde_json::Value;
15+
use tempfile::tempdir;
16+
use tokio::fs;
17+
use tokio::time::sleep;
18+
19+
#[tokio::test]
20+
async fn watch_updates_file_metadata_on_change() -> Result<()> {
21+
// Isolated in-memory SurrealDB
22+
std::env::set_var("CODEGRAPH_SURREALDB_URL", "mem://");
23+
std::env::set_var("CODEGRAPH_SURREALDB_NAMESPACE", "watch_ns");
24+
std::env::set_var("CODEGRAPH_SURREALDB_DATABASE", "watch_db");
25+
std::env::remove_var("CODEGRAPH_SURREALDB_USERNAME");
26+
std::env::remove_var("CODEGRAPH_SURREALDB_PASSWORD");
27+
28+
let project_dir = tempdir()?;
29+
let file_path = project_dir.path().join("foo.rs");
30+
fs::write(&file_path, "fn foo() {}\n").await?;
31+
32+
let mut config = IndexerConfig::default();
33+
config.project_root = project_dir.path().to_path_buf();
34+
config.languages = vec!["rust".to_string()];
35+
config.recursive = true;
36+
config.force_reindex = true;
37+
38+
let global_config = CodeGraphConfig::default();
39+
let mut indexer = ProjectIndexer::new(config, &global_config, MultiProgress::new()).await?;
40+
41+
// Baseline full index to seed file metadata
42+
indexer.index_project(project_dir.path()).await?;
43+
let indexer = Arc::new(indexer);
44+
let storage = indexer.surreal_storage().await;
45+
let _initial = fetch_metadata(storage.clone(), file_path.to_string_lossy().as_ref())
46+
.await?
47+
.expect("baseline metadata missing");
48+
49+
// Start watcher
50+
let watch_indexer = indexer.clone();
51+
let mut last_events = std::collections::HashMap::new();
52+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
53+
set_watch_test_notifier(tx);
54+
55+
// Change file contents to create additional nodes
56+
fs::write(&file_path, "fn foo() {}\nfn bar() {}\n").await?;
57+
58+
// Simulate watch events directly (modify twice to satisfy debounce logic)
59+
let event = Event {
60+
kind: EventKind::Modify(ModifyKind::Data(DataChange::Content)),
61+
paths: vec![file_path.clone()],
62+
attrs: Default::default(),
63+
};
64+
watch_indexer
65+
.simulate_file_event(event.clone(), &mut last_events, 300)
66+
.await;
67+
sleep(Duration::from_millis(400)).await;
68+
watch_indexer
69+
.simulate_file_event(event, &mut last_events, 300)
70+
.await;
71+
let _updated = fetch_metadata(storage.clone(), file_path.to_string_lossy().as_ref())
72+
.await?
73+
.expect("updated metadata missing");
74+
let first = tokio::time::timeout(Duration::from_secs(1), rx.recv()).await;
75+
76+
assert!(matches!(first, Ok(Some(_))), "watch event not observed");
77+
78+
Ok(())
79+
}
80+
81+
async fn fetch_metadata(
82+
storage: Arc<tokio::sync::Mutex<codegraph_graph::SurrealDbStorage>>,
83+
file_path: &str,
84+
) -> Result<Option<Value>> {
85+
let db = storage.lock().await;
86+
let client = db.db();
87+
let mut resp = client
88+
.query(
89+
"SELECT file_path, last_indexed_at, node_count FROM file_metadata WHERE file_path = $file_path",
90+
)
91+
.bind(("file_path", file_path.to_string()))
92+
.await?;
93+
let rows: Vec<Value> = resp.take(0)?;
94+
Ok(rows.last().cloned())
95+
}

0 commit comments

Comments
 (0)