diff --git a/codex-rs/core/src/session/background_auto_review.rs b/codex-rs/core/src/session/background_auto_review.rs index 74d3dce7314..4a9ab4dc5fe 100644 --- a/codex-rs/core/src/session/background_auto_review.rs +++ b/codex-rs/core/src/session/background_auto_review.rs @@ -1,9 +1,13 @@ +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use codex_auto_review::AutoReviewDuplicateDisposition; use codex_auto_review::AutoReviewDuplicateMatch; use codex_auto_review::AutoReviewStore; +use codex_auto_review::ReviewCoordination; +use codex_auto_review::ReviewLockGuard; +use codex_git_utils::get_git_repo_root; use codex_git_utils::get_worktree_diff_byte_count; use codex_git_utils::get_worktree_diff_fingerprint; use codex_protocol::protocol::BackgroundAutoReviewControlAction; @@ -27,8 +31,11 @@ use crate::review_persistence::ReviewPersistenceContext; use crate::review_prompts::resolve_review_request; use crate::state::BackgroundAutoReviewControlledRun; use crate::state::BackgroundAutoReviewRunningHandle; +use crate::turn_timing::now_unix_timestamp_ms; const BACKGROUND_AUTO_REVIEW_DEBOUNCE: Duration = Duration::from_secs(2); +const BACKGROUND_AUTO_REVIEW_LOCK_RETRY: Duration = Duration::from_millis(500); +const BACKGROUND_AUTO_REVIEW_LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(25); impl Session { pub(crate) async fn record_background_auto_review_turn_start( @@ -345,27 +352,128 @@ impl Session { ); return; }; - if let Some(displaced_running_review) = started_review.displaced_running_review - && displaced_running_review - .persistence - .save_cancelled(self.codex_home().await) + if let Some(displaced_running_review) = started_review.displaced_running_review { + self.cancel_running_background_auto_review( + displaced_running_review, + AUTO_REVIEW_INTERRUPTED_ERROR_SUMMARY.to_string(), + ) + .await; + } + let coordination = + ReviewCoordination::for_scope(self.codex_home().await, persistence.store_scope()); + let review_lock_guard = match 
acquire_background_auto_review_lock( + &coordination, + format!("background_auto_review:{}", persistence.run_id()), + ) + .await { - record_background_review_status( - Arc::clone(self), - &displaced_running_review.persistence, - BackgroundAutoReviewStatus::Cancelled, - Some(AUTO_REVIEW_INTERRUPTED_ERROR_SUMMARY.to_string()), + Ok(Some(guard)) => guard, + Ok(None) => { + let error_summary = + "another background auto review is already running for this worktree" + .to_string(); + self.clear_background_auto_review(generation).await; + self.record_skipped_background_auto_review( + &persistence, + generation, + &fingerprint, + error_summary, + ) + .await; + debug!("background auto review skipped: review lock is held"); + return; + } + Err(err) => { + let error_summary = format!("failed to acquire background auto review lock: {err}"); + self.clear_background_auto_review(generation).await; + self.record_skipped_background_auto_review( + &persistence, + generation, + &fingerprint, + error_summary, + ) + .await; + warn!(error = %err, "failed to acquire background auto review lock"); + return; + } + }; + if started_review + .running_review + .cancellation_token + .is_cancelled() + { + debug!("background auto review skipped after lock: review was cancelled"); + return; + } + if self.input_queue.has_trigger_turn_mailbox_items().await + || self.active_turn.lock().await.is_some() + { + let error_summary = + "foreground work became active before background auto review could start" + .to_string(); + self.clear_background_auto_review(generation).await; + self.record_skipped_background_auto_review( + &persistence, + generation, + &fingerprint, + error_summary, ) .await; + debug!("background auto review skipped after lock: foreground work active"); + return; } spawn_detached_review_thread( Arc::clone(self), prepared, started_review.running_review, + review_lock_guard, generation, ); } + pub(crate) async fn recover_background_auto_review_after_restart(&self) { + let (codex_home, 
scopes) = { + let state = self.state.lock().await; + let mut scopes = vec![state.session_configuration.cwd.clone()]; + scopes.extend( + state + .session_configuration + .environments + .iter() + .map(|environment| environment.cwd.clone()), + ); + (state.session_configuration.codex_home().clone(), scopes) + }; + let mut seen_scopes = HashSet::new(); + for cwd in scopes { + let scope = + get_git_repo_root(cwd.as_ref()).unwrap_or_else(|| cwd.as_ref().to_path_buf()); + if !seen_scopes.insert(scope.clone()) { + continue; + } + let coordination = ReviewCoordination::for_scope(&codex_home, &scope); + if let Err(err) = coordination.clear_stale_lock_if_dead() { + warn!(error = %err, "failed to clear stale background auto review lock"); + continue; + } + match coordination.read_lock_info() { + Ok(Some(_lock_info)) => continue, + Ok(None) => {} + Err(err) => { + warn!(error = %err, "failed to read background auto review lock"); + continue; + } + } + let store = AutoReviewStore::for_scope(&codex_home, &scope); + if let Err(err) = store.reconcile_orphaned_in_flight( + std::iter::empty::<&str>(), + now_unix_timestamp_ms() / 1000, + ) { + warn!(error = %err, "failed to reconcile durable background auto review runs"); + } + } + } + async fn record_pending_background_auto_review( &self, generation: u64, @@ -647,6 +755,22 @@ impl Session { } } +async fn acquire_background_auto_review_lock( + coordination: &ReviewCoordination, + intent: String, +) -> anyhow::Result<Option<ReviewLockGuard>> { + let deadline = tokio::time::Instant::now() + BACKGROUND_AUTO_REVIEW_LOCK_RETRY; + loop { + match coordination.try_acquire_lock(intent.clone())? 
{ + Some(guard) => return Ok(Some(guard)), + None if tokio::time::Instant::now() < deadline => { + tokio::time::sleep(BACKGROUND_AUTO_REVIEW_LOCK_RETRY_INTERVAL).await; + } + None => return Ok(None), + } + } +} + async fn background_review_fingerprint(turn_context: &TurnContext) -> Option { let cwd = turn_context.environments.single_local_environment_cwd()?; background_review_fingerprint_for_cwd(cwd).await diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index de95ead7f89..e87f0ad3322 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -1,3 +1,4 @@ +use codex_auto_review::ReviewLockGuard; use codex_core_skills::HostLoadedSkills; use codex_protocol::openai_models::ToolMode; use codex_protocol::protocol::BackgroundAutoReviewStatus; @@ -284,6 +285,7 @@ pub(super) fn spawn_detached_review_thread( sess: Arc, prepared: PreparedReviewThread, running_review: BackgroundAutoReviewRunningHandle, + review_lock_guard: ReviewLockGuard, generation: u64, ) { let turn_extension_data = Arc::clone(&prepared.turn_context.extension_data); @@ -297,6 +299,7 @@ pub(super) fn spawn_detached_review_thread( let cancellation_token = running_review.cancellation_token; let completion = running_review.completion; tokio::spawn(async move { + let _review_lock_guard = review_lock_guard; task.run(session_ctx, turn_context, input, cancellation_token) .await; sess.clear_background_auto_review(generation).await; diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 1416468f4a2..075ce1133eb 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1071,6 +1071,7 @@ impl Session { let mut guard = network_policy_decider_session.write().await; *guard = Arc::downgrade(&sess); } + sess.recover_background_auto_review_after_restart().await; // Dispatch the SessionConfiguredEvent first and then report any errors. 
// If resuming, include converted initial messages in the payload so UIs can render them immediately. let initial_messages = initial_history.get_event_msgs(); diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index c6f429f865e..a5225b7ed50 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -27,6 +27,7 @@ use codex_protocol::protocol::ReviewTarget; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::user_input::UserInput; +use codex_utils_absolute_path::AbsolutePathBuf; use core_test_support::PathBufExt; use core_test_support::responses; use core_test_support::responses::ResponseMock; @@ -714,6 +715,160 @@ async fn automatic_background_review_runs_after_file_changing_turn() -> anyhow:: Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn automatic_background_review_holds_coordination_lock_until_completion() -> anyhow::Result<()> +{ + skip_if_no_network!(Ok(())); + + let call_id = "auto-bg-lock-apply-patch"; + let patch = "*** Begin Patch\n*** Add File: auto_background_review_lock.txt\n+review lock\n*** End Patch"; + let review_json = serde_json::json!({ + "findings": [], + "overall_correctness": "patch is correct", + "overall_explanation": "Automatic background review completed.", + "overall_confidence_score": 0.9 + }) + .to_string(); + let (gate_background_completed_tx, gate_background_completed_rx) = oneshot::channel(); + let foreground_tool = vec![StreamingSseChunk { + gate: None, + body: responses::sse(vec![ + ev_response_created("resp-1"), + ev_apply_patch_custom_tool_call(call_id, patch), + ev_completed("resp-1"), + ]), + }]; + let foreground_complete = vec![StreamingSseChunk { + gate: None, + body: responses::sse(vec![ + ev_assistant_message("msg-1", "patch applied"), + ev_completed("resp-2"), + ]), + }]; + let background_review = vec![ + StreamingSseChunk { + gate: None, + body: 
streaming_sse_event(responses::ev_response_created("resp-3")), + }, + StreamingSseChunk { + gate: Some(gate_background_completed_rx), + body: streaming_sse_event(responses::ev_assistant_message("msg-3", &review_json)), + }, + StreamingSseChunk { + gate: None, + body: streaming_sse_event(responses::ev_completed("resp-3")), + }, + ]; + let (server, _completions) = start_streaming_sse_server(vec![ + foreground_tool, + foreground_complete, + background_review, + ]) + .await; + let codex_home = Arc::new(TempDir::new()?); + let test = test_codex() + .with_home(codex_home.clone()) + .build_with_streaming_server(&server) + .await?; + init_git_repo(test.cwd_path()); + + test.submit_turn("create a file to review").await?; + let pending_status = wait_for_background_auto_review_status( + test.codex.as_ref(), + BackgroundAutoReviewStatus::Pending, + None, + ) + .await; + let running_status = wait_for_background_auto_review_status( + test.codex.as_ref(), + BackgroundAutoReviewStatus::Running, + Some(pending_status.run_id.as_str()), + ) + .await; + let lock_info = wait_for_auto_review_lock(codex_home.path()).await; + assert_eq!( + lock_info.intent, + format!("background_auto_review:{}", running_status.run_id) + ); + assert_eq!(lock_info.git_head, current_git_head(test.cwd_path())); + + let _ = gate_background_completed_tx.send(()); + let completed_status = wait_for_background_auto_review_status( + test.codex.as_ref(), + BackgroundAutoReviewStatus::Completed, + Some(running_status.run_id.as_str()), + ) + .await; + assert_eq!(completed_status.error_summary, None); + wait_for_auto_review_lock_absent(codex_home.path()).await; + + let _codex_home_guard = codex_home; + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn session_startup_marks_orphaned_background_review_lost() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let codex_home = Arc::new(TempDir::new()?); + let cwd = 
Arc::new(TempDir::new()?); + let cwd_path = AbsolutePathBuf::try_from(cwd.path().to_path_buf())?; + init_git_repo(cwd.path()); + + let run = codex_auto_review::AutoReviewRun { + schema_version: AUTO_REVIEW_SCHEMA_VERSION, + run_id: "orphaned_background_review".to_string(), + status: AutoReviewRunStatus::Running, + freshness: codex_auto_review::AutoReviewRunFreshness::Current, + source: AutoReviewRunSource::Background, + target: codex_auto_review::AutoReviewRunTarget { + branch: Some("main".to_string()), + head_sha: current_git_head(cwd.path()), + base_sha: None, + worktree_path: Some(cwd.path().to_path_buf()), + worktree_diff_fingerprint: Some("orphaned-fingerprint".to_string()), + }, + review_target: ReviewTarget::UncommittedChanges, + started_at_unix_secs: 1, + completed_at_unix_secs: None, + model: Some("gpt-test".to_string()), + superseded_by: None, + cancel_reason: None, + error_summary: None, + findings: Vec::new(), + }; + AutoReviewStore::for_scope(codex_home.path(), cwd.path()) + .save_run(&run) + .expect("save orphaned background review run"); + + let cwd_path_for_config = cwd_path.clone(); + let test = test_codex() + .with_home(Arc::clone(&codex_home)) + .with_config(move |config| { + config.cwd = cwd_path_for_config; + }) + .build(&server) + .await?; + + let run = load_single_auto_review_run(codex_home.path())?; + + assert_eq!(run.run_id, "orphaned_background_review"); + assert_eq!(run.status, AutoReviewRunStatus::Lost); + assert_eq!(run.source, AutoReviewRunSource::Background); + assert_eq!( + run.cancel_reason.as_deref(), + Some("agent_missing_after_restart") + ); + assert!(run.completed_at_unix_secs.is_some()); + + let _test_guard = test; + let _codex_home_guard = codex_home; + let _cwd_guard = cwd; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_review_awareness_injected_for_current_findings() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); @@ -2211,6 +2366,63 @@ fn load_auto_review_runs( Ok(runs) } +fn 
load_auto_review_locks( + codex_home: &std::path::Path, +) -> anyhow::Result<Vec<codex_auto_review::ReviewLockInfo>> { + let review_dir = codex_home.join("state/review"); + if !review_dir.exists() { + return Ok(Vec::new()); + } + let mut locks: Vec<codex_auto_review::ReviewLockInfo> = Vec::new(); + for entry in std::fs::read_dir(&review_dir) + .map_err(|err| anyhow::anyhow!("auto review state dir: {err}"))? + { + let entry = entry?; + let lock_path = entry.path().join("review.lock"); + if !lock_path.exists() { + continue; + } + let text = std::fs::read_to_string(&lock_path)?; + locks.push(serde_json::from_str(&text)?); + } + locks.sort_by(|left, right| left.intent.cmp(&right.intent)); + Ok(locks) +} + +async fn wait_for_auto_review_lock( + codex_home: &std::path::Path, +) -> codex_auto_review::ReviewLockInfo { + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + loop { + if let Ok(mut locks) = load_auto_review_locks(codex_home) + && let Some(lock) = locks.pop() + { + assert!(locks.is_empty(), "expected one auto review lock"); + return lock; + } + assert!( + tokio::time::Instant::now() < deadline, + "timeout waiting for auto review lock" + ); + tokio::time::sleep(Duration::from_millis(50)).await; + } +} + +async fn wait_for_auto_review_lock_absent(codex_home: &std::path::Path) { + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + loop { + match load_auto_review_locks(codex_home) { + Ok(locks) if locks.is_empty() => return, + _ => {} + } + assert!( + tokio::time::Instant::now() < deadline, + "timeout waiting for auto review lock removal" + ); + tokio::time::sleep(Duration::from_millis(50)).await; + } +} + async fn expected_worktree_diff_fingerprint(cwd: &Path) -> String { get_worktree_diff_fingerprint(cwd) .await