diff --git a/AGENTS.md b/AGENTS.md
index 8dc9ac709..3f47048e1 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -139,7 +139,7 @@ FRESHELL_RUN_REAL_PROVIDER_CONTRACTS=1 npm run test:vitest -- \
 
 **Pane System:** Tabs contain pane layouts (tree structure of splits). Each pane owns its terminal lifecycle via `createRequestId` and `terminalId`. When splitting panes, each new pane gets its own `createRequestId`, ensuring independent backend terminals. Pane content types: `terminal` (with mode, shell, status) and `browser` (with URL, devtools state).
 
-**Agent Status Indicators:** Blue/busy status is derived from provider activity slices through `resolvePaneActivity`; green/needs-attention and the idle sound flow through `recordTurnComplete` and `useTurnCompletionNotifications`. Terminal Codex turn-complete is server-authoritative via `terminal.turn.complete`, matching Claude and OpenCode. `freshopencode` runs on a shared long-lived `opencode serve` sidecar and uses server-pushed `session.idle`/`session.status` events to drive busy/green (no longer derived from subprocess closure). Gemini and Kimi terminal modes are status-inert until their CLIs expose a reliable turn-complete signal.
+**Agent Status Indicators:** Blue/busy status is derived from provider activity slices through `resolvePaneActivity`; green/needs-attention and the idle sound flow through `recordTurnComplete` and `useTurnCompletionNotifications`. Turn-complete (green/sound) is server-authoritative everywhere: terminal CLIs via `terminal.turn.complete`, and fresh-agent panes (freshclaude/kilroy/freshcodex/freshopencode) via a discrete `freshAgent.turn.complete` edge emitted only on a positive completion — freshclaude/kilroy on the SDK `result` with `subtype === 'success'`, freshopencode on the success-only `emitStatus(idle)` path, and freshcodex on `turn/completed` only when `params.turn.status === 'completed'` (the notification also fires on interrupt). The client folds it in via `applyFreshAgentCompletion` using the `at`-monotonic dedupe regime (wall-clock `at`, no per-session counter, so a resumed durable session can't swallow completions across a server restart). The fragile client-side busy→idle derivation was removed; `useAgentSessionTurnCompletion` now only handles the waiting-for-approval edge. `freshopencode` still runs on a shared long-lived `opencode serve` sidecar and uses server-pushed `session.idle`/`session.status` events to drive busy. Gemini and Kimi terminal modes are status-inert until their CLIs expose a reliable turn-complete signal.
 
 **Fresh-Agent Orchestration:** The REST agent API (`/api/tabs`, `/api/panes/:id/split`, `/api/panes/:id/send-keys`, `/api/panes/:id/capture`, `/api/panes/:id/wait-for`) and the MCP `freshell` tool accept `agent`/`model`/`effort` parameters to create and drive fresh-agent panes (e.g. `agent=opencode`). The orchestration layer dispatches to the registered `FreshAgentRuntimeManager`, so the same external surface works for any fresh-agent provider.
 
diff --git a/docs/superpowers/plans/2026-06-23-fresh-agent-server-authoritative-turn-complete.md b/docs/superpowers/plans/2026-06-23-fresh-agent-server-authoritative-turn-complete.md
new file mode 100644
index 000000000..a51922812
--- /dev/null
+++ b/docs/superpowers/plans/2026-06-23-fresh-agent-server-authoritative-turn-complete.md
@@ -0,0 +1,208 @@
+# Server-authoritative fresh-agent turn completion
+
+## Problem
+
+Fresh-agent panes (freshclaude, kilroy, freshcodex, freshopencode) drove the
+GREEN / needs-attention highlight and the idle chime by **re-deriving the
+turn-complete edge on the client** — `useAgentSessionTurnCompletion` watched the
+Redux busy *level* (`isFreshAgentBusy`) and fired `recordTurnComplete` on a
+busy→idle transition.
+
+Differentiating a level to recover an edge is inherently fragile: it must observe
+both sides, in order, exactly once, with no bounce. That produced the three
+reported symptoms:
+
+- **Premature / flicker chimes** — a transient idle blip (snapshot clobber,
+  stream gap) read as a completed turn.
+- **Missed chimes** — a fast turn whose busy onset was never observed, or a turn
+  that completed while the client was differentiating stale snapshot levels.
+- **Wrong color** — green derived from a level that disagreed with the real
+  outcome (e.g. an interrupt looks just like a completion at the level).
+
+Terminal-mode CLIs do not have this problem: they use a **server-authoritative
+discrete completion event** (`terminal.turn.complete`). Fresh-agent panes had no
+server completion event at all.
+
+## Approach
+
+Give fresh-agent panes the same server-authoritative model: each provider adapter
+emits a **discrete `freshAgent.turn.complete` edge only on a positive
+completion**, and the client folds it into the existing GREEN/SOUND pipeline.
+Delete the client-side busy→idle derivation.
+
+### Server emit points (positive completion only)
+
+Validated empirically against the real binaries before implementing:
+
+- **freshclaude / kilroy** (`server/sdk-bridge.ts`): the SDK `result` message with
+  `subtype === 'success'`. In streaming-input mode an interrupt sends a
+  `control_response` ACK and yields **no** result message; kill → `sdk.exit`;
+  stream error → `sdk.error` + `sdk.status idle`. So `subtype === 'success'` is a
+  clean, unambiguous positive edge.
+- **freshopencode** (`server/fresh-agent/adapters/opencode/adapter.ts`): the
+  success-only `emitStatus(state, 'idle')` path after `await idle` resolves, gated on
+  per-session `turnAborted` **and** `turnErrored` flags. `onceIdle` resolves on *any* idle
+  — including the idle that an interrupt's abort triggers (it does **not** reject) and the
+  idle that follows an *errored* turn — and it never inspects the error. OpenCode surfaces a
+  failed turn out-of-band as a `session.error` SSE event (mapped to `sdk.error`) and then
+  goes idle, so the success path cannot infer success from "reached idle" alone. `interrupt()`
+  sets `turnAborted` before aborting, and `bindServeStream` sets `turnErrored` when it relays
+  an `sdk.error`; the send suppresses its chime if either is set when idle resolves. Each new
+  turn resets both flags, and a *failed* abort clears `turnAborted` again (the turn was not
+  actually stopped, so its genuine completion must still chime). This makes OpenCode's
+  positive-completion check the analogue of Claude's `subtype === 'success'` and Codex's
+  `status === 'completed'`. The catch path (sidecar loss / timeout) and the serve SSE idle
+  relay also never chime. `/compact` takes the same await-idle + emit path with the same
+  abort/error gate (it is a user-visible turn and chimed before the client busy→idle
+  derivation was removed; Claude `/compact` is a normal turn and already chimes).
+- **freshcodex** (`server/fresh-agent/adapters/codex/adapter.ts`): the app-server
+  `turn/completed` notification. **Empirical finding:** `turn/completed` fires for
+  interrupts too, and carries the authoritative outcome inline at
+  `params.turn.status` (`'completed' | 'interrupted' | 'failed'`). We register
+  `onTurnCompleted` in `subscribe` and chime only on a positive completion — no extra
+  read-back round-trip. The authoritative status appears either inline at
+  `params.turn.status` (codex-cli 0.142.0, probed live) **or** flat at `params.status`
+  (the shape the app-server client tests model), so we read
+  `params.turn?.status ?? params.status` and require `=== 'completed'`; either shape is
+  detected and interrupts/failures at either location are excluded. The protocol schema
+  (`CodexTurnCompletedNotificationSchema`) declares both the inline `turn.status` and the
+  flat `status` contract.
+
+### Transport
+
+Adapters emit `sdk.turn.complete { sessionId, at }`;
+`normalizeFreshAgentProviderEvent` maps it to `freshAgent.turn.complete`. It rides
+the existing `freshAgent.event` envelope, so it inherits per-client authorization,
+the subscription lifecycle, and the materialization locator remap for free (the
+envelope re-stamps the real `sessionId`, so opencode's placeholder-id emit arrives
+correctly keyed).
+
+### Client
+
+`handleFreshAgentTransportEvent` routes `freshAgent.turn.complete` to a new
+`applyFreshAgentCompletion` thunk, which resolves the owning tab/pane from the
+`provider:sessionId` session key and dispatches `recordTurnComplete`. The handler
+**requires a finite numeric `at`** and drops (logs) a malformed event rather than
+fabricating a client `Date.now()`: every server emit site stamps a monotonic `at`, so a
+missing/non-numeric `at` is a contract violation, and a fabricated client timestamp could
+collide with or regress against the server clock and swallow a real later completion.
+
+**Identity matching (the runtime-handle gotcha).** The server keys the event by the
+runtime handle it subscribed with (`provider:content.sessionId`). For Claude/kilroy
+that runtime handle is the bridge `nanoid`, which differs from the durable Claude
+UUID persisted in `content.sessionRef` — and `resolveFreshAgentSessionKey` *prefers*
+`sessionRef`. So `findFreshAgentPaneBySessionKey` matches the event against the
+runtime handle **and** the resolved (sessionRef-preferred) key; matching only the
+latter silently dropped every chime on restored Claude sessions. OpenCode and Codex
+keep `content.sessionId === sessionRef.sessionId`, so they were unaffected (which is
+why the original OpenCode-only test missed it).
+
+**Dedupe regime: `at`-monotonic (no `completionSeq`).** A wall-clock `at` avoids the
+counter "restart-swallow": a per-session counter resets to 0 on restart while the
+client's dedupe state survives, so a resumed *durable* fresh-agent session (same
+`sessionId` after a deploy) would swallow real completions. Terminals dodge that by
+getting a fresh `nanoid` terminalId on restart; fresh-agent sessions keep their durable
+id, so the counter approach is unsafe here. The discrete edge is never re-derived from a
+snapshot level, so a reconnect cannot re-green, and a replayed/stale event with an
+older-or-equal `at` is dropped.
+
+Wall-clock `at` is *not* unconditionally monotonic across a restart, though: the
+per-session clamp below can push `at` above real wall time (a large backward clock step
+keeps `at` ahead until wall time recovers), and a fresh process then stamps a lower `at`.
+To close that residual restart-swallow, the client **clears the per-terminal `at`
+baselines on a real server restart** (`resetCompletionDedupeBaselines`, dispatched from
+the bootId-change branch in `App.tsx`). This is safe precisely because the discrete edge
+is never re-derived from a snapshot and a fresh process replays nothing — so there is no
+stale completion to re-green, and the first genuine post-restart completion is accepted.
+A plain reconnect (same `bootId`) keeps the baselines, preserving replay dedupe.
+
+**Server-side per-session monotonic clamp.** Raw `Date.now()` is not a reliable
+per-turn identity: two genuine completions can land in the same millisecond, and the
+system clock can step backwards (NTP correction) — both would make a real later
+completion look `<= last` and be dropped as a replay, recreating the missed-chime
+class. So each emit site clamps its session's `at` to be strictly greater than the
+previous one via the shared `nextMonotonicTurnCompleteAt` helper. This guarantees
+distinct turns never collide or regress *within a process*. It does **not** by itself
+guarantee monotonicity across a restart (the clamp can push `at` ahead of real wall
+time, and a fresh process may stamp a lower value) — the client-side baseline reset
+described above closes that gap.
+
+The clamp state must live on **per-session** server state, never per-subscription:
+the client store's dedupe survives a WS reconnect, but fresh-agent subscriptions are
+torn down and recreated on reconnect. So `sdk-bridge` keeps it on `SdkSessionState`,
+the opencode adapter on `OpencodeSessionState`, and the codex adapter on a per-thread
+`lastTurnCompleteAtByThread` map (not the `subscribe()` closure) — otherwise a same-ms
+or backward-clock completion right after a reconnect would be dropped for codex.
+
+### What was deleted
+
+`useAgentSessionTurnCompletion` no longer derives turn completion from the busy
+level. It retains only the "waiting-for-approval" edge (a 0→≥1 pending
+permission/question transition), which is a distinct attention concern. That edge
+records under a **distinct dedupe namespace** (`provider:sessionId#waiting`): it is
+stamped with the *client* clock, and for opencode/codex it would otherwise share the
+server completion's `provider:sessionId` entry — letting an approval stamped ahead of
+the server clock (common on a remote client) swallow the real completion via the
+monotonic `at <= last` guard.
+
+## Spike findings (empirical, real binaries)
+
+- **Claude SDK** (`@anthropic-ai/claude-agent-sdk` 2.1.186): result subtype enum is
+  `success | error_during_execution | error_max_turns | error_max_budget_usd |
+  error_max_structured_output_retries`. Interrupt in streaming-input mode yields no
+  result. One result per user turn (no `parent_tool_use_id` on results).
+- **Codex app-server** (codex-cli 0.142.0, probed live): `turn/completed` params
+  are `{ threadId, turn: { id, status, error, startedAt, completedAt, durationMs } }`
+  — status is inline. Interrupt produces `turn/completed` with
+  `turn.status === 'interrupted'` (no separate `turn/aborted`). The runtime exposes
+  `onExit` (not yet wired into the freshcodex adapter — a separate stuck-blue gap).
+- **OpenCode**: single success-only completion point already existed; owns an
+  explicit `ses_` id, immune to ambiguous-ownership.
+
+## Test coverage
+
+- Unit: transport normalize; claude bridge (success emits, non-success does not);
+  opencode (exactly one on success, none on abort); codex adapter (only
+  `turn.status === 'completed'`, scoped to the subscribed thread); client thunk +
+  transport routing + `at`-monotonic dedupe; hook no longer fires on busy→idle.
+- Unit (review follow-ups, round 1): client routes a Claude completion keyed by the
+  runtime handle when the pane carries a durable `sessionRef` (identity match);
+  `nextMonotonicTurnCompleteAt` clamps same-ms/backward-clock; each emit site
+  (claude/opencode/codex) stamps a strictly-increasing `at` across successive
+  same-millisecond completions.
+- Unit (review follow-ups, round 2): opencode does not chime on an interrupt even
+  though `onceIdle` resolves, and resumes chiming on the next clean turn; codex chimes
+  on a flat `params.status` completion and skips a flat `interrupted`; the
+  waiting-for-approval edge does not swallow a later server completion (separate dedupe
+  namespace).
+- Unit (review follow-ups, round 3): codex keeps the monotonic `at` clamp per thread
+  across a re-subscribe (WS reconnect), not per subscription.
+- Unit (review follow-ups, round 4): opencode still chimes when an interrupt's abort
+  request fails and the turn then completes normally; `resetCompletionDedupeBaselines`
+  clears the per-terminal `at` baseline (so a lower post-restart `at` re-fires) while
+  preserving unacknowledged attention.
+- Unit (review follow-ups, round 5): opencode `/compact` emits a server-authoritative
+  completion edge on success (it previously greened via the removed client busy→idle
+  derivation).
+- Unit (review follow-ups, round 6): codex `shutdown()` clears the per-thread
+  turn-complete clock too (a reused-in-process adapter must not clamp a fresh
+  completion against a stale pre-shutdown timestamp), matching every other per-thread
+  map it already clears. Round 6 also corrected the `turn-complete-clock` helper
+  comment, which had overstated the clamp as guaranteeing cross-restart monotonicity
+  (it does not — the client baseline reset is what closes that gap).
+- Unit (review follow-ups, round 7): opencode does **not** chime when a turn reports
+  `session.error` and then goes idle (false-success gap — onceIdle resolves on the
+  post-error idle without inspecting the error), and resumes chiming on the next clean
+  turn (the error flag resets per turn like the abort flag); the client drops a malformed
+  `freshAgent.turn.complete` without a finite numeric `at` instead of fabricating a client
+  `Date.now()`.
+- e2e: WS `freshAgent.turn.complete` → `handleFreshAgentMessage` →
+  `applyFreshAgentCompletion` → `useTurnCompletionNotifications` chimes once +
+  highlights, ignores replays, re-chimes on the next real turn.
+
+## Deliberately out of scope (follow-ups)
+
+- freshcodex `onExit` self-heal for a crashed sidecar (stuck-blue gap).
+- snapshot status-clobber / provider-agnostic `statusSeq` for BLUE correctness.
+- Centralizing GREEN/BLUE render precedence across TabItem / PaneHeader / Sidebar.
+- Making the waiting-for-approval edge server-authoritative too (delete the hook).
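Reviewer sketch (not part of the patch): the plan above combines a server-side per-session clamp with a client-side `at`-monotonic dedupe and a baseline reset on restart. The following is a minimal illustration of how those three pieces interact, under stated assumptions. `nextMonotonicTurnCompleteAt` is copied from `turn-complete-clock.ts` in this change; `CompletionDedupe`, its method names, and the session key `opencode:ses_example` are hypothetical stand-ins for the real client slice, which this diff does not show.

```typescript
// Same clamp as server/fresh-agent/turn-complete-clock.ts in this diff.
function nextMonotonicTurnCompleteAt(lastAt: number | undefined, now: number): number {
  return lastAt !== undefined && now <= lastAt ? lastAt + 1 : now
}

// HYPOTHETICAL stand-in for the client dedupe state; the real reducer lives in
// turnCompletionSlice and may be shaped differently.
class CompletionDedupe {
  private readonly lastAtByKey = new Map<string, number>()

  /** Returns true when the completion should chime, false when it is dropped. */
  accept(sessionKey: string, at: unknown): boolean {
    // A missing or non-finite `at` is a contract violation: drop, never fabricate.
    if (typeof at !== 'number' || !Number.isFinite(at)) return false
    const last = this.lastAtByKey.get(sessionKey)
    // Older-or-equal `at` is treated as a replay.
    if (last !== undefined && at <= last) return false
    this.lastAtByKey.set(sessionKey, at)
    return true
  }

  /** Models the baseline reset dispatched on a real server restart. */
  resetBaselines(): void {
    this.lastAtByKey.clear()
  }
}

// Server side: two completions in the same millisecond, then a backward clock step.
let lastAt: number | undefined
const stamps = [1000, 1000, 900].map((now) => (lastAt = nextMonotonicTurnCompleteAt(lastAt, now)))

// Client side: every clamped stamp is accepted once; replays and malformed events drop.
const dedupe = new CompletionDedupe()
const key = 'opencode:ses_example' // hypothetical session key
const accepted = stamps.map((at) => dedupe.accept(key, at))
const replay = dedupe.accept(key, stamps[2])
const malformed = dedupe.accept(key, undefined)

// A restarted server stamps real wall time (950), below the clamp-inflated baseline.
const swallowedWithoutReset = dedupe.accept(key, 950)
dedupe.resetBaselines()
const acceptedAfterReset = dedupe.accept(key, 950)
```

In this sketch the clamp turns the inputs 1000, 1000, 900 into three strictly increasing stamps, so none of the three turns is mistaken for a replay. The final two calls show the residual restart case the plan describes: a lower post-restart `at` is dropped until the baselines are cleared.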
diff --git a/server/coding-cli/codex-app-server/protocol.ts b/server/coding-cli/codex-app-server/protocol.ts
index 817fd2946..2881ada97 100644
--- a/server/coding-cli/codex-app-server/protocol.ts
+++ b/server/coding-cli/codex-app-server/protocol.ts
@@ -400,6 +400,17 @@ export const CodexTurnCompletedNotificationSchema = z.object({
   params: z.object({
     threadId: z.string().min(1),
     turnId: z.string().min(1).optional(),
+    // The real app-server reports the authoritative outcome inline as the completed
+    // turn object (status 'completed' | 'interrupted' | 'failed'). turn/completed
+    // fires for interrupts too, so consumers must read turn.status to chime only on
+    // a positive completion rather than treating the bare notification as success.
+    turn: z.object({
+      id: z.string().min(1).optional(),
+      status: CodexTurnStatusSchema.optional(),
+    }).passthrough().optional(),
+    // Some app-server forms report the outcome flat at params.status instead of inline
+    // under turn; consumers read params.turn?.status ?? params.status.
+    status: CodexTurnStatusSchema.optional(),
   }).passthrough(),
 }).passthrough()
 
diff --git a/server/fresh-agent/adapters/codex/adapter.ts b/server/fresh-agent/adapters/codex/adapter.ts
index be7f75315..a0c989795 100644
--- a/server/fresh-agent/adapters/codex/adapter.ts
+++ b/server/fresh-agent/adapters/codex/adapter.ts
@@ -25,6 +25,7 @@ import {
   parseCodexDisplayIdHandle,
 } from './normalize.js'
 import { normalizeFreshAgentEffort, normalizeFreshAgentModel } from '../../../../shared/fresh-agent-models.js'
+import { nextMonotonicTurnCompleteAt } from '../../turn-complete-clock.js'
 
 type CodexThreadLifecycleEvent =
   | {
@@ -65,6 +66,9 @@ type CodexRuntimePort = {
   interruptTurn?: (input: CodexTurnInterruptParams) => Promise
   shutdown?: () => Promise
   onThreadLifecycle?: (handler: (event: CodexThreadLifecycleEvent) => void) => () => void
+  onTurnCompleted?: (
+    handler: (event: { threadId: string; turnId?: string; params: Record }) => void,
+  ) => () => void
   readThread: (input: { threadId: string; includeTurns?: boolean }) => Promise>
   listThreadTurns: (input: {
     threadId: string
@@ -286,6 +290,9 @@ export function createCodexFreshAgentAdapter(deps: {
   }
   const displayIdSecret = deps.displayIdSecret
   const activeTurnByThread = new Map()
+  // Per-thread (not per-subscription) so the monotonic turn-complete clamp survives a WS
+  // reconnect, matching how Claude/OpenCode keep it on session state.
+  const lastTurnCompleteAtByThread = new Map()
   const settingsByThread = new Map>()
   const runtimeByThread = new Map()
   const threadIdsByRuntime = new Map>()
@@ -665,6 +672,7 @@ export function createCodexFreshAgentAdapter(deps: {
 
   const clearThreadState = (threadId: string) => {
     activeTurnByThread.delete(threadId)
+    lastTurnCompleteAtByThread.delete(threadId)
     settingsByThread.delete(threadId)
     modelByTurnByThread.delete(threadId)
     submittedInputsByThread.delete(threadId)
@@ -867,7 +875,7 @@ export function createCodexFreshAgentAdapter(deps: {
       if (!runtime.onThreadLifecycle) {
         throw new Error('Codex app-server runtime does not support thread lifecycle subscriptions.')
       }
-      return runtime.onThreadLifecycle((event) => {
+      const offLifecycle = runtime.onThreadLifecycle((event) => {
         if (event.kind === 'thread_started') {
           if (event.thread.id !== sessionId) return
           listener(makeCodexStatusEvent(sessionId, event.thread.status, event.thread.updatedAt))
@@ -891,6 +899,29 @@ export function createCodexFreshAgentAdapter(deps: {
         }
         listener(makeCodexStatusEvent(sessionId, event.status))
       })
+
+      // Server-authoritative turn-complete edge for the GREEN/SOUND pipeline. The
+      // app-server fires turn/completed for interrupts too, carrying the authoritative
+      // outcome inline at params.turn.status, so we chime only for a positive
+      // completion ('completed') and never on interrupt/failure.
+      const offTurnCompleted = runtime.onTurnCompleted?.((event) => {
+        if (event.threadId !== sessionId) return
+        // turn/completed fires for interrupts/failures too, so chime only on a positive
+        // completion. The authoritative status appears either inline at params.turn.status
+        // (codex-cli 0.142.0, probed live) or flat at params.status (the shape the
+        // app-server client tests model); accept either so neither version silently fails.
+        const params = event.params as { status?: unknown; turn?: { status?: unknown } } | undefined
+        const status = params?.turn?.status ?? params?.status
+        if (status !== 'completed') return
+        const at = nextMonotonicTurnCompleteAt(lastTurnCompleteAtByThread.get(sessionId), Date.now())
+        lastTurnCompleteAtByThread.set(sessionId, at)
+        listener({ type: 'sdk.turn.complete', sessionId, at })
+      })
+
+      return () => {
+        offLifecycle()
+        offTurnCompleted?.()
+      }
     },
 
     async send(sessionId, input) {
@@ -1174,6 +1205,7 @@ export function createCodexFreshAgentAdapter(deps: {
       displayCursorByHandle.clear()
       submittedInputsByThread.clear()
       submittedAliasByThread.clear()
+      lastTurnCompleteAtByThread.clear()
       await Promise.all(runtimes.map((runtime) => runtime.shutdown?.()))
     },
   }
diff --git a/server/fresh-agent/adapters/opencode/adapter.ts b/server/fresh-agent/adapters/opencode/adapter.ts
index 1e5edf170..15a5d4307 100644
--- a/server/fresh-agent/adapters/opencode/adapter.ts
+++ b/server/fresh-agent/adapters/opencode/adapter.ts
@@ -8,6 +8,7 @@ import type {
   FreshAgentThreadLocator,
 } from '../../runtime-adapter.js'
 import { FreshAgentLostSessionError } from '../../runtime-manager.js'
+import { nextMonotonicTurnCompleteAt } from '../../turn-complete-clock.js'
 import { normalizeFreshAgentEffort, normalizeFreshAgentModel } from '../../../../shared/fresh-agent-models.js'
 import { logger } from '../../../logger.js'
 import { defaultOpencodeDataHome } from '../../../coding-cli/providers/opencode.js'
@@ -45,6 +46,18 @@ type OpencodeSessionState = {
   events: EventEmitter
   sendQueue: Promise
   unsubscribeServe?: () => void
+  /** Last emitted turn-complete `at`, kept per session so the edge stays strictly monotonic. */
+  lastTurnCompleteAt?: number
+  /** Set by interrupt() so the in-flight send suppresses its chime when idle resolves. */
+  turnAborted?: boolean
+  /**
+   * Set when the serve stream relays a `session.error` during the in-flight turn, so the
+   * success path suppresses its chime. onceIdle resolves on the idle that follows an
+   * errored turn without inspecting the error, so a positive completion must independently
+   * confirm the turn did not error — the OpenCode analogue of Claude's `subtype === 'success'`
+   * and Codex's `status === 'completed'`.
+   */
+  turnErrored?: boolean
 }
 
 type CreateOpencodeFreshAgentAdapterOptions = {
@@ -208,16 +221,34 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
   async function compactForState(state: OpencodeSessionState, input?: { instructions?: string }): Promise {
     if (!state.realSessionId) return
     await ensureMutableRoute(state)
+    const realId = state.realSessionId
     const route = cwdRoute(state.cwd)
-    if (route) {
-      await serveManager.compact(state.realSessionId, input, route)
-      return
-    }
-    if (input) {
-      await serveManager.compact(state.realSessionId, input)
-      return
+    // Compact is a user-visible turn: it must green/chime on completion like a send. Set up
+    // the idle waiter before issuing the request so we don't miss the idle, and gate the
+    // chime on turnAborted/turnErrored so an interrupt or error during compact does not
+    // falsely complete.
+    state.turnAborted = false
+    state.turnErrored = false
+    emitStatus(state, 'running')
+    const idle = route
+      ? serveManager.onceIdle(realId, turnTimeoutMs, route)
+      : serveManager.onceIdle(realId, turnTimeoutMs)
+    void idle.catch(() => {})
+    try {
+      if (route) await serveManager.compact(realId, input, route)
+      else if (input) await serveManager.compact(realId, input)
+      else await serveManager.compact(realId)
+      await idle
+      emitStatus(state, 'idle')
+      if (!state.turnAborted && !state.turnErrored) {
+        const completionAt = nextMonotonicTurnCompleteAt(state.lastTurnCompleteAt, Date.now())
+        state.lastTurnCompleteAt = completionAt
+        state.events.emit('event', { type: 'sdk.turn.complete', sessionId: state.placeholderId, at: completionAt })
+      }
+    } catch (error) {
+      emitStatus(state, 'idle')
+      throw error
     }
-    await serveManager.compact(state.realSessionId)
   }
 
   async function forkForState(state: OpencodeSessionState): Promise<{ id: string; directory?: string }> {
@@ -239,6 +270,11 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
     state.unsubscribeServe = serveManager.subscribe(state.realSessionId, (parsed) => {
       const mapped = serveEventToSdk(parsed, state.placeholderId)
       if (mapped) {
+        if (mapped.type === 'sdk.error') {
+          // A turn error means the in-flight turn did not positively complete; the
+          // success path consults this when onceIdle later resolves on the post-error idle.
+          state.turnErrored = true
+        }
         if (mapped.type === 'sdk.session.snapshot') {
           const status: 'running' | 'idle' = mapped.status === 'idle' ? 'idle' : 'running'
           state.status = status
@@ -288,6 +324,10 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
       const effort = normalized?.effort ?? state.effort
       const effectiveCwd = normalized?.cwd ?? state.cwd
 
+      // A fresh turn starts un-aborted and un-errored; interrupt() flips turnAborted while we
+      // are parked on idle, and the serve stream flips turnErrored if the turn reports an error.
+      state.turnAborted = false
+      state.turnErrored = false
       emitStatus(state, 'running')
       try {
         if (!state.realSessionId) {
@@ -324,6 +364,16 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
         state.model = modelStr ?? state.model
         state.effort = effort
         emitStatus(state, 'idle')
+        // Server-authoritative turn-complete edge for the GREEN/SOUND pipeline. onceIdle
+        // resolves on ANY idle — including the idle an interrupt's abort triggers or the idle
+        // that follows an errored turn — so a positive completion requires that the turn was
+        // neither interrupted nor errored. (The catch below for abort/interrupt/sidecar loss
+        // and the serve SSE idle relay also never chime.)
+        if (!state.turnAborted && !state.turnErrored) {
+          const completionAt = nextMonotonicTurnCompleteAt(state.lastTurnCompleteAt, Date.now())
+          state.lastTurnCompleteAt = completionAt
+          state.events.emit('event', { type: 'sdk.turn.complete', sessionId: state.placeholderId, at: completionAt })
+        }
         return sendResult(state.realSessionId)
       } catch (error) {
         emitStatus(state, 'idle')
@@ -461,7 +511,17 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen
 
     async interrupt(sessionId) {
       const state = requireState(sessionId)
-      await abortForState(state)
+      // Mark before aborting so the in-flight send (parked on onceIdle) sees the abort and
+      // suppresses its turn-complete chime when the abort-triggered idle resolves it.
+      state.turnAborted = true
+      try {
+        await abortForState(state)
+      } catch (error) {
+        // The abort never landed, so the turn may still complete normally — clear the flag
+        // so a genuine completion is not silently swallowed.
+        state.turnAborted = false
+        throw error
+      }
       emitStatus(state, 'idle')
     },
 
diff --git a/server/fresh-agent/sdk-events.ts b/server/fresh-agent/sdk-events.ts
index d8cff9129..4037f6f66 100644
--- a/server/fresh-agent/sdk-events.ts
+++ b/server/fresh-agent/sdk-events.ts
@@ -32,6 +32,7 @@ export type FreshAgentProviderEvent =
   | { type: 'freshAgent.permission.cancelled'; sessionId: string; requestId: string }
   | { type: 'freshAgent.question.request'; sessionId: string; requestId: string; questions: Array<{ question: string; header: string; options: Array<{ label: string; description: string }>; multiSelect: boolean }> }
   | { type: 'freshAgent.status'; sessionId: string; status: SdkSessionStatus }
+  | { type: 'freshAgent.turn.complete'; sessionId: string; at: number }
   | { type: 'freshAgent.error'; sessionId: string; message: string; code?: string }
   | { type: 'freshAgent.exit'; sessionId: string; exitCode?: number }
   | { type: 'freshAgent.killed'; sessionId: string; success?: boolean }
@@ -66,6 +67,8 @@ export function normalizeFreshAgentProviderEvent(event: unknown): unknown {
       return { ...providerEvent, type: 'freshAgent.question.request' } as FreshAgentProviderEvent
     case 'sdk.status':
       return { ...providerEvent, type: 'freshAgent.status' } as FreshAgentProviderEvent
+    case 'sdk.turn.complete':
+      return { ...providerEvent, type: 'freshAgent.turn.complete' } as FreshAgentProviderEvent
     case 'sdk.error':
       return { ...providerEvent, type: 'freshAgent.error' } as FreshAgentProviderEvent
     case 'sdk.exit':
diff --git a/server/fresh-agent/turn-complete-clock.ts b/server/fresh-agent/turn-complete-clock.ts
new file mode 100644
index 000000000..7739f9054
--- /dev/null
+++ b/server/fresh-agent/turn-complete-clock.ts
@@ -0,0 +1,21 @@
+/**
+ * Returns a per-session strictly-monotonic turn-complete timestamp.
+ *
+ * Fresh-agent turn completions are deduped on the client by the wall-clock `at`,
+ * chosen over a per-session counter because a counter resets to zero on a server
+ * restart and would swallow completions for a resumed durable session. Raw
+ * `Date.now()` is not a reliable per-turn identity, though: two genuine
+ * completions can land in the same millisecond, and the system clock can step
+ * backwards (NTP correction). Either case would make a real later completion look
+ * `<= last` and be dropped as a stale replay.
+ *
+ * Clamping each session's `at` to be strictly greater than its previous one
+ * guarantees distinct turns never collide or regress *within a process*. It does
+ * NOT by itself guarantee monotonicity across a restart — the clamp can push `at`
+ * ahead of real wall time, and a fresh process may then stamp a lower value. That
+ * residual gap is closed on the client, which clears its per-terminal `at`
+ * baselines on a real server restart (`resetCompletionDedupeBaselines`).
+ */
+export function nextMonotonicTurnCompleteAt(lastAt: number | undefined, now: number): number {
+  return lastAt !== undefined && now <= lastAt ? lastAt + 1 : now
+}
diff --git a/server/sdk-bridge-types.ts b/server/sdk-bridge-types.ts
index dd354f493..986650cc2 100644
--- a/server/sdk-bridge-types.ts
+++ b/server/sdk-bridge-types.ts
@@ -75,6 +75,7 @@ export type SdkServerMessage =
   | { type: 'sdk.permission.request'; sessionId: string; requestId: string; subtype: string; tool?: { name: string; input?: Record }; toolUseID?: string; suggestions?: unknown[]; blockedPath?: string; decisionReason?: string }
   | { type: 'sdk.permission.cancelled'; sessionId: string; requestId: string }
   | { type: 'sdk.status'; sessionId: string; status: SdkSessionStatus }
+  | { type: 'sdk.turn.complete'; sessionId: string; at: number }
   | { type: 'sdk.error'; sessionId: string; message: string; code?: string }
   | { type: 'sdk.exit'; sessionId: string; exitCode?: number }
   | { type: 'sdk.killed'; sessionId: string; success: boolean }
@@ -125,6 +126,8 @@ export interface SdkSessionState {
   costUsd: number
   totalInputTokens: number
   totalOutputTokens: number
+  /** Last emitted turn-complete `at`, kept per session so the edge stays strictly monotonic. */
+  lastTurnCompleteAt?: number
 }
 
 export interface SdkReplayState {
diff --git a/server/sdk-bridge.ts b/server/sdk-bridge.ts
index 713ea8157..f8153acb4 100644
--- a/server/sdk-bridge.ts
+++ b/server/sdk-bridge.ts
@@ -17,6 +17,7 @@ import { buildMcpServerCommandArgs } from './mcp/config-writer.js'
 import { sanitizeFreshAgentPluginPaths } from '../shared/fresh-agent-plugins.js'
 import { logger } from './logger.js'
 import { synthesizeClaudeFreshAgentLiveMessageId } from './fresh-agent/history/claude/history-ledger.js'
+import { nextMonotonicTurnCompleteAt } from './fresh-agent/turn-complete-clock.js'
 import type { ClaudeFreshAgentHistorySource } from './fresh-agent/history/claude/history-source.js'
 import type {
   SdkSessionState,
@@ -461,6 +462,19 @@ export class SdkBridge extends EventEmitter {
             costUsd: rMsg.total_cost_usd,
             usage,
           })
+          // Server-authoritative turn-complete edge for the GREEN/SOUND pipeline.
+          // Only a positively-completed turn ('success') chimes; interrupts yield no
+          // result message at all, and tool-only/error turns surface a non-success
+          // subtype, so this never fires green on an aborted or errored turn.
+ if (rMsg.subtype === 'success') { + const at = nextMonotonicTurnCompleteAt(state.lastTurnCompleteAt, Date.now()) + state.lastTurnCompleteAt = at + this.broadcastToSession(sessionId, { + type: 'sdk.turn.complete', + sessionId, + at, + }) + } break } diff --git a/src/App.tsx b/src/App.tsx index 80cc794c8..a49f31ec6 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -1,6 +1,7 @@ import { lazy, Suspense, useCallback, useEffect, useRef, useState, type TouchEvent as ReactTouchEvent } from 'react' import { useAppDispatch, useAppSelector, useAppStore } from '@/store/hooks' import { setStatus, setError, setErrorCode, setServerInstanceId, setBootId, setServerRestarted, setLiveTerminalIds, setPlatform, setAvailableClis, setFeatureFlags } from '@/store/connectionSlice' +import { resetCompletionDedupeBaselines } from '@/store/turnCompletionSlice' import { setLocalSettings, setServerSettings } from '@/store/settingsSlice' import { markWsSnapshotReceived, @@ -907,6 +908,10 @@ export default function App() { dispatch(setServerRestarted(serverRestarted)) if (serverRestarted) { dispatch(setLiveTerminalIds([])) + // The fresh process replays nothing and may stamp a lower wall-clock `at` than + // a clamp-inflated pre-restart value; drop the per-terminal `at` baselines so a + // resumed durable session's next real completion is not swallowed as a replay. 
+ dispatch(resetCompletionDedupeBaselines()) } dispatch(resetWsSnapshotReceived()) // If App registered late and missed a prior invalidation, a fresh HTTP baseline diff --git a/src/hooks/useAgentSessionTurnCompletion.ts b/src/hooks/useAgentSessionTurnCompletion.ts index 00a0e12fb..e884f3ea0 100644 --- a/src/hooks/useAgentSessionTurnCompletion.ts +++ b/src/hooks/useAgentSessionTurnCompletion.ts @@ -2,16 +2,13 @@ import { useEffect, useRef } from 'react' import { makeFreshAgentSessionKey } from '@shared/fresh-agent' import { useAppDispatch, useAppSelector } from '@/store/hooks' import { collectPaneEntries } from '@/lib/pane-utils' -import { - isFreshAgentBusy, - resolveFreshAgentSessionKey, -} from '@/lib/pane-activity' +import { resolveFreshAgentSessionKey } from '@/lib/pane-activity' import { recordTurnComplete } from '@/store/turnCompletionSlice' import type { FreshAgentSessionState } from '@/store/freshAgentTypes' const EMPTY_FRESH_AGENT_SESSIONS: Record = {} -type SessionEdgeState = { wasBusy: boolean; hadPending: boolean } +type SessionEdgeState = { hadPending: boolean } function hasWaitingItems(session: FreshAgentSessionState | undefined): boolean { if (!session) return false @@ -20,17 +17,22 @@ function hasWaitingItems(session: FreshAgentSessionState | undefined): boolean { } /** - * Bridges SDK-driven fresh-agent panes into the existing - * GREEN/SOUND pipeline. Watches each SDK pane's busy/pending edges and fires - * recordTurnComplete on: - * - a real busy -> idle transition (turn complete), and - * - a 0 -> >=1 pending permission/question transition (waiting-for-approval). + * Bridges the fresh-agent "waiting-for-approval" edge into the GREEN/SOUND pipeline. + * Fires recordTurnComplete on a 0 -> >=1 pending permission/question transition. * - * The synthetic terminalId is the pane's `provider:sessionId` session key, which - * recordTurnComplete only uses as a dedupe key (markTab/PaneAttention key on - * tabId/paneId). 
Never fires on the FIRST observation of a session, so tab - * restore / snapshot hydration of an already-idle or already-pending session - * does not produce a spurious green/sound. + * Turn COMPLETION (a finished turn) is no longer derived here: it is + * server-authoritative via the discrete freshAgent.turn.complete event + * (applyFreshAgentCompletion). Differentiating the client-side busy level to recover + * a completion edge was the source of premature (flicker), missed (fast-turn), and + * stale-color chimes, so that path is intentionally gone. + * + * The synthetic terminalId is the pane's `provider:sessionId` session key with a + * `#waiting` suffix, which recordTurnComplete only uses as a dedupe key + * (markTab/PaneAttention key on tabId/paneId). The suffix keeps this client-clock edge + * in a separate dedupe namespace from the server turn-complete (`provider:sessionId`), + * so an approval can never suppress a real completion via the monotonic `at` guard. + * Never fires on the FIRST observation of a session, so tab restore / snapshot hydration + * of an already-pending session does not produce a spurious green/sound. */ export function useAgentSessionTurnCompletion(): void { const dispatch = useAppDispatch() @@ -46,24 +48,17 @@ export function useAgentSessionTurnCompletion(): void { if (!layout) continue for (const entry of collectPaneEntries(layout)) { const content = entry.content - let sessionKey: string | undefined - let isBusy = false - let hasPending = false + if (content.kind !== 'fresh-agent') continue - if (content.kind === 'fresh-agent') { - const session = content.sessionId - ? 
freshAgentSessions[makeFreshAgentSessionKey({ - sessionType: content.sessionType, - provider: content.provider, - sessionId: content.sessionId, - })] - : undefined - sessionKey = resolveFreshAgentSessionKey(content, session) - isBusy = isFreshAgentBusy(content, session) - hasPending = hasWaitingItems(session) - } else { - continue - } + const session = content.sessionId + ? freshAgentSessions[makeFreshAgentSessionKey({ + sessionType: content.sessionType, + provider: content.provider, + sessionId: content.sessionId, + })] + : undefined + const sessionKey = resolveFreshAgentSessionKey(content, session) + const hasPending = hasWaitingItems(session) if (!sessionKey) continue seen.add(sessionKey) @@ -71,8 +66,8 @@ export function useAgentSessionTurnCompletion(): void { const prev = prevRef.current.get(sessionKey) if (prev === undefined) { // First observation: initialize without firing (avoids spurious green on - // restore / snapshot hydration of an already-finished or pending session). - prevRef.current.set(sessionKey, { wasBusy: isBusy, hadPending: hasPending }) + // restore / snapshot hydration of an already-pending session). + prevRef.current.set(sessionKey, { hadPending: hasPending }) continue } @@ -81,20 +76,16 @@ export function useAgentSessionTurnCompletion(): void { dispatch(recordTurnComplete({ tabId, paneId: entry.paneId, - terminalId: sessionKey, - at: Date.now(), - })) - } else if (prev.wasBusy && !isBusy && !hasPending) { - // Turn complete: an observed busy -> idle with nothing pending. - dispatch(recordTurnComplete({ - tabId, - paneId: entry.paneId, - terminalId: sessionKey, + // Distinct dedupe namespace from the server turn-complete (whose terminalId is + // `provider:sessionId`). This edge uses the CLIENT clock; mixing it into the + // server-completion entry would let an approval stamped ahead of the server + // clock (common on a remote client) swallow the real completion as `at <= last`. 
+ terminalId: `${sessionKey}#waiting`, at: Date.now(), })) } - prevRef.current.set(sessionKey, { wasBusy: isBusy, hadPending: hasPending }) + prevRef.current.set(sessionKey, { hadPending: hasPending }) } } diff --git a/src/lib/fresh-agent-ws.ts b/src/lib/fresh-agent-ws.ts index 15d2519ec..1d5cb881f 100644 --- a/src/lib/fresh-agent-ws.ts +++ b/src/lib/fresh-agent-ws.ts @@ -1,9 +1,11 @@ import type { AppDispatch } from '@/store/store' import type { FreshAgentRuntimeProvider, FreshAgentSessionType } from '@shared/fresh-agent' import type { SessionRef } from '@shared/session-contract' +import { createLogger } from '@/lib/client-logger' import { consumeCancelledCreate, consumeCreateRoute, rememberCreateRoute } from '@/lib/create-cancellation' import { flushPersistedLayoutNow } from '@/store/persistControl' import { materializeFreshAgentSession as materializeFreshAgentPaneSession } from '@/store/panesSlice' +import { applyFreshAgentCompletion } from '@/store/turnCompletionThunks' import { addAssistantMessage, addPermissionRequest, @@ -27,6 +29,8 @@ import { turnResult, } from '@/store/freshAgentSlice' +const log = createLogger('fresh-agent-ws') + type FreshAgentCreatedMessage = { type: 'freshAgent.created' requestId: string @@ -226,6 +230,21 @@ export function handleFreshAgentTransportEvent(dispatch: AppDispatch, msg: Fresh status: event.status as never, })) return true + case 'freshAgent.turn.complete': { + // The server always stamps a monotonic numeric `at`. Drop a malformed event rather + // than fabricating a client `Date.now()`, which could collide with or regress against + // the server clock and swallow a real later completion (or spuriously green). 
+ if (typeof event.at !== 'number' || !Number.isFinite(event.at)) { + log.warn('dropping malformed freshAgent.turn.complete without a numeric at', { sessionId, at: event.at }) + return true + } + dispatch(applyFreshAgentCompletion({ + provider: locator.provider, + sessionId, + at: event.at, + })) + return true + } case 'freshAgent.assistant': dispatch(addAssistantMessage({ ...locator, diff --git a/src/store/turnCompletionSlice.ts b/src/store/turnCompletionSlice.ts index 5848b04c2..294034c1e 100644 --- a/src/store/turnCompletionSlice.ts +++ b/src/store/turnCompletionSlice.ts @@ -103,6 +103,13 @@ const turnCompletionSlice = createSlice({ seq: state.seq, }) }, + // Cleared on a real server restart (not a plain reconnect). The new process has no + // buffered events to replay, and its wall clock may be behind a clamp-inflated + // pre-restart `at`, so dropping the per-terminal `at` baseline lets the first genuine + // post-restart completion through instead of swallowing it as a stale replay. 
+ resetCompletionDedupeBaselines(state) { + state.lastAtByTerminalId = {} + }, consumeTurnCompleteEvents(state, action: PayloadAction<{ throughSeq: number }>) { const { throughSeq } = action.payload if (throughSeq <= 0) return @@ -129,6 +136,7 @@ const turnCompletionSlice = createSlice({ export const { recordTurnComplete, + resetCompletionDedupeBaselines, consumeTurnCompleteEvents, markTabAttention, clearTabAttention, diff --git a/src/store/turnCompletionThunks.ts b/src/store/turnCompletionThunks.ts index 87a4127da..2dfcb5eea 100644 --- a/src/store/turnCompletionThunks.ts +++ b/src/store/turnCompletionThunks.ts @@ -1,4 +1,8 @@ import type { TerminalTurnCompleteMessage } from '@shared/ws-protocol' +import { makeFreshAgentSessionKey } from '@shared/fresh-agent' +import { collectPaneEntries } from '@/lib/pane-utils' +import { resolveFreshAgentSessionKey } from '@/lib/pane-activity' +import type { FreshAgentPaneContent, PaneNode } from './paneTypes' import { selectTabPaneByTerminalId } from './selectors/paneTerminalSelectors' import { recordTurnComplete } from './turnCompletionSlice' import type { AppDispatch, RootState } from './store' @@ -28,3 +32,81 @@ export function applyServerCompletion(payload: ApplyServerCompletionPayload) { })) } } + +export type ApplyFreshAgentCompletionPayload = { + provider: string + sessionId: string + at: number +} + +/** + * Server-authoritative fresh-agent turn completion. The provider adapters emit a + * discrete turn-complete edge ONLY on a positive completion, so the client no longer + * derives green/sound from the busy level. We resolve the owning tab/pane from the + * `provider:sessionId` session key and fold the event into the GREEN/SOUND pipeline + * via the `at`-monotonic dedupe regime (no completionSeq). The discrete edge is never + * replayed from a snapshot, so a reconnect cannot re-green, and a stale/older `at` is + * dropped. 
Across a real server restart the client clears the per-terminal `at` + * baselines (resetCompletionDedupeBaselines), so a resumed durable session whose fresh + * process stamps a lower wall-clock `at` is not swallowed. + */ +export function applyFreshAgentCompletion(payload: ApplyFreshAgentCompletionPayload) { + return (dispatch: AppDispatch, getState: () => RootState): void => { + const state = getState() + const sessionKey = `${payload.provider}:${payload.sessionId}` + const location = findFreshAgentPaneBySessionKey(state, sessionKey) + if (!location) return + + dispatch(recordTurnComplete({ + tabId: location.tabId, + paneId: location.paneId, + terminalId: sessionKey, + at: payload.at, + })) + } +} + +function findFreshAgentPaneBySessionKey( + state: RootState, + sessionKey: string, +): { tabId: string; paneId: string } | undefined { + const layouts = state.panes?.layouts + if (!layouts) return undefined + const sessions = state.freshAgent?.sessions ?? {} + const activeTabId = state.tabs?.activeTabId + + const scan = (tabId: string, layout: PaneNode): { tabId: string; paneId: string } | undefined => { + for (const entry of collectPaneEntries(layout)) { + if (entry.content.kind !== 'fresh-agent') continue + const content = entry.content as FreshAgentPaneContent + const session = content.sessionId + ? sessions[makeFreshAgentSessionKey({ + sessionType: content.sessionType, + provider: content.provider, + sessionId: content.sessionId, + })] + : undefined + // The server keys the completion event by the runtime handle it subscribed with + // (`provider:content.sessionId`). For Claude/kilroy that runtime handle differs + // from the durable Claude UUID carried in content.sessionRef, which + // resolveFreshAgentSessionKey prefers — so we must match the runtime handle too, + // or restored Claude sessions would silently drop every chime. + const runtimeKey = content.sessionId ? 
`${content.provider}:${content.sessionId}` : undefined + if (runtimeKey === sessionKey || resolveFreshAgentSessionKey(content, session) === sessionKey) { + return { tabId, paneId: entry.paneId } + } + } + return undefined + } + + if (activeTabId && layouts[activeTabId]) { + const hit = scan(activeTabId, layouts[activeTabId]) + if (hit) return hit + } + for (const [tabId, layout] of Object.entries(layouts)) { + if (tabId === activeTabId || !layout) continue + const hit = scan(tabId, layout) + if (hit) return hit + } + return undefined +} diff --git a/test/e2e/fresh-agent-turn-complete-notification.test.tsx b/test/e2e/fresh-agent-turn-complete-notification.test.tsx new file mode 100644 index 000000000..f331be4fc --- /dev/null +++ b/test/e2e/fresh-agent-turn-complete-notification.test.tsx @@ -0,0 +1,157 @@ +import { useEffect } from 'react' +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' +import { render, cleanup, waitFor, act } from '@testing-library/react' +import { Provider } from 'react-redux' +import { configureStore } from '@reduxjs/toolkit' +import { useAppDispatch } from '@/store/hooks' +import { useTurnCompletionNotifications } from '@/hooks/useTurnCompletionNotifications' +import tabsReducer from '@/store/tabsSlice' +import panesReducer from '@/store/panesSlice' +import settingsReducer, { defaultSettings } from '@/store/settingsSlice' +import connectionReducer from '@/store/connectionSlice' +import turnCompletionReducer from '@/store/turnCompletionSlice' +import freshAgentReducer, { setSessionStatus } from '@/store/freshAgentSlice' +import { handleFreshAgentMessage } from '@/lib/fresh-agent-ws' +import type { PaneNode } from '@/store/paneTypes' +import type { Tab } from '@/store/types' + +const playSound = vi.hoisted(() => vi.fn()) + +const wsMocks = vi.hoisted(() => { + const messageHandlers = new Set<(msg: any) => void>() + return { + onMessage: vi.fn((cb: (msg: any) => void) => { + messageHandlers.add(cb) + return () => 
messageHandlers.delete(cb) + }), + resetHandlers: () => messageHandlers.clear(), + emitMessage: (msg: any) => { + for (const cb of messageHandlers) cb(msg) + }, + } +}) + +vi.mock('@/hooks/useNotificationSound', () => ({ + useNotificationSound: () => ({ play: playSound }), +})) + +vi.mock('@/lib/ws-client', () => ({ + getWsClient: () => ({ + send: vi.fn(), + connect: vi.fn().mockResolvedValue(undefined), + onMessage: wsMocks.onMessage, + onReconnect: vi.fn(() => () => {}), + }), +})) + +const SESSION_ID = 'ses_real_1' + +function turnComplete(at: number) { + wsMocks.emitMessage({ + type: 'freshAgent.event', + sessionId: SESSION_ID, + sessionType: 'freshopencode', + provider: 'opencode', + event: { type: 'freshAgent.turn.complete', sessionId: SESSION_ID, at }, + }) +} + +function Harness() { + const dispatch = useAppDispatch() + useTurnCompletionNotifications() + useEffect(() => { + return wsMocks.onMessage((msg: any) => { + if (typeof msg?.type === 'string' && msg.type.startsWith('freshAgent')) { + handleFreshAgentMessage(dispatch, msg) + } + }) + }, [dispatch]) + return null +} + +function createStore() { + const foregroundTab: Tab = { + id: 'tab-1', createRequestId: 'req-1', title: 'Foreground', status: 'running', + mode: 'shell', shell: 'system', terminalId: 'term-1', createdAt: 1, + } + const agentTab: Tab = { + id: 'tab-2', createRequestId: 'req-2', title: 'Agent', status: 'running', + mode: 'shell', shell: 'system', createdAt: 1, + } + const agentLeaf: PaneNode = { + type: 'leaf', + id: 'pane-2', + content: { + kind: 'fresh-agent', + createRequestId: 'req-2', + sessionType: 'freshopencode', + provider: 'opencode', + sessionId: SESSION_ID, + sessionRef: { provider: 'opencode', sessionId: SESSION_ID }, + } as never, + } + const layouts: Record = { + 'tab-1': { type: 'leaf', id: 'pane-1', content: { kind: 'terminal', createRequestId: 'req-1', status: 'running', mode: 'shell', shell: 'system', terminalId: 'term-1', initialCwd: '/tmp' } as never }, + 'tab-2': 
agentLeaf, + } + return configureStore({ + reducer: { + tabs: tabsReducer, + panes: panesReducer, + settings: settingsReducer, + connection: connectionReducer, + turnCompletion: turnCompletionReducer, + freshAgent: freshAgentReducer, + }, + preloadedState: { + tabs: { tabs: [foregroundTab, agentTab], activeTabId: 'tab-1', renameRequestTabId: null }, + panes: { layouts, activePane: { 'tab-1': 'pane-1', 'tab-2': 'pane-2' }, paneTitles: {} }, + settings: { settings: { ...defaultSettings }, loaded: true }, + connection: { status: 'ready' as const, error: null }, + turnCompletion: { seq: 0, lastAtByTerminalId: {}, pendingEvents: [], attentionByTab: {}, attentionByPane: {} }, + }, + }) +} + +describe('fresh-agent server-authoritative turn completion (e2e notification flow)', () => { + const originalHidden = Object.getOwnPropertyDescriptor(document, 'hidden') + const originalHasFocus = Object.getOwnPropertyDescriptor(document, 'hasFocus') + + beforeEach(() => { + playSound.mockClear() + wsMocks.resetHandlers() + // Window focused, but the agent tab (tab-2) is in the background, so a completion + // there should both chime and highlight. 
+ Object.defineProperty(document, 'hidden', { configurable: true, get: () => false }) + Object.defineProperty(document, 'hasFocus', { configurable: true, value: () => true }) + }) + + afterEach(() => { + cleanup() + if (originalHidden) Object.defineProperty(document, 'hidden', originalHidden) + if (originalHasFocus) Object.defineProperty(document, 'hasFocus', originalHasFocus) + }) + + it('chimes once and highlights the agent tab on a server-pushed completion, and ignores replays', async () => { + const store = createStore() + store.dispatch(setSessionStatus({ sessionId: SESSION_ID, sessionType: 'freshopencode', provider: 'opencode', status: 'idle' })) + + render() + await waitFor(() => expect(wsMocks.onMessage).toHaveBeenCalled()) + + act(() => { turnComplete(1000) }) + + await waitFor(() => expect(playSound).toHaveBeenCalledTimes(1)) + expect(store.getState().turnCompletion.attentionByTab['tab-2']).toBe(true) + expect(store.getState().turnCompletion.attentionByPane['pane-2']).toBe(true) + + // A replayed/stale completion (reconnect, same or older timestamp) must not re-chime. + act(() => { turnComplete(1000) }) + act(() => { turnComplete(500) }) + expect(playSound).toHaveBeenCalledTimes(1) + + // The next real turn (strictly newer) chimes again. 
+ act(() => { turnComplete(2000) }) + await waitFor(() => expect(playSound).toHaveBeenCalledTimes(2)) + }) +}) diff --git a/test/unit/client/hooks/useAgentSessionTurnCompletion.test.tsx b/test/unit/client/hooks/useAgentSessionTurnCompletion.test.tsx index a57024f4b..3db112779 100644 --- a/test/unit/client/hooks/useAgentSessionTurnCompletion.test.tsx +++ b/test/unit/client/hooks/useAgentSessionTurnCompletion.test.tsx @@ -5,6 +5,7 @@ import { Provider } from 'react-redux' import { configureStore } from '@reduxjs/toolkit' import freshAgentReducer, { setSessionStatus, addPermissionRequest } from '@/store/freshAgentSlice' import turnCompletionReducer from '@/store/turnCompletionSlice' +import { applyFreshAgentCompletion } from '@/store/turnCompletionThunks' import { useAgentSessionTurnCompletion } from '@/hooks/useAgentSessionTurnCompletion' import type { PaneNode } from '@/store/paneTypes' @@ -45,20 +46,18 @@ function render(store: ReturnType) { const claudeRunning = { sessionId: 'abc', sessionType: 'freshclaude' as const, provider: 'claude' as const } describe('useAgentSessionTurnCompletion', () => { - it('fires green+attention once on a fresh-agent running -> idle transition', () => { + it('does NOT fire on a busy -> idle transition (turn completion is server-authoritative)', () => { + // Turn completion green/sound now flows from the server-authoritative + // freshAgent.turn.complete edge (applyFreshAgentCompletion), not from + // differentiating the client-side busy level. The hook must not re-derive it. 
const store = makeStore() act(() => { store.dispatch(setSessionStatus({ ...claudeRunning, status: 'running' })) }) render(store) - // first observation (running) does not fire expect(store.getState().turnCompletion.pendingEvents).toHaveLength(0) act(() => { store.dispatch(setSessionStatus({ ...claudeRunning, status: 'idle' })) }) - // The hook dispatches recordTurnComplete (-> pendingEvents); attention marking - // is done downstream by useTurnCompletionNotifications. - const events = store.getState().turnCompletion.pendingEvents - expect(events).toHaveLength(1) - expect(events[0]).toMatchObject({ tabId: TAB, paneId: PANE, terminalId: 'claude:abc' }) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(0) }) it('does NOT fire when the session is already idle on first observation (restore/hydration)', () => { @@ -83,6 +82,50 @@ describe('useAgentSessionTurnCompletion', () => { const events = store.getState().turnCompletion.pendingEvents expect(events).toHaveLength(1) - expect(events[0]).toMatchObject({ tabId: TAB, paneId: PANE, terminalId: 'claude:abc' }) + // The waiting-for-approval edge dedupes under a distinct namespace (`#waiting`) so it + // cannot poison the server turn-complete entry (`claude:abc`). + expect(events[0]).toMatchObject({ tabId: TAB, paneId: PANE, terminalId: 'claude:abc#waiting' }) + }) + + it('does not let a waiting-for-approval green swallow a later server completion', () => { + // For opencode/codex the pane session key equals the server completion key, so if the + // approval edge (CLIENT clock) and the server completion (SERVER clock) shared one + // monotonic dedupe entry, an approval stamped ahead of the server clock (common on a + // remote client) would suppress the real turn-complete. They must dedupe independently. 
+ const SES = 'ses_op_1' + const opencodeLeaf: PaneNode = { + type: 'leaf', + id: 'pane-op', + content: { + kind: 'fresh-agent', + createRequestId: 'cr-op', + sessionType: 'freshopencode', + provider: 'opencode', + sessionId: SES, + sessionRef: { provider: 'opencode', sessionId: SES }, + } as never, + } + const store = configureStore({ + reducer: { + panes: () => ({ layouts: { 'tab-op': opencodeLeaf }, activePane: {} }) as never, + tabs: () => ({ activeTabId: 'tab-op' }) as never, + freshAgent: freshAgentReducer, + turnCompletion: turnCompletionReducer, + }, + }) + const opencodeRunning = { sessionId: SES, sessionType: 'freshopencode' as const, provider: 'opencode' as const } + act(() => { store.dispatch(setSessionStatus({ ...opencodeRunning, status: 'running' })) }) + render(store as never) + + // Permission prompt → waiting-for-approval green (stamped with the large client clock). + act(() => { store.dispatch(addPermissionRequest({ ...opencodeRunning, requestId: 'perm-op' } as never)) }) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(1) + + // The real server completion arrives with a much smaller (server-clock) `at`. 
+ act(() => { store.dispatch(applyFreshAgentCompletion({ provider: 'opencode', sessionId: SES, at: 1000 })) }) + + const events = store.getState().turnCompletion.pendingEvents + expect(events).toHaveLength(2) + expect(events.some((e) => e.terminalId === `opencode:${SES}`)).toBe(true) }) }) diff --git a/test/unit/client/lib/fresh-agent-turn-complete.test.ts b/test/unit/client/lib/fresh-agent-turn-complete.test.ts new file mode 100644 index 000000000..20f765de6 --- /dev/null +++ b/test/unit/client/lib/fresh-agent-turn-complete.test.ts @@ -0,0 +1,157 @@ +import { describe, expect, it } from 'vitest' +import { configureStore } from '@reduxjs/toolkit' +import freshAgentReducer, { setSessionStatus } from '@/store/freshAgentSlice' +import turnCompletionReducer from '@/store/turnCompletionSlice' +import { handleFreshAgentMessage } from '@/lib/fresh-agent-ws' +import type { PaneNode } from '@/store/paneTypes' + +const TAB = 'tab-1' +const PANE = 'pane-1' +const SESSION_ID = 'ses_real_1' + +const freshOpencodeLeaf: PaneNode = { + type: 'leaf', + id: PANE, + content: { + kind: 'fresh-agent', + createRequestId: 'cr', + sessionType: 'freshopencode', + provider: 'opencode', + sessionId: SESSION_ID, + sessionRef: { provider: 'opencode', sessionId: SESSION_ID }, + } as never, +} + +function makeStore() { + return configureStore({ + reducer: { + panes: () => ({ layouts: { [TAB]: freshOpencodeLeaf }, activePane: {} }) as never, + tabs: () => ({ activeTabId: TAB }) as never, + freshAgent: freshAgentReducer, + turnCompletion: turnCompletionReducer, + }, + }) +} + +function turnCompleteMessage(at: number) { + return { + type: 'freshAgent.event', + sessionId: SESSION_ID, + sessionType: 'freshopencode', + provider: 'opencode', + event: { type: 'freshAgent.turn.complete', sessionId: SESSION_ID, at }, + } +} + +describe('server-authoritative fresh-agent turn completion (client)', () => { + it('routes a freshAgent.turn.complete event to recordTurnComplete for the owning tab/pane', () => { + 
const store = makeStore() + store.dispatch(setSessionStatus({ sessionId: SESSION_ID, sessionType: 'freshopencode', provider: 'opencode', status: 'idle' })) + + const handled = handleFreshAgentMessage(store.dispatch, turnCompleteMessage(1000)) + expect(handled).toBe(true) + + const events = store.getState().turnCompletion.pendingEvents + expect(events).toHaveLength(1) + expect(events[0]).toMatchObject({ tabId: TAB, paneId: PANE, terminalId: `opencode:${SESSION_ID}`, at: 1000 }) + }) + + it('dedupes a replayed/stale completion by the at-monotonic guard (no premature re-green)', () => { + const store = makeStore() + + handleFreshAgentMessage(store.dispatch, turnCompleteMessage(1000)) + // Same timestamp (replay) and an older timestamp must both be dropped. + handleFreshAgentMessage(store.dispatch, turnCompleteMessage(1000)) + handleFreshAgentMessage(store.dispatch, turnCompleteMessage(500)) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(1) + + // A strictly newer completion (next real turn) greens again. + handleFreshAgentMessage(store.dispatch, turnCompleteMessage(2000)) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(2) + }) + + it('drops a malformed completion without a numeric at instead of fabricating Date.now()', () => { + // The server always stamps a monotonic numeric `at` (every emit site uses + // nextMonotonicTurnCompleteAt). A completion without one is malformed; fabricating a + // client Date.now() would inject a timestamp that can collide with or regress against + // the server clock — swallowing a real later completion or spuriously greening. Drop it. 
+ const store = makeStore() + store.dispatch(setSessionStatus({ sessionId: SESSION_ID, sessionType: 'freshopencode', provider: 'opencode', status: 'idle' })) + + const handled = handleFreshAgentMessage(store.dispatch, { + type: 'freshAgent.event', + sessionId: SESSION_ID, + sessionType: 'freshopencode', + provider: 'opencode', + event: { type: 'freshAgent.turn.complete', sessionId: SESSION_ID } as never, + }) + expect(handled).toBe(true) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(0) + + // A non-finite at (e.g. NaN from a bad parse) is likewise dropped. + handleFreshAgentMessage(store.dispatch, { + type: 'freshAgent.event', + sessionId: SESSION_ID, + sessionType: 'freshopencode', + provider: 'opencode', + event: { type: 'freshAgent.turn.complete', sessionId: SESSION_ID, at: Number.NaN } as never, + }) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(0) + }) + + it('ignores a completion for a session that owns no live pane', () => { + const store = makeStore() + const handled = handleFreshAgentMessage(store.dispatch, { + type: 'freshAgent.event', + sessionId: 'ses_unknown', + sessionType: 'freshopencode', + provider: 'opencode', + event: { type: 'freshAgent.turn.complete', sessionId: 'ses_unknown', at: 1000 }, + }) + expect(handled).toBe(true) + expect(store.getState().turnCompletion.pendingEvents).toHaveLength(0) + }) + + it('routes a Claude completion keyed by the runtime handle even when the pane carries a durable sessionRef', () => { + // A RESTORED Claude/kilroy pane looks like this: the resumed bridge session gets a + // fresh runtime handle (content.sessionId), while the persisted durable Claude UUID + // lives in content.sessionRef. The server keys the completion event by the runtime + // handle it subscribed with, so the lookup must match the runtime handle and not only + // the sessionRef-preferred key (which would silently drop the chime). 
+ const RUNTIME_ID = 'claude-runtime-nanoid' + const DURABLE_ID = '11111111-2222-4333-8444-555555555555' + const claudeLeaf: PaneNode = { + type: 'leaf', + id: 'pane-claude', + content: { + kind: 'fresh-agent', + createRequestId: 'cr-claude', + sessionType: 'freshclaude', + provider: 'claude', + sessionId: RUNTIME_ID, + sessionRef: { provider: 'claude', sessionId: DURABLE_ID }, + } as never, + } + const store = configureStore({ + reducer: { + panes: () => ({ layouts: { 'tab-claude': claudeLeaf }, activePane: {} }) as never, + tabs: () => ({ activeTabId: 'tab-claude' }) as never, + freshAgent: freshAgentReducer, + turnCompletion: turnCompletionReducer, + }, + }) + + const handled = handleFreshAgentMessage(store.dispatch, { + type: 'freshAgent.event', + sessionId: RUNTIME_ID, + sessionType: 'freshclaude', + provider: 'claude', + event: { type: 'freshAgent.turn.complete', sessionId: RUNTIME_ID, at: 1000 }, + }) + expect(handled).toBe(true) + + const events = store.getState().turnCompletion.pendingEvents + expect(events).toHaveLength(1) + expect(events[0]).toMatchObject({ tabId: 'tab-claude', paneId: 'pane-claude', terminalId: `claude:${RUNTIME_ID}` }) + }) +}) diff --git a/test/unit/client/store/turnCompletionSlice.test.ts b/test/unit/client/store/turnCompletionSlice.test.ts index 4acb4d2fd..f4d817563 100644 --- a/test/unit/client/store/turnCompletionSlice.test.ts +++ b/test/unit/client/store/turnCompletionSlice.test.ts @@ -7,6 +7,7 @@ import reducer, { markTabAttention, markPaneAttention, recordTurnComplete, + resetCompletionDedupeBaselines, type TurnCompletionState, } from '@/store/turnCompletionSlice' import panesReducer from '@/store/panesSlice' @@ -163,6 +164,23 @@ describe('turnCompletionSlice', () => { expect(state.lastAppliedCompletionSeqByTerminalId?.['term-1']).toBe(9) }) + it('resetCompletionDedupeBaselines clears the at baseline (so a post-restart lower at re-fires) but preserves attention', () => { + // Across a real server restart the client store 
survives, but the fresh process may + // stamp a lower wall-clock `at` than the (possibly clamp-inflated) pre-restart value. + // Resetting the baseline lets that genuine completion through instead of dropping it. + let state = reducer(undefined, recordTurnComplete({ tabId: 't', paneId: 'p', terminalId: 'opencode:ses', at: 5000 })) + state = reducer(state, markTabAttention({ tabId: 't' })) + expect(state.pendingEvents).toHaveLength(1) + + state = reducer(state, resetCompletionDedupeBaselines()) + // Unacknowledged attention must survive a restart. + expect(state.attentionByTab['t']).toBe(true) + + // A lower `at` would have been dropped as stale without the reset. + state = reducer(state, recordTurnComplete({ tabId: 't', paneId: 'p', terminalId: 'opencode:ses', at: 1000 })) + expect(state.pendingEvents.some((e) => e.at === 1000)).toBe(true) + }) + it('consumes pending events up through the handled sequence', () => { let state = reducer( undefined, diff --git a/test/unit/server/fresh-agent/codex-adapter.test.ts b/test/unit/server/fresh-agent/codex-adapter.test.ts index f2307ed8d..2c1f38edb 100644 --- a/test/unit/server/fresh-agent/codex-adapter.test.ts +++ b/test/unit/server/fresh-agent/codex-adapter.test.ts @@ -1080,6 +1080,211 @@ describe('Codex fresh-agent adapter', () => { expect(off).toHaveBeenCalledTimes(1) }) + it('emits a server-authoritative sdk.turn.complete only for a completed turn on the subscribed thread', async () => { + let turnCompletedHandler: ((event: any) => void) | undefined + const offLifecycle = vi.fn() + const offTurnCompleted = vi.fn() + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn(() => offLifecycle), + onTurnCompleted: vi.fn((handler) => { + turnCompletedHandler = handler + return offTurnCompleted + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + const listener = vi.fn() + + 
const unsubscribe = await adapter.subscribe?.('thread-new-1', listener) + expect(runtime.onTurnCompleted).toHaveBeenCalledWith(expect.any(Function)) + + // Real codex turn/completed carries the authoritative status inline at params.turn.status. + // A completed turn on a different thread is ignored. + turnCompletedHandler?.({ + threadId: 'other-thread', + params: { threadId: 'other-thread', turn: { id: 'turn-x', status: 'completed' } }, + }) + expect(listener).not.toHaveBeenCalledWith(expect.objectContaining({ type: 'sdk.turn.complete' })) + + // An interrupted turn on the subscribed thread must NOT chime. + turnCompletedHandler?.({ + threadId: 'thread-new-1', + params: { threadId: 'thread-new-1', turn: { id: 'turn-1', status: 'interrupted' } }, + }) + expect(listener).not.toHaveBeenCalledWith(expect.objectContaining({ type: 'sdk.turn.complete' })) + + // A completed turn on the subscribed thread chimes exactly once. + turnCompletedHandler?.({ + threadId: 'thread-new-1', + params: { threadId: 'thread-new-1', turn: { id: 'turn-2', status: 'completed' } }, + }) + const completeCalls = listener.mock.calls.filter(([event]) => event?.type === 'sdk.turn.complete') + expect(completeCalls).toHaveLength(1) + expect(completeCalls[0][0]).toMatchObject({ type: 'sdk.turn.complete', sessionId: 'thread-new-1' }) + expect(typeof completeCalls[0][0].at).toBe('number') + + unsubscribe?.() + expect(offLifecycle).toHaveBeenCalledTimes(1) + expect(offTurnCompleted).toHaveBeenCalledTimes(1) + }) + + it('chimes for a flat params.status completion shape and skips a flat interrupted', async () => { + // The app-server client passes the notification params straight through, and the + // repo's own client tests model turn/completed as a FLAT { threadId, turnId, status } + // (status at params.status, not params.turn.status). Freshcodex must detect that shape + // too, or green/sound silently never fires. 
+ let turnCompletedHandler: ((event: any) => void) | undefined + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn(() => vi.fn()), + onTurnCompleted: vi.fn((handler) => { + turnCompletedHandler = handler + return vi.fn() + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + const listener = vi.fn() + await adapter.subscribe?.('thread-new-1', listener) + + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turnId: 'turn-1', status: 'interrupted' } }) + expect(listener).not.toHaveBeenCalledWith(expect.objectContaining({ type: 'sdk.turn.complete' })) + + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turnId: 'turn-2', status: 'completed' } }) + const completeCalls = listener.mock.calls.filter(([event]) => event?.type === 'sdk.turn.complete') + expect(completeCalls).toHaveLength(1) + expect(completeCalls[0][0]).toMatchObject({ type: 'sdk.turn.complete', sessionId: 'thread-new-1' }) + }) + + it('stamps a strictly-increasing at on successive completed turns even within the same millisecond', async () => { + let turnCompletedHandler: ((event: any) => void) | undefined + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn(() => vi.fn()), + onTurnCompleted: vi.fn((handler) => { + turnCompletedHandler = handler + return vi.fn() + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + const listener = vi.fn() + await adapter.subscribe?.('thread-new-1', listener) + + const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(7000) + try { + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 'turn-1', status: 'completed' } } }) + turnCompletedHandler?.({ 
threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 'turn-2', status: 'completed' } } }) + } finally { + nowSpy.mockRestore() + } + + const ats = listener.mock.calls + .map(([event]) => event) + .filter((event) => event?.type === 'sdk.turn.complete') + .map((event) => event.at) + expect(ats).toHaveLength(2) + expect(ats[1]).toBeGreaterThan(ats[0]) + }) + + it('keeps the turn-complete clock monotonic per thread across a re-subscribe (WS reconnect)', async () => { + // WS fresh-agent subscriptions are torn down and recreated on reconnect, but the + // client store's dedupe state survives. The monotonic `at` clamp must therefore live + // on per-thread adapter state (like Claude/OpenCode session state), not the subscribe + // closure, or a same-ms / backward-clock completion right after a reconnect is dropped. + let turnCompletedHandler: ((event: any) => void) | undefined + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn(() => vi.fn()), + onTurnCompleted: vi.fn((handler) => { + turnCompletedHandler = handler + return vi.fn() + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + + const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(9000) + try { + const firstListener = vi.fn() + const unsub1 = await adapter.subscribe?.('thread-new-1', firstListener) + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 't1', status: 'completed' } } }) + unsub1?.() + + // Reconnect: a brand-new subscription to the same thread, same wall-clock ms. 
+ const secondListener = vi.fn() + await adapter.subscribe?.('thread-new-1', secondListener) + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 't2', status: 'completed' } } }) + + const firstAt = firstListener.mock.calls.map(([e]) => e).find((e) => e?.type === 'sdk.turn.complete')?.at + const secondAt = secondListener.mock.calls.map(([e]) => e).find((e) => e?.type === 'sdk.turn.complete')?.at + expect(firstAt).toBe(9000) + expect(secondAt).toBeGreaterThan(firstAt) + } finally { + nowSpy.mockRestore() + } + }) + + it('resets the per-thread turn-complete clock on shutdown (not just on reconnect)', async () => { + // shutdown() must clear *all* per-thread state, including the turn-complete clock, + // so a reused-in-process adapter never clamps a fresh completion against a stale + // pre-shutdown timestamp. (A plain reconnect deliberately keeps the clock — see the + // test above — but a full shutdown is a clean slate.) + let turnCompletedHandler: ((event: any) => void) | undefined + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn(() => vi.fn()), + onTurnCompleted: vi.fn((handler) => { + turnCompletedHandler = handler + return vi.fn() + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + // Injected (non-owned) runtime survives shutdown(), so the post-shutdown resubscribe + // reuses it and we can observe the clock starting fresh. + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + + const nowSpy = vi.spyOn(Date, 'now') + try { + nowSpy.mockReturnValue(9000) + const firstListener = vi.fn() + await adapter.subscribe?.('thread-new-1', firstListener) + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 't1', status: 'completed' } } }) + + await adapter.shutdown?.() + + // Reuse the same thread id after shutdown, with an *earlier* wall clock. 
+ nowSpy.mockReturnValue(5000) + const secondListener = vi.fn() + await adapter.subscribe?.('thread-new-1', secondListener) + turnCompletedHandler?.({ threadId: 'thread-new-1', params: { threadId: 'thread-new-1', turn: { id: 't2', status: 'completed' } } }) + + const firstAt = firstListener.mock.calls.map(([e]) => e).find((e) => e?.type === 'sdk.turn.complete')?.at + const secondAt = secondListener.mock.calls.map(([e]) => e).find((e) => e?.type === 'sdk.turn.complete')?.at + expect(firstAt).toBe(9000) + // Without the shutdown reset, the stale 9000 would clamp this to 9001. + expect(secondAt).toBe(5000) + } finally { + nowSpy.mockRestore() + } + }) + it('lazily resumes a Codex runtime before subscribing to a persisted thread after server reload', async () => { let lifecycleHandler: ((event: any) => void) | undefined const off = vi.fn() diff --git a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts index 7736a6754..e60c2a71e 100644 --- a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts +++ b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts @@ -134,6 +134,181 @@ describe('OpenCode serve adapter: create + send', () => { expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-req-3', status: 'idle' }) }) + it('emits exactly one server-authoritative sdk.turn.complete on a successful send', async () => { + const manager = makeFakeManager() + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-tc', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-tc', (e) => events.push(e)) + await adapter.send?.('freshopencode-req-tc', { text: 'go' }) + // A second idle snapshot relayed from the serve SSE must NOT produce a second completion. 
+ manager._emit('ses_real_1', { kind: 'session.idle', sessionId: 'ses_real_1', raw: { type: 'session.idle', properties: { sessionID: 'ses_real_1' } } }) + + const completions = events.filter((e): e is { type: string; sessionId: string; at: number } => + !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(1) + expect(completions[0].sessionId).toBe('freshopencode-req-tc') + expect(typeof completions[0].at).toBe('number') + }) + + it('stamps a strictly-increasing at across successive completions even at the same wall-clock ms', async () => { + const manager = makeFakeManager() + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-mono', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-mono', (e) => events.push(e)) + const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(5000) + try { + await adapter.send?.('freshopencode-req-mono', { text: 'one' }) + await adapter.send?.('freshopencode-req-mono', { text: 'two' }) + } finally { + nowSpy.mockRestore() + } + const ats = events + .filter((e): e is { type: string; at: number } => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + .map((e) => e.at) + expect(ats).toHaveLength(2) + expect(ats[1]).toBeGreaterThan(ats[0]) + }) + + it('does NOT emit sdk.turn.complete when the in-flight turn is interrupted, even though onceIdle resolves on the abort-triggered idle', async () => { + // interrupt() aborts the turn; the sidecar then emits session.idle, which RESOLVES + // onceIdle (it does not reject). Without tracking the abort, the success path would + // fire a false chime/green for an interrupted turn. 
+ const manager = makeFakeManager() + let resolveIdle: (() => void) | undefined + manager.onceIdle = vi.fn(() => new Promise((resolve) => { resolveIdle = resolve })) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-int', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-int', (e) => events.push(e)) + + const sendPromise = adapter.send?.('freshopencode-req-int', { text: 'go' }) + await vi.waitFor(() => expect(manager.onceIdle).toHaveBeenCalled()) + + await adapter.interrupt?.('freshopencode-req-int') + resolveIdle?.() + await sendPromise + + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(0) + // The interrupt still returns the pane to idle (clears blue) — it just must not chime. + expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-req-int', status: 'idle' }) + }) + + it('resumes chiming on the next completed turn after an interrupt', async () => { + const manager = makeFakeManager() + let resolveIdle: (() => void) | undefined + manager.onceIdle = vi.fn(() => new Promise((resolve) => { resolveIdle = resolve })) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-int2', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-int2', (e) => events.push(e)) + + const interrupted = adapter.send?.('freshopencode-req-int2', { text: 'one' }) + await vi.waitFor(() => expect(manager.onceIdle).toHaveBeenCalledTimes(1)) + await adapter.interrupt?.('freshopencode-req-int2') + resolveIdle?.() + await interrupted + + // A subsequent clean turn (no interrupt) must chime again — the abort flag must not stick. 
+ manager.onceIdle = vi.fn(async () => undefined) + await adapter.send?.('freshopencode-req-int2', { text: 'two' }) + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(1) + }) + + it('still chimes when an interrupt abort request fails and the turn then completes normally', async () => { + // If the abort POST fails, the turn was NOT actually interrupted and may complete + // normally. The abort flag must not stick, or a real completion gets no green/sound. + const manager = makeFakeManager() + let resolveIdle: (() => void) | undefined + manager.onceIdle = vi.fn(() => new Promise((resolve) => { resolveIdle = resolve })) + manager.abort = vi.fn(async () => { throw new Error('abort failed') }) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-af', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-af', (e) => events.push(e)) + + const sendPromise = adapter.send?.('freshopencode-req-af', { text: 'go' }) + await vi.waitFor(() => expect(manager.onceIdle).toHaveBeenCalled()) + + await expect(adapter.interrupt?.('freshopencode-req-af')).rejects.toThrow('abort failed') + // Abort failed → the turn proceeds and completes normally. + resolveIdle?.() + await sendPromise + + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(1) + }) + + it('does NOT emit sdk.turn.complete when the turn reports session.error before going idle', async () => { + // OpenCode surfaces a failed turn via an out-of-band `session.error` SSE event and + // then lets the session go idle. 
onceIdle resolves on that idle (it never inspects the + // error), so the success path must independently know the turn errored — otherwise a + // failed turn falsely greens/chimes as a positive completion. This is the OpenCode + // analogue of Claude's `subtype === 'success'` and Codex's `status === 'completed'`. + const manager = makeFakeManager() + let resolveIdle: (() => void) | undefined + manager.onceIdle = vi.fn(() => new Promise((resolve) => { resolveIdle = resolve })) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-err', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-err', (e) => events.push(e)) + + const sendPromise = adapter.send?.('freshopencode-req-err', { text: 'go' }) + await vi.waitFor(() => expect(manager.onceIdle).toHaveBeenCalled()) + + // The turn errors (relayed as sdk.error) and then the session goes idle. + manager._emit('ses_real_1', { kind: 'session.error', sessionId: 'ses_real_1', properties: { error: { message: 'provider boom' } } }) + resolveIdle?.() + await sendPromise + + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(0) + // The error is still surfaced, and the pane still returns to idle (clears blue). + expect(events.some((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.error')).toBe(true) + expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-req-err', status: 'idle' }) + }) + + it('resumes chiming on the next clean turn after an errored turn', async () => { + // The error flag must reset per turn, exactly like the abort flag — a single failed + // turn must not permanently suppress completion chimes. 
+ const manager = makeFakeManager() + let resolveIdle: (() => void) | undefined + manager.onceIdle = vi.fn(() => new Promise((resolve) => { resolveIdle = resolve })) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-err2', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-err2', (e) => events.push(e)) + + const errored = adapter.send?.('freshopencode-req-err2', { text: 'one' }) + await vi.waitFor(() => expect(manager.onceIdle).toHaveBeenCalledTimes(1)) + manager._emit('ses_real_1', { kind: 'session.error', sessionId: 'ses_real_1', properties: { error: { message: 'boom' } } }) + resolveIdle?.() + await errored + + // A subsequent clean turn (no error) must chime again — the error flag must not stick. + manager.onceIdle = vi.fn(async () => undefined) + await adapter.send?.('freshopencode-req-err2', { text: 'two' }) + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(1) + }) + + it('does NOT emit sdk.turn.complete when a send aborts (onceIdle rejects)', async () => { + const manager = makeFakeManager() + manager.onceIdle = vi.fn(() => Promise.reject(new Error('opencode serve sidecar was lost.'))) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-abort', sessionType: 'freshopencode', provider: 'opencode' }) + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-abort', (e) => events.push(e)) + await expect(adapter.send?.('freshopencode-req-abort', { text: 'go' })).rejects.toThrow() + + // The catch path still returns the pane to idle (clearing blue) but must not chime. 
+ expect(events).toContainEqual({ type: 'sdk.session.snapshot', sessionId: 'freshopencode-req-abort', status: 'idle' }) + expect(events.find((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete')).toBeUndefined() + }) + it('emits running before first-send session materialization resolves', async () => { const manager = makeFakeManager() const createSession = createDeferred<{ id: string; directory?: string; title?: string }>() @@ -716,6 +891,17 @@ describe('OpenCode serve adapter: control', () => { expect(manager.compact).toHaveBeenCalledWith('ses_real_1') }) + it('emits a server-authoritative sdk.turn.complete on a successful compact', async () => { + // Removing the client busy->idle derivation left compact (a user-visible /compact + // command) with no completion edge; like a normal send it must green/chime when done. + const { adapter } = await materialized() + const events: unknown[] = [] + adapter.subscribe?.('freshopencode-req-c', (e) => events.push(e)) + await adapter.compact?.('freshopencode-req-c', { instructions: 'trim' }) + const completions = events.filter((e) => !!e && typeof e === 'object' && (e as { type?: unknown }).type === 'sdk.turn.complete') + expect(completions).toHaveLength(1) + }) + it('fork registers child state so the child session can be sent/subscribed', async () => { const { manager, adapter } = await materialized() await expect(adapter.fork?.('freshopencode-req-c')).resolves.toEqual({ diff --git a/test/unit/server/fresh-agent/sdk-events.test.ts b/test/unit/server/fresh-agent/sdk-events.test.ts new file mode 100644 index 000000000..22a10e498 --- /dev/null +++ b/test/unit/server/fresh-agent/sdk-events.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from 'vitest' + +import { normalizeFreshAgentProviderEvent } from '../../../../server/fresh-agent/sdk-events.js' + +describe('normalizeFreshAgentProviderEvent', () => { + it('maps a provider sdk.turn.complete edge to a freshAgent.turn.complete 
event', () => { + const normalized = normalizeFreshAgentProviderEvent({ + type: 'sdk.turn.complete', + sessionId: 'ses_123', + at: 1782200000123, + }) + + expect(normalized).toEqual({ + type: 'freshAgent.turn.complete', + sessionId: 'ses_123', + at: 1782200000123, + }) + }) + + it('passes through an already-normalized freshAgent.turn.complete event unchanged', () => { + const event = { type: 'freshAgent.turn.complete', sessionId: 'ses_123', at: 5 } + expect(normalizeFreshAgentProviderEvent(event)).toBe(event) + }) +}) diff --git a/test/unit/server/fresh-agent/turn-complete-clock.test.ts b/test/unit/server/fresh-agent/turn-complete-clock.test.ts new file mode 100644 index 000000000..3ae03fc8a --- /dev/null +++ b/test/unit/server/fresh-agent/turn-complete-clock.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from 'vitest' +import { nextMonotonicTurnCompleteAt } from '../../../../server/fresh-agent/turn-complete-clock.js' + +describe('nextMonotonicTurnCompleteAt', () => { + it('uses the wall clock for the first completion of a session', () => { + expect(nextMonotonicTurnCompleteAt(undefined, 1000)).toBe(1000) + }) + + it('uses the wall clock when it has advanced past the previous completion', () => { + expect(nextMonotonicTurnCompleteAt(1000, 1500)).toBe(1500) + }) + + it('breaks a same-millisecond tie so two distinct turns never collide', () => { + // Two genuine completions stamped in the same Date.now() millisecond must remain + // distinguishable, otherwise the client at<=last dedupe would swallow the second. + expect(nextMonotonicTurnCompleteAt(1000, 1000)).toBe(1001) + }) + + it('never regresses when the system clock steps backwards', () => { + // A backward clock step (NTP correction) must not make a real later completion look + // like a stale replay. 
+ expect(nextMonotonicTurnCompleteAt(2000, 1500)).toBe(2001) + }) +}) diff --git a/test/unit/server/sdk-bridge.test.ts b/test/unit/server/sdk-bridge.test.ts index 2516201a2..0fdf45ee6 100644 --- a/test/unit/server/sdk-bridge.test.ts +++ b/test/unit/server/sdk-bridge.test.ts @@ -311,6 +311,87 @@ describe('SdkBridge', () => { expect(bridge.getSession(session.sessionId)?.totalInputTokens).toBe(1000) }) + it('emits a server-authoritative sdk.turn.complete edge on a successful result', async () => { + mockKeepStreamOpen = true + mockMessages.push({ + type: 'result', + subtype: 'success', + duration_ms: 3000, + is_error: false, + num_turns: 1, + total_cost_usd: 0.05, + usage: { input_tokens: 10, output_tokens: 5 }, + session_id: 'cli-123', + uuid: 'test-uuid', + }) + + const session = await bridge.createSession({ cwd: '/tmp' }) + const received: any[] = [] + bridge.subscribe(session.sessionId, (msg) => received.push(msg)) + + await new Promise(resolve => setTimeout(resolve, 100)) + + const completion = received.find(m => m.type === 'sdk.turn.complete') + expect(completion).toBeDefined() + expect(completion.sessionId).toBe(session.sessionId) + expect(typeof completion.at).toBe('number') + // The completion edge follows the result so the client clears blue before greening. 
+ const order = received.map(m => m.type) + expect(order.indexOf('sdk.turn.complete')).toBeGreaterThan(order.indexOf('sdk.result')) + }) + + it('stamps a strictly-increasing at on successive completions even within the same millisecond', async () => { + mockKeepStreamOpen = true + const success = { + type: 'result' as const, + subtype: 'success' as const, + duration_ms: 1, + is_error: false, + num_turns: 1, + session_id: 'cli-123', + uuid: 'test-uuid', + } + mockMessages.push({ ...success }, { ...success }) + const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(1000) + try { + const session = await bridge.createSession({ cwd: '/tmp' }) + const received: any[] = [] + bridge.subscribe(session.sessionId, (msg) => received.push(msg)) + + await new Promise(resolve => setTimeout(resolve, 100)) + + const ats = received.filter(m => m.type === 'sdk.turn.complete').map(m => m.at) + expect(ats).toHaveLength(2) + // Same wall-clock ms for both, but the per-session clamp keeps them distinct so + // the client never drops the second turn as a replay. + expect(ats[1]).toBeGreaterThan(ats[0]) + } finally { + nowSpy.mockRestore() + } + }) + + it('does NOT emit sdk.turn.complete when a turn ends with a non-success result subtype', async () => { + mockKeepStreamOpen = true + mockMessages.push({ + type: 'result', + subtype: 'error_during_execution', + duration_ms: 1000, + is_error: true, + num_turns: 1, + session_id: 'cli-123', + uuid: 'test-uuid', + }) + + const session = await bridge.createSession({ cwd: '/tmp' }) + const received: any[] = [] + bridge.subscribe(session.sessionId, (msg) => received.push(msg)) + + await new Promise(resolve => setTimeout(resolve, 100)) + + expect(received.find(m => m.type === 'sdk.result')).toBeDefined() + expect(received.find(m => m.type === 'sdk.turn.complete')).toBeUndefined() + }) + it('translates stream_event with parent_tool_use_id', async () => { mockKeepStreamOpen = true mockMessages.push({