Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 133 additions & 9 deletions codex-rs/core/src/session/background_auto_review.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use codex_auto_review::AutoReviewDuplicateDisposition;
use codex_auto_review::AutoReviewDuplicateMatch;
use codex_auto_review::AutoReviewStore;
use codex_auto_review::ReviewCoordination;
use codex_auto_review::ReviewLockGuard;
use codex_git_utils::get_git_repo_root;
use codex_git_utils::get_worktree_diff_byte_count;
use codex_git_utils::get_worktree_diff_fingerprint;
use codex_protocol::protocol::BackgroundAutoReviewControlAction;
Expand All @@ -27,8 +31,11 @@ use crate::review_persistence::ReviewPersistenceContext;
use crate::review_prompts::resolve_review_request;
use crate::state::BackgroundAutoReviewControlledRun;
use crate::state::BackgroundAutoReviewRunningHandle;
use crate::turn_timing::now_unix_timestamp_ms;

const BACKGROUND_AUTO_REVIEW_DEBOUNCE: Duration = Duration::from_secs(2);
const BACKGROUND_AUTO_REVIEW_LOCK_RETRY: Duration = Duration::from_millis(500);
const BACKGROUND_AUTO_REVIEW_LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(25);

impl Session {
pub(crate) async fn record_background_auto_review_turn_start(
Expand Down Expand Up @@ -345,27 +352,128 @@ impl Session {
);
return;
};
if let Some(displaced_running_review) = started_review.displaced_running_review
&& displaced_running_review
.persistence
.save_cancelled(self.codex_home().await)
if let Some(displaced_running_review) = started_review.displaced_running_review {
self.cancel_running_background_auto_review(
displaced_running_review,
AUTO_REVIEW_INTERRUPTED_ERROR_SUMMARY.to_string(),
)
.await;
}
let coordination =
ReviewCoordination::for_scope(self.codex_home().await, persistence.store_scope());
let review_lock_guard = match acquire_background_auto_review_lock(
&coordination,
format!("background_auto_review:{}", persistence.run_id()),
)
.await
{
record_background_review_status(
Arc::clone(self),
&displaced_running_review.persistence,
BackgroundAutoReviewStatus::Cancelled,
Some(AUTO_REVIEW_INTERRUPTED_ERROR_SUMMARY.to_string()),
Ok(Some(guard)) => guard,
Ok(None) => {
let error_summary =
"another background auto review is already running for this worktree"
.to_string();
self.clear_background_auto_review(generation).await;
self.record_skipped_background_auto_review(
&persistence,
generation,
&fingerprint,
error_summary,
)
.await;
debug!("background auto review skipped: review lock is held");
return;
}
Err(err) => {
let error_summary = format!("failed to acquire background auto review lock: {err}");
self.clear_background_auto_review(generation).await;
self.record_skipped_background_auto_review(
&persistence,
generation,
&fingerprint,
error_summary,
)
.await;
warn!(error = %err, "failed to acquire background auto review lock");
return;
}
};
if started_review
.running_review
.cancellation_token
.is_cancelled()
{
debug!("background auto review skipped after lock: review was cancelled");
return;
}
if self.input_queue.has_trigger_turn_mailbox_items().await
|| self.active_turn.lock().await.is_some()
{
let error_summary =
"foreground work became active before background auto review could start"
.to_string();
self.clear_background_auto_review(generation).await;
self.record_skipped_background_auto_review(
&persistence,
generation,
&fingerprint,
error_summary,
)
.await;
debug!("background auto review skipped after lock: foreground work active");
return;
}
spawn_detached_review_thread(
Arc::clone(self),
prepared,
started_review.running_review,
review_lock_guard,
generation,
);
}

pub(crate) async fn recover_background_auto_review_after_restart(&self) {
let (codex_home, scopes) = {
let state = self.state.lock().await;
let mut scopes = vec![state.session_configuration.cwd.clone()];
scopes.extend(
state
.session_configuration
.environments
.iter()
.map(|environment| environment.cwd.clone()),
);
(state.session_configuration.codex_home().clone(), scopes)
};
let mut seen_scopes = HashSet::new();
for cwd in scopes {
let scope =
get_git_repo_root(cwd.as_ref()).unwrap_or_else(|| cwd.as_ref().to_path_buf());
if !seen_scopes.insert(scope.clone()) {
continue;
}
let coordination = ReviewCoordination::for_scope(&codex_home, &scope);
if let Err(err) = coordination.clear_stale_lock_if_dead() {
warn!(error = %err, "failed to clear stale background auto review lock");
continue;
}
match coordination.read_lock_info() {
Ok(Some(_lock_info)) => continue,
Ok(None) => {}
Err(err) => {
warn!(error = %err, "failed to read background auto review lock");
continue;
}
}
let store = AutoReviewStore::for_scope(&codex_home, &scope);
if let Err(err) = store.reconcile_orphaned_in_flight(
std::iter::empty::<&str>(),
now_unix_timestamp_ms() / 1000,
) {
warn!(error = %err, "failed to reconcile durable background auto review runs");
}
}
}

async fn record_pending_background_auto_review(
&self,
generation: u64,
Expand Down Expand Up @@ -647,6 +755,22 @@ impl Session {
}
}

async fn acquire_background_auto_review_lock(
coordination: &ReviewCoordination,
intent: String,
) -> anyhow::Result<Option<ReviewLockGuard>> {
let deadline = tokio::time::Instant::now() + BACKGROUND_AUTO_REVIEW_LOCK_RETRY;
loop {
match coordination.try_acquire_lock(intent.clone())? {
Some(guard) => return Ok(Some(guard)),
None if tokio::time::Instant::now() < deadline => {
tokio::time::sleep(BACKGROUND_AUTO_REVIEW_LOCK_RETRY_INTERVAL).await;
}
None => return Ok(None),
}
}
}

async fn background_review_fingerprint(turn_context: &TurnContext) -> Option<String> {
let cwd = turn_context.environments.single_local_environment_cwd()?;
background_review_fingerprint_for_cwd(cwd).await
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/session/review.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use codex_auto_review::ReviewLockGuard;
use codex_core_skills::HostLoadedSkills;
use codex_protocol::openai_models::ToolMode;
use codex_protocol::protocol::BackgroundAutoReviewStatus;
Expand Down Expand Up @@ -284,6 +285,7 @@ pub(super) fn spawn_detached_review_thread(
sess: Arc<Session>,
prepared: PreparedReviewThread,
running_review: BackgroundAutoReviewRunningHandle,
review_lock_guard: ReviewLockGuard,
generation: u64,
) {
let turn_extension_data = Arc::clone(&prepared.turn_context.extension_data);
Expand All @@ -297,6 +299,7 @@ pub(super) fn spawn_detached_review_thread(
let cancellation_token = running_review.cancellation_token;
let completion = running_review.completion;
tokio::spawn(async move {
let _review_lock_guard = review_lock_guard;
task.run(session_ctx, turn_context, input, cancellation_token)
.await;
sess.clear_background_auto_review(generation).await;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ impl Session {
let mut guard = network_policy_decider_session.write().await;
*guard = Arc::downgrade(&sess);
}
sess.recover_background_auto_review_after_restart().await;
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = initial_history.get_event_msgs();
Expand Down
Loading
Loading