From 6dbdfd6aecb2b018976137bb890a65f058031b36 Mon Sep 17 00:00:00 2001 From: shiny-code-bot Date: Thu, 18 Jun 2026 11:54:18 -0400 Subject: [PATCH] Add auto-review coordination locks --- codex-rs/Cargo.lock | 2 + codex-rs/auto-review/Cargo.toml | 9 + codex-rs/auto-review/src/lib.rs | 11 +- codex-rs/auto-review/src/review_coord.rs | 406 +++++++++++++++ .../auto-review/src/review_coord_tests.rs | 491 ++++++++++++++++++ 5 files changed, 918 insertions(+), 1 deletion(-) create mode 100644 codex-rs/auto-review/src/review_coord.rs create mode 100644 codex-rs/auto-review/src/review_coord_tests.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 9c9b831eb74a..3e8e89118db1 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2180,10 +2180,12 @@ dependencies = [ "codex-protocol", "codex-utils-path", "crc32fast", + "libc", "pretty_assertions", "serde", "serde_json", "tempfile", + "windows-sys 0.52.0", ] [[package]] diff --git a/codex-rs/auto-review/Cargo.toml b/codex-rs/auto-review/Cargo.toml index 7fd4aca82be5..77b3ed339e41 100644 --- a/codex-rs/auto-review/Cargo.toml +++ b/codex-rs/auto-review/Cargo.toml @@ -19,6 +19,15 @@ crc32fast = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +[target.'cfg(unix)'.dependencies] +libc = { workspace = true } + +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.52", features = [ + "Win32_Foundation", + "Win32_System_Threading", +] } + [dev-dependencies] pretty_assertions = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/auto-review/src/lib.rs b/codex-rs/auto-review/src/lib.rs index 19c0729bd3da..491d02a5f052 100644 --- a/codex-rs/auto-review/src/lib.rs +++ b/codex-rs/auto-review/src/lib.rs @@ -11,6 +11,12 @@ use codex_utils_path::write_atomically; use serde::Deserialize; use serde::Serialize; +mod review_coord; + +pub use review_coord::ReviewCoordination; +pub use review_coord::ReviewLockGuard; +pub use review_coord::ReviewLockInfo; + pub const SUMMARY_MAX_FINDINGS: usize = 20; pub const SUMMARY_MAX_FIELD_BYTES: usize = 240; pub const SUMMARY_MAX_BYTES: usize = 4096; @@ -850,11 +856,14 @@ fn validate_safe_id(value: &str) -> Result<()> { } fn scoped_store_root(codex_home: &Path, scope: &Path) -> PathBuf { + scoped_review_root(codex_home, scope).join(STORE_DIR) +} + +fn scoped_review_root(codex_home: &Path, scope: &Path) -> PathBuf { codex_home .join(STATE_DIR) .join(REVIEW_DIR) .join(repo_key(scope)) - .join(STORE_DIR) } fn legacy_store_root(codex_home: &Path) -> PathBuf { diff --git a/codex-rs/auto-review/src/review_coord.rs b/codex-rs/auto-review/src/review_coord.rs new file mode 100644 index 000000000000..e097bbe35b8c --- /dev/null +++ b/codex-rs/auto-review/src/review_coord.rs @@ -0,0 +1,406 @@ +use std::fs; +use std::fs::OpenOptions; +use std::io; +use std::io::Write; +use std::path::Path; +use std::path::PathBuf; +use std::process::Command; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +use anyhow::Context; +use anyhow::Result; +use codex_utils_path::write_atomically; +use serde::Deserialize; +use serde::Serialize; + +use crate::scoped_review_root; + +const LOCK_FILENAME: &str = "review.lock"; +const EPOCH_FILENAME: &str = "snapshot.epoch"; +const EPOCH_LOCK_FILENAME: &str = "snapshot.epoch.lock"; +const MALFORMED_LOCK_STALE_SECS: u64 = 10 * 60; +const EPOCH_LOCK_STALE_SECS: u64 = 10 * 60; +const CLEANUP_LOCK_STALE_SECS: u64 = 10 * 60; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ReviewLockInfo { + pub pid: u32, + pub started_at_unix_secs: u64, + pub intent: String, + pub git_head: Option, + pub snapshot_epoch: u64, + #[serde(default)] + pub owner_id: String, +} + +#[derive(Debug, Clone)] +pub struct ReviewCoordination { + root: PathBuf, + scope: PathBuf, +} + +#[derive(Debug)] +pub struct ReviewLockGuard { + lock_path: PathBuf, + owner_id: String, +} + +impl ReviewCoordination { + pub fn for_scope(codex_home: impl AsRef, scope: impl AsRef) -> Self { + let scope = scope.as_ref().to_path_buf(); + Self { + root: scoped_review_root(codex_home.as_ref(), &scope), + scope, + } + } + + pub fn root(&self) -> &Path { + &self.root + } + + pub fn current_snapshot_epoch(&self) -> Result { + match fs::read_to_string(self.epoch_path()) { + Ok(text) => text.trim().parse::().with_context(|| { + format!( + "failed to parse snapshot epoch {}", + self.epoch_path().display() + ) + }), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(0), + Err(err) => Err(err).with_context(|| { + format!( + "failed to read snapshot epoch {}", + self.epoch_path().display() + ) + }), + } + } + + pub fn bump_snapshot_epoch(&self) -> Result { + let _guard = self.try_acquire_epoch_lock()?; + let current = self.current_snapshot_epoch()?; + let next = current.saturating_add(1); + self.write_snapshot_epoch(next)?; + Ok(next) + } + + pub fn try_acquire_lock(&self, intent: impl Into) -> Result> { + fs::create_dir_all(&self.root).with_context(|| { + format!("failed to create review state dir {}", self.root.display()) + })?; + + let lock_path = self.lock_path(); + if lock_path.exists() { + let _ = self.clear_stale_lock_if_dead()?; + } + let owner_id = new_owner_id(); + let file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&lock_path); + + let mut file = match file { + Ok(file) => file, + Err(err) if err.kind() == io::ErrorKind::AlreadyExists => return Ok(None), + Err(err) => { + return Err(err).with_context(|| { + format!("failed to create review lock {}", lock_path.display()) + }); + } + }; + + let info = match self.lock_info(intent.into(), owner_id.clone()) { + Ok(info) => info, + Err(err) => { + let _ = fs::remove_file(&lock_path); + return Err(err); + } + }; + let body = serde_json::to_string_pretty(&info)?; + if let Err(err) = file.write_all(format!("{body}\n").as_bytes()) { + let _ = fs::remove_file(&lock_path); + return Err(err) + .with_context(|| format!("failed to write review lock {}", lock_path.display())); + } + Ok(Some(ReviewLockGuard { + lock_path, + owner_id, + })) + } + + fn lock_info(&self, intent: String, owner_id: String) -> Result { + Ok(ReviewLockInfo { + pid: std::process::id(), + started_at_unix_secs: now_unix_secs().unwrap_or_default(), + intent, + git_head: git_head(&self.scope), + snapshot_epoch: self.current_snapshot_epoch()?, + owner_id, + }) + } + + pub fn read_lock_info(&self) -> Result> { + match fs::read_to_string(self.lock_path()) { + Ok(text) => serde_json::from_str(&text) + .with_context(|| { + format!("failed to parse review lock {}", self.lock_path().display()) + }) + .map(Some), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err).with_context(|| { + format!("failed to read review lock {}", self.lock_path().display()) + }), + } + } + + pub fn clear_stale_lock_if_dead(&self) -> Result { + let lock_path = self.lock_path(); + let _cleanup_guard = try_acquire_cleanup_lock(&lock_path)?; + let text = match fs::read_to_string(&lock_path) { + Ok(text) => text, + Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(false), + Err(err) => { + return Err(err).with_context(|| { + format!("failed to read review lock {}", lock_path.display()) + }); + } + }; + let Ok(info) = serde_json::from_str::(&text) else { + return self.clear_malformed_lock_if_stale(&lock_path); + }; + if pid_alive(info.pid) { + return Ok(false); + } + match fs::remove_file(&lock_path) { + Ok(()) => Ok(true), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false), + Err(err) => Err(err).with_context(|| { + format!("failed to remove stale review lock {}", lock_path.display()) + }), + } + } + + pub(crate) fn lock_path(&self) -> PathBuf { + self.root.join(LOCK_FILENAME) + } + + pub(crate) fn epoch_path(&self) -> PathBuf { + self.root.join(EPOCH_FILENAME) + } + + fn write_snapshot_epoch(&self, next: u64) -> Result<()> { + write_atomically(&self.epoch_path(), &format!("{next}\n")).with_context(|| { + format!( + "failed to write snapshot epoch {}", + self.epoch_path().display() + ) + }) + } + + fn try_acquire_epoch_lock(&self) -> Result { + fs::create_dir_all(&self.root).with_context(|| { + format!("failed to create review state dir {}", self.root.display()) + })?; + let lock_path = self.root.join(EPOCH_LOCK_FILENAME); + loop { + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&lock_path) + { + Ok(_) => return Ok(EpochLockGuard { lock_path }), + Err(err) if err.kind() == io::ErrorKind::AlreadyExists => { + let _ = clear_stale_path_if_old(&lock_path, EPOCH_LOCK_STALE_SECS)?; + std::thread::sleep(std::time::Duration::from_millis(10)); + } + Err(err) => { + return Err(err).with_context(|| { + format!( + "failed to create snapshot epoch lock {}", + lock_path.display() + ) + }); + } + } + } + } + + fn clear_malformed_lock_if_stale(&self, lock_path: &Path) -> Result { + clear_stale_path_if_old_unlocked(lock_path, MALFORMED_LOCK_STALE_SECS) + } +} + +struct EpochLockGuard { + lock_path: PathBuf, +} + +struct CleanupLockGuard { + lock_path: PathBuf, +} + +impl Drop for EpochLockGuard { + fn drop(&mut self) { + let _ = fs::remove_file(&self.lock_path); + } +} + +impl Drop for CleanupLockGuard { + fn drop(&mut self) { + let _ = fs::remove_file(&self.lock_path); + } +} + +impl Drop for ReviewLockGuard { + fn drop(&mut self) { + let Ok(text) = fs::read_to_string(&self.lock_path) else { + return; + }; + let Ok(info) = serde_json::from_str::(&text) else { + return; + }; + if info.owner_id == self.owner_id { + let _ = fs::remove_file(&self.lock_path); + } + } +} + +fn git_head(cwd: &Path) -> Option { + let output = Command::new("git") + .current_dir(cwd) + .args(["rev-parse", "HEAD"]) + .output() + .ok()?; + if !output.status.success() { + return None; + } + Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) +} + +fn new_owner_id() -> String { + let nanos = now_unix_nanos().unwrap_or_default(); + format!("{}-{nanos}", std::process::id()) +} + +fn now_unix_secs() -> Option { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|duration| duration.as_secs()) +} + +fn now_unix_nanos() -> Option { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok()? + .as_nanos(); + Some(nanos.min(u128::from(u64::MAX)) as u64) +} + +#[cfg(unix)] +fn pid_alive(pid: u32) -> bool { + let res = unsafe { libc::kill(pid as libc::pid_t, 0) }; + if res == 0 { + return true; + } + let err = io::Error::last_os_error() + .raw_os_error() + .unwrap_or(libc::ESRCH); + err != libc::ESRCH +} + +#[cfg(not(unix))] +fn pid_alive(pid: u32) -> bool { + platform_pid_alive(pid) +} + +#[cfg(windows)] +fn platform_pid_alive(pid: u32) -> bool { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::STILL_ACTIVE; + use windows_sys::Win32::System::Threading::GetExitCodeProcess; + use windows_sys::Win32::System::Threading::OpenProcess; + use windows_sys::Win32::System::Threading::PROCESS_QUERY_LIMITED_INFORMATION; + + let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) }; + if handle == 0 { + return false; + } + let mut code = 0; + let ok = unsafe { GetExitCodeProcess(handle, &mut code) }; + unsafe { CloseHandle(handle) }; + ok != 0 && code == STILL_ACTIVE +} + +#[cfg(not(any(unix, windows)))] +fn platform_pid_alive(pid: u32) -> bool { + pid == std::process::id() +} + +fn clear_stale_path_if_old(path: &Path, stale_secs: u64) -> Result { + let _cleanup_guard = try_acquire_cleanup_lock(path)?; + clear_stale_path_if_old_unlocked(path, stale_secs) +} + +fn clear_stale_path_if_old_unlocked(path: &Path, stale_secs: u64) -> Result { + let Ok(metadata) = fs::metadata(path) else { + return Ok(false); + }; + let Ok(modified) = metadata.modified() else { + return Ok(false); + }; + if !modified + .elapsed() + .is_ok_and(|elapsed| elapsed.as_secs() >= stale_secs) + { + return Ok(false); + } + match fs::remove_file(path) { + Ok(()) => Ok(true), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false), + Err(err) => { + Err(err).with_context(|| format!("failed to remove stale path {}", path.display())) + } + } +} + +fn try_acquire_cleanup_lock(target_path: &Path) -> Result { + let cleanup_path = cleanup_lock_path(target_path); + loop { + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&cleanup_path) + { + Ok(_) => { + return Ok(CleanupLockGuard { + lock_path: cleanup_path, + }); + } + Err(err) if err.kind() == io::ErrorKind::AlreadyExists => { + let _ = clear_stale_path_if_old_unlocked(&cleanup_path, CLEANUP_LOCK_STALE_SECS)?; + std::thread::sleep(std::time::Duration::from_millis(10)); + } + Err(err) => { + return Err(err).with_context(|| { + format!( + "failed to create review cleanup lock {}", + cleanup_path.display() + ) + }); + } + } + } +} + +fn cleanup_lock_path(target_path: &Path) -> PathBuf { + let file_name = target_path + .file_name() + .map(|file_name| file_name.to_string_lossy()) + .unwrap_or_default(); + target_path.with_file_name(format!("{file_name}.cleanup")) +} + +#[cfg(test)] +#[path = "review_coord_tests.rs"] +mod tests; diff --git a/codex-rs/auto-review/src/review_coord_tests.rs b/codex-rs/auto-review/src/review_coord_tests.rs new file mode 100644 index 000000000000..b92b31a0aba0 --- /dev/null +++ b/codex-rs/auto-review/src/review_coord_tests.rs @@ -0,0 +1,491 @@ +use std::fs; +use std::process::Command; +use std::time::Duration; +use std::time::SystemTime; + +use pretty_assertions::assert_eq; +use tempfile::TempDir; + +use super::*; +use crate::AutoReviewStore; +use crate::scoped_store_root; + +fn temp_repo() -> TempDir { + let repo = TempDir::new().expect("temp repo"); + Command::new("git") + .current_dir(repo.path()) + .args(["init", "--quiet"]) + .status() + .expect("git init"); + Command::new("git") + .current_dir(repo.path()) + .args(["config", "user.email", "test@example.com"]) + .status() + .expect("git config email"); + Command::new("git") + .current_dir(repo.path()) + .args(["config", "user.name", "Test User"]) + .status() + .expect("git config name"); + fs::write(repo.path().join("README.md"), "hello\n").expect("write readme"); + Command::new("git") + .current_dir(repo.path()) + .args(["add", "README.md"]) + .status() + .expect("git add"); + Command::new("git") + .current_dir(repo.path()) + .args(["commit", "--quiet", "-m", "initial"]) + .status() + .expect("git commit"); + repo +} + +fn git_head(repo: &TempDir) -> String { + let output = Command::new("git") + .current_dir(repo.path()) + .args(["rev-parse", "HEAD"]) + .output() + .expect("git rev-parse"); + assert!(output.status.success()); + String::from_utf8(output.stdout) + .expect("utf8 head") + .trim() + .to_string() +} + +#[test] +fn coordination_files_share_scoped_review_root_with_auto_review_store() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + + assert_eq!( + coordination.root(), + scoped_store_root(home.path(), repo.path()) + .parent() + .unwrap() + ); + assert_eq!( + coordination.lock_path(), + coordination.root().join("review.lock") + ); + assert_eq!( + coordination.epoch_path(), + coordination.root().join("snapshot.epoch") + ); +} + +#[test] +fn coordination_only_does_not_make_has_store_files_true() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + + let guard = coordination + .try_acquire_lock("coordination-only") + .expect("lock attempt") + .expect("lock acquired"); + coordination.bump_snapshot_epoch().expect("bump epoch"); + + assert!(!AutoReviewStore::has_store_files(home.path())); + drop(guard); +} + +#[test] +fn scoped_locks_separate_repositories_under_one_codex_home() { + let home = TempDir::new().expect("temp home"); + let repo_a = TempDir::new().expect("repo a"); + let repo_b = TempDir::new().expect("repo b"); + let coord_a = ReviewCoordination::for_scope(home.path(), repo_a.path()); + let coord_b = ReviewCoordination::for_scope(home.path(), repo_b.path()); + + let guard_a = coord_a + .try_acquire_lock("repo-a") + .expect("lock repo a") + .expect("repo a lock acquired"); + let guard_b = coord_b + .try_acquire_lock("repo-b") + .expect("lock repo b") + .expect("repo b lock acquired"); + + assert_ne!(coord_a.root(), coord_b.root()); + drop(guard_a); + drop(guard_b); +} + +#[test] +fn lock_contention_and_release() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + + let guard = coordination + .try_acquire_lock("first") + .expect("first lock") + .expect("first lock acquired"); + assert!( + coordination + .try_acquire_lock("second") + .expect("second lock") + .is_none() + ); + + drop(guard); + assert!( + coordination + .try_acquire_lock("third") + .expect("third lock") + .is_some() + ); +} + +#[test] +fn lock_records_intent_pid_head_and_epoch() { + let home = TempDir::new().expect("temp home"); + let repo = temp_repo(); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + let epoch = coordination.bump_snapshot_epoch().expect("bump epoch"); + + let guard = coordination + .try_acquire_lock("record-info") + .expect("lock") + .expect("lock acquired"); + let info = coordination + .read_lock_info() + .expect("lock info") + .expect("lock info exists"); + + assert_eq!(info.pid, std::process::id()); + assert_eq!(info.intent, "record-info"); + assert_eq!(info.git_head, Some(git_head(&repo))); + assert_eq!(info.snapshot_epoch, epoch); + assert!(!info.owner_id.is_empty()); + drop(guard); +} + +#[test] +fn epoch_bump_is_repo_scoped_and_monotonic() { + let home = TempDir::new().expect("temp home"); + let repo_a = TempDir::new().expect("repo a"); + let repo_b = TempDir::new().expect("repo b"); + let coord_a = ReviewCoordination::for_scope(home.path(), repo_a.path()); + let coord_b = ReviewCoordination::for_scope(home.path(), repo_b.path()); + + let a0 = coord_a.current_snapshot_epoch().expect("a0"); + let b0 = coord_b.current_snapshot_epoch().expect("b0"); + let a1 = coord_a.bump_snapshot_epoch().expect("a1"); + let a2 = coord_a.bump_snapshot_epoch().expect("a2"); + + assert!(a1 > a0); + assert!(a2 > a1); + assert_eq!(coord_b.current_snapshot_epoch().expect("b current"), b0); +} + +#[test] +fn epoch_bump_waits_for_existing_epoch_lock() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let epoch_lock = coordination.root().join("snapshot.epoch.lock"); + fs::write(&epoch_lock, "busy").expect("write epoch lock"); + + std::thread::scope(|scope| { + let handle = scope.spawn(|| coordination.bump_snapshot_epoch().expect("bump epoch")); + std::thread::sleep(Duration::from_millis(50)); + assert!(epoch_lock.exists()); + fs::remove_file(&epoch_lock).expect("release epoch lock"); + assert!(handle.join().expect("join epoch bump") > 0); + }); +} + +#[test] +fn epoch_stale_cleanup_waits_for_cleanup_lock_before_removing() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let epoch_lock = coordination.root().join("snapshot.epoch.lock"); + let cleanup_lock = cleanup_lock_path(&epoch_lock); + fs::write(&epoch_lock, "stale").expect("write stale epoch lock"); + let stale_time = SystemTime::now() - Duration::from_secs(EPOCH_LOCK_STALE_SECS + 1); + let file = fs::OpenOptions::new() + .write(true) + .open(&epoch_lock) + .expect("open epoch lock"); + file.set_modified(stale_time).expect("set stale mtime"); + fs::write(&cleanup_lock, "busy").expect("write cleanup lock"); + + std::thread::scope(|scope| { + let handle = scope.spawn(|| coordination.bump_snapshot_epoch().expect("bump epoch")); + std::thread::sleep(Duration::from_millis(50)); + fs::write(&epoch_lock, "fresh").expect("replace epoch lock"); + fs::remove_file(&cleanup_lock).expect("release cleanup lock"); + std::thread::sleep(Duration::from_millis(50)); + assert_eq!( + fs::read_to_string(&epoch_lock).expect("epoch lock"), + "fresh" + ); + fs::remove_file(&epoch_lock).expect("release epoch lock"); + assert!(handle.join().expect("join epoch bump") > 0); + }); +} + +#[test] +fn epoch_bump_clears_stale_epoch_lock() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let epoch_lock = coordination.root().join("snapshot.epoch.lock"); + fs::write(&epoch_lock, "stale").expect("write epoch lock"); + let stale_time = SystemTime::now() - Duration::from_secs(EPOCH_LOCK_STALE_SECS + 1); + let file = fs::OpenOptions::new() + .write(true) + .open(&epoch_lock) + .expect("open epoch lock"); + file.set_modified(stale_time).expect("set stale mtime"); + + let epoch = coordination.bump_snapshot_epoch().expect("bump epoch"); + + assert!(epoch > 0); + assert!(!epoch_lock.exists()); +} + +#[test] +fn lock_info_survives_epoch_bump_for_stale_detection() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + + let guard = coordination + .try_acquire_lock("stale-check") + .expect("lock") + .expect("lock acquired"); + let initial = coordination + .read_lock_info() + .expect("lock info") + .expect("lock info exists"); + let current = coordination.bump_snapshot_epoch().expect("bump epoch"); + let still_recorded = coordination + .read_lock_info() + .expect("lock info") + .expect("lock info exists"); + + assert_eq!(still_recorded.snapshot_epoch, initial.snapshot_epoch); + assert!(current > initial.snapshot_epoch); + drop(guard); +} + +#[test] +fn guard_drop_does_not_remove_replaced_lock() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + + let guard = coordination + .try_acquire_lock("old") + .expect("lock") + .expect("lock acquired"); + let replacement = ReviewLockInfo { + pid: std::process::id(), + started_at_unix_secs: 1, + intent: "new".to_string(), + git_head: None, + snapshot_epoch: 0, + owner_id: "different-owner".to_string(), + }; + fs::write( + coordination.lock_path(), + serde_json::to_string_pretty(&replacement).expect("serialize replacement"), + ) + .expect("write replacement lock"); + + drop(guard); + + let info = coordination + .read_lock_info() + .expect("lock info") + .expect("replacement remains"); + assert_eq!(info.owner_id, "different-owner"); +} + +#[test] +fn malformed_lock_is_not_cleared() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + fs::write(coordination.lock_path(), "not json").expect("write malformed lock"); + + assert!(coordination.read_lock_info().is_err()); + assert!( + !coordination + .clear_stale_lock_if_dead() + .expect("clear stale lock") + ); + assert!(coordination.lock_path().exists()); + assert!( + coordination + .try_acquire_lock("blocked") + .expect("lock attempt") + .is_none() + ); +} + +#[test] +fn stale_malformed_lock_is_cleared() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + fs::write(coordination.lock_path(), "not json").expect("write malformed lock"); + let stale_time = SystemTime::now() - Duration::from_secs(MALFORMED_LOCK_STALE_SECS + 1); + let file = fs::OpenOptions::new() + .write(true) + .open(coordination.lock_path()) + .expect("open malformed lock"); + file.set_modified(stale_time).expect("set stale mtime"); + + assert!( + coordination + .clear_stale_lock_if_dead() + .expect("clear stale lock") + ); + assert!(!coordination.lock_path().exists()); +} + +#[test] +fn acquisition_clears_stale_malformed_lock() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + fs::write(coordination.lock_path(), "not json").expect("write malformed lock"); + let stale_time = SystemTime::now() - Duration::from_secs(MALFORMED_LOCK_STALE_SECS + 1); + let file = fs::OpenOptions::new() + .write(true) + .open(coordination.lock_path()) + .expect("open malformed lock"); + file.set_modified(stale_time).expect("set stale mtime"); + + let guard = coordination + .try_acquire_lock("after-stale-malformed") + .expect("lock attempt") + .expect("lock acquired"); + + drop(guard); +} + +#[test] +fn review_stale_cleanup_waits_for_cleanup_lock_before_removing() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let cleanup_lock = cleanup_lock_path(&coordination.lock_path()); + fs::write(coordination.lock_path(), "not json").expect("write stale malformed lock"); + let stale_time = SystemTime::now() - Duration::from_secs(MALFORMED_LOCK_STALE_SECS + 1); + let file = fs::OpenOptions::new() + .write(true) + .open(coordination.lock_path()) + .expect("open malformed lock"); + file.set_modified(stale_time).expect("set stale mtime"); + fs::write(&cleanup_lock, "busy").expect("write cleanup lock"); + + std::thread::scope(|scope| { + let handle = scope.spawn(|| coordination.clear_stale_lock_if_dead()); + std::thread::sleep(Duration::from_millis(50)); + fs::write(coordination.lock_path(), "fresh").expect("replace review lock"); + fs::remove_file(&cleanup_lock).expect("release cleanup lock"); + assert!( + !handle + .join() + .expect("join cleanup") + .expect("cleanup result") + ); + }); + assert_eq!( + fs::read_to_string(coordination.lock_path()).expect("review lock"), + "fresh" + ); +} + +#[test] +fn lock_creation_failure_removes_partial_lock() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + fs::write(coordination.epoch_path(), "not a number").expect("write malformed epoch"); + + assert!(coordination.try_acquire_lock("bad-epoch").is_err()); + assert!(!coordination.lock_path().exists()); +} + +#[test] +#[cfg(unix)] +fn valid_lock_with_dead_pid_is_cleared() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let mut child = Command::new("sh") + .arg("-c") + .arg("exit 0") + .spawn() + .expect("spawn child"); + let pid = child.id(); + child.wait().expect("wait child"); + assert!(!pid_alive(pid)); + let info = ReviewLockInfo { + pid, + started_at_unix_secs: 1, + intent: "dead".to_string(), + git_head: None, + snapshot_epoch: 0, + owner_id: "dead-owner".to_string(), + }; + fs::write( + coordination.lock_path(), + serde_json::to_string_pretty(&info).expect("serialize lock"), + ) + .expect("write lock"); + + assert!( + coordination + .clear_stale_lock_if_dead() + .expect("clear stale lock") + ); + assert!(!coordination.lock_path().exists()); +} + +#[test] +#[cfg(unix)] +fn valid_lock_with_live_pid_is_not_cleared() { + let home = TempDir::new().expect("temp home"); + let repo = TempDir::new().expect("temp repo"); + let coordination = ReviewCoordination::for_scope(home.path(), repo.path()); + fs::create_dir_all(coordination.root()).expect("coordination root"); + let info = ReviewLockInfo { + pid: std::process::id(), + started_at_unix_secs: 1, + intent: "live".to_string(), + git_head: None, + snapshot_epoch: 0, + owner_id: "live-owner".to_string(), + }; + fs::write( + coordination.lock_path(), + serde_json::to_string_pretty(&info).expect("serialize lock"), + ) + .expect("write lock"); + + assert!( + !coordination + .clear_stale_lock_if_dead() + .expect("clear stale lock") + ); + assert!(coordination.lock_path().exists()); +}