diff --git a/server/fresh-agent/adapters/codex/adapter.ts b/server/fresh-agent/adapters/codex/adapter.ts index a0c98979..325d8c4a 100644 --- a/server/fresh-agent/adapters/codex/adapter.ts +++ b/server/fresh-agent/adapters/codex/adapter.ts @@ -900,16 +900,22 @@ export function createCodexFreshAgentAdapter(deps: { listener(makeCodexStatusEvent(sessionId, event.status)) }) - // Server-authoritative turn-complete edge for the GREEN/SOUND pipeline. The - // app-server fires turn/completed for interrupts too, carrying the authoritative - // outcome inline at params.turn.status, so we chime only for a positive - // completion ('completed') and never on interrupt/failure. + // onTurnCompleted fires after the turn is committed to the app-server's + // thread history. thread_status_changed(idle) can fire BEFORE that commit, + // leaving the client with an empty transcript. Emit an idle snapshot here + // to make the client re-fetch the committed transcript (parity with + // freshopencode's post-idle emit). const offTurnCompleted = runtime.onTurnCompleted?.((event) => { if (event.threadId !== sessionId) return - // turn/completed fires for interrupts/failures too, so chime only on a positive - // completion. The authoritative status appears either inline at params.turn.status - // (codex-cli 0.142.0, probed live) or flat at params.status (the shape the - // app-server client tests model); accept either so neither version silently fails. + activeTurnByThread.delete(sessionId) + listener(makeCodexStatusEvent(sessionId, 'idle')) + + // Server-authoritative turn-complete edge for the GREEN/SOUND pipeline. + // turn/completed fires for interrupts/failures too, so chime only on a + // positive completion. The authoritative status appears either inline at + // params.turn.status (codex-cli 0.142.0, probed live) or flat at + // params.status (the shape the app-server client tests model); accept + // either so neither version silently fails. const params = event.params as { status?: unknown; turn?: { status?: unknown } } | undefined const status = params?.turn?.status ?? params?.status if (status !== 'completed') return @@ -917,7 +923,6 @@ export function createCodexFreshAgentAdapter(deps: { lastTurnCompleteAtByThread.set(sessionId, at) listener({ type: 'sdk.turn.complete', sessionId, at }) }) - return () => { offLifecycle() offTurnCompleted?.() diff --git a/server/fresh-agent/adapters/opencode/adapter.ts b/server/fresh-agent/adapters/opencode/adapter.ts index 15a5d430..3fb79313 100644 --- a/server/fresh-agent/adapters/opencode/adapter.ts +++ b/server/fresh-agent/adapters/opencode/adapter.ts @@ -171,7 +171,12 @@ export function createOpencodeFreshAgentAdapter(options: CreateOpencodeFreshAgen } try { const status = await getSessionStatus.call(serveManager, realId, cwdRoute(state.cwd) ?? {}) - if (!status || typeof status !== 'object' || Array.isArray(status) || typeof status.type !== 'string') { + // The opencode /session/status map only reports active (busy/retry) sessions, + // so an idle session is absent (undefined). Treat a missing entry as idle — + // consistent with the serve manager's onceIdle treatment of absence as idle — + // rather than logging a false-positive malformed warning. + if (status == null) return + if (typeof status !== 'object' || Array.isArray(status) || typeof status.type !== 'string') { log.warn({ ...logContext, reason: 'malformed_session_status', diff --git a/server/fresh-agent/adapters/opencode/serve-manager.ts b/server/fresh-agent/adapters/opencode/serve-manager.ts index a2fdfada..a0df6d61 100644 --- a/server/fresh-agent/adapters/opencode/serve-manager.ts +++ b/server/fresh-agent/adapters/opencode/serve-manager.ts @@ -112,13 +112,13 @@ export class OpencodeServeManager { private shutdownRequested = false constructor(options: OpencodeServeManagerOptions = {}) { - this.command = options.command ?? 'opencode' + this.env = options.env ?? process.env + this.command = options.command ?? (this.env.OPENCODE_CMD || 'opencode') this.spawnFn = options.spawnFn ?? spawn this.fetchFn = options.fetchFn ?? fetch this.allocatePort = options.allocatePort ?? allocateLocalhostPort this.connectEventStream = options.connectEventStream this.healthTimeoutMs = options.healthTimeoutMs ?? 20_000 - this.env = options.env ?? process.env this.idlePollMs = options.idlePollMs ?? DEFAULT_IDLE_POLL_MS this.requestTimeoutMs = options.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS } diff --git a/server/fresh-agent/runtime-manager.ts b/server/fresh-agent/runtime-manager.ts index d3b3914b..b1d25834 100644 --- a/server/fresh-agent/runtime-manager.ts +++ b/server/fresh-agent/runtime-manager.ts @@ -205,7 +205,7 @@ export class FreshAgentRuntimeManager { } async subscribe(locator: FreshAgentSessionLocator, listener: (message: unknown) => void) { - const record = this.requireSession(locator) + const record = await this.requireOrRecoverSession(locator) if (!record.adapter.subscribe) { throw new FreshAgentUnsupportedCapabilityError(`Subscribe is not supported for ${record.sessionType}`) } diff --git a/test/unit/server/fresh-agent/codex-adapter.test.ts b/test/unit/server/fresh-agent/codex-adapter.test.ts index 2c1f38ed..3f3bff08 100644 --- a/test/unit/server/fresh-agent/codex-adapter.test.ts +++ b/test/unit/server/fresh-agent/codex-adapter.test.ts @@ -1132,6 +1132,67 @@ describe('Codex fresh-agent adapter', () => { expect(offTurnCompleted).toHaveBeenCalledTimes(1) }) + it('emits a snapshot event after a codex turn completes so the client re-fetches the committed transcript', async () => { + let lifecycleHandler: ((event: any) => void) | undefined + let turnCompletedHandler: ((event: any) => void) | undefined + const offLifecycle = vi.fn() + const offTurnCompleted = vi.fn() + const runtime = { + startThread: vi.fn(), + resumeThread: vi.fn(), + onThreadLifecycle: vi.fn((handler: any) => { + lifecycleHandler = handler + return offLifecycle + }), + onTurnCompleted: vi.fn((handler: any) => { + turnCompletedHandler = handler + return offTurnCompleted + }), + readThread: vi.fn(), + listThreadTurns: vi.fn(), + readThreadTurn: vi.fn(), + } + const adapter = createCodexFreshAgentAdapter({ runtime: runtime as any }) + const listener = vi.fn() + + const unsubscribe = await adapter.subscribe?.('thread-new-1', listener) + + expect(runtime.onThreadLifecycle).toHaveBeenCalledWith(expect.any(Function)) + expect(runtime.onTurnCompleted).toHaveBeenCalledWith(expect.any(Function)) + + // thread_status_changed(idle) fires BEFORE the completed turn is committed + // to the app-server's thread history, so the client re-fetches but gets + // an empty transcript. This produces one idle snapshot. + lifecycleHandler?.({ + kind: 'thread_status_changed', + threadId: 'thread-new-1', + status: { type: 'idle' }, + }) + + const idleSnapshotsBeforeCompletion = listener.mock.calls.filter( + ([event]: any[]) => event?.type === 'sdk.session.snapshot' && event?.status === 'idle', + ) + expect(idleSnapshotsBeforeCompletion).toHaveLength(1) + + // onTurnCompleted fires AFTER the turn is committed to the thread history. + // The adapter must emit another snapshot-invalidating event so the client + // re-fetches and renders the committed transcript (parity with freshopencode). + turnCompletedHandler?.({ threadId: 'thread-new-1', turnId: 'turn-1', params: {} }) + + const idleSnapshotsAfterCompletion = listener.mock.calls.filter( + ([event]: any[]) => event?.type === 'sdk.session.snapshot' && event?.status === 'idle', + ) + expect(idleSnapshotsAfterCompletion).toHaveLength(2) + + // Turn-completed events for other threads must not trigger emission. + turnCompletedHandler?.({ threadId: 'other-thread', turnId: 'turn-2', params: {} }) + expect(idleSnapshotsAfterCompletion).toHaveLength(2) + + unsubscribe?.() + expect(offLifecycle).toHaveBeenCalledTimes(1) + expect(offTurnCompleted).toHaveBeenCalledTimes(1) + }) + it('chimes for a flat params.status completion shape and skips a flat interrupted', async () => { // The app-server client passes the notification params straight through, and the // repo's own client tests model turn/completed as a FLAT { threadId, turnId, status } diff --git a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts index e60c2a71..823f4822 100644 --- a/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts +++ b/test/unit/server/fresh-agent/opencode-serve-adapter.test.ts @@ -110,6 +110,37 @@ describe('OpenCode serve adapter: create + send', () => { expect(manager.onceIdle).toHaveBeenCalledWith('ses_real_1', expect.any(Number), { cwd: '/repo' }) }) + it('attach during an in-flight send reuses the materialized state (no duplicate serve subscription)', async () => { + const idle = createDeferred() + const manager = makeFakeManager() + manager.onceIdle = vi.fn(() => idle.promise) + const adapter = makeAdapter(manager) + await adapter.create({ requestId: 'req-race', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo' }) + + // Start the send: it materializes (remember + bindServeStream subscribes once), + // emits freshAgent.session.materialized, then parks at `await idle`. + const sendPromise = adapter.send?.('freshopencode-req-race', { text: 'go' }) + // Wait until materialization is done and the send is in-flight at await idle + // (promptAsync called => past emitMaterialized, before onceIdle resolves). + await vi.waitFor(() => expect(manager.promptAsync).toHaveBeenCalledWith('ses_real_1', expect.anything(), expect.anything())) + + // Concurrently attach the real id while the send is still in-flight. attach + // MUST find the already-remembered state (existing-branch) and NOT bind a + // second serve stream. This pins concurrent attach idempotency: exactly one + // serve subscription for the real id, regardless of when attach arrives + // during the send lifecycle. + const attached = await adapter.attach?.({ + sessionId: 'ses_real_1', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo', + }) + expect(attached).toEqual({ sessionId: 'ses_real_1', sessionRef: { provider: 'opencode', sessionId: 'ses_real_1' } }) + expect(manager.subscribe).toHaveBeenCalledTimes(1) + expect(manager.subscribe).toHaveBeenCalledWith('ses_real_1', expect.any(Function)) + + // The in-flight send still completes with the correct result once idle resolves. + idle.resolve() + await expect(sendPromise).resolves.toEqual({ sessionId: 'ses_real_1', sessionRef: { provider: 'opencode', sessionId: 'ses_real_1' } }) + }) + it('continues a materialized session on later sends without re-creating it', async () => { const manager = makeFakeManager() const adapter = makeAdapter(manager) @@ -595,6 +626,35 @@ describe('OpenCode serve adapter: create + send', () => { ) }) + it('treats a session absent from the status map as idle (no malformed warning)', async () => { + loggerMocks.logger.warn.mockClear() + const manager = makeFakeManager() + // The opencode /session/status map only reports active (busy/retry) sessions; + // an idle session is absent (undefined). This must NOT be treated as malformed + // (it matches the serve manager's onceIdle semantics). + manager.getSessionStatus = vi.fn(async () => undefined) + const adapter = makeAdapter(manager) + + await adapter.attach?.({ + sessionId: 'ses_idle_absent', + sessionType: 'freshopencode', + provider: 'opencode', + cwd: '/repo/safe', + }) + + await expect(adapter.getSnapshot?.({ + threadId: 'ses_idle_absent', + sessionType: 'freshopencode', + provider: 'opencode', + cwd: '/repo/safe', + })).resolves.toMatchObject({ status: 'idle' }) + expect(manager.getSessionStatus).toHaveBeenCalledWith('ses_idle_absent', { cwd: '/repo/safe' }) + expect(loggerMocks.logger.warn).not.toHaveBeenCalledWith( + expect.objectContaining({ reason: 'malformed_session_status' }), + expect.any(String), + ) + }) + it('keeps recovered sessions idle and warns when getSessionStatus is missing', async () => { loggerMocks.logger.warn.mockClear() const manager = makeFakeManager() diff --git a/test/unit/server/fresh-agent/opencode-serve-manager.test.ts b/test/unit/server/fresh-agent/opencode-serve-manager.test.ts index 2122693c..94619f6e 100644 --- a/test/unit/server/fresh-agent/opencode-serve-manager.test.ts +++ b/test/unit/server/fresh-agent/opencode-serve-manager.test.ts @@ -32,6 +32,9 @@ function makeManager(overrides: Partial[ return jsonResponse({}) }) const manager = new OpencodeServeManager({ + // Isolate from host process.env so tests asserting the 'opencode' default + // command don't break when a developer has OPENCODE_CMD set in their shell. + env: {}, spawnFn: spawnFn as any, fetchFn: fetchFn as any, allocatePort: async () => ({ hostname: '127.0.0.1', port: 47999 }), @@ -58,6 +61,26 @@ describe('OpencodeServeManager lifecycle', () => { expect(fetchFn).toHaveBeenCalledWith('http://127.0.0.1:47999/global/health', expect.anything()) }) + it('honors the OPENCODE_CMD env var to override the serve binary (parity with CODEX_CMD/CLAUDE_CMD)', async () => { + const { manager, spawnFn } = makeManager({ env: { OPENCODE_CMD: '/custom/opencode-bin' } }) + await manager.ensureStarted() + expect(spawnFn).toHaveBeenCalledWith( + '/custom/opencode-bin', + ['serve', '--hostname', '127.0.0.1', '--port', '47999'], + expect.objectContaining({ env: expect.objectContaining({ FRESHELL_OPENCODE_SIDECAR_ID: expect.any(String) }) }), + ) + }) + + it('falls back to the default opencode command when OPENCODE_CMD is unset', async () => { + const { manager, spawnFn } = makeManager({ env: {} }) + await manager.ensureStarted() + expect(spawnFn).toHaveBeenCalledWith( + 'opencode', + ['serve', '--hostname', '127.0.0.1', '--port', '47999'], + expect.objectContaining({ env: expect.objectContaining({ FRESHELL_OPENCODE_SIDECAR_ID: expect.any(String) }) }), + ) + }) + it('routes the requested session directory without changing the serve process cwd', async () => { const calls: Array<{ url: string; init: any }> = [] const fetchFn = vi.fn(async (url: string, init: any) => { @@ -125,6 +148,7 @@ describe('OpencodeServeManager lifecycle', () => { return jsonResponse({}) }) const manager = new OpencodeServeManager({ + env: {}, spawnFn: spawnFn as any, fetchFn: fetchFn as any, allocatePort: vi.fn().mockResolvedValue({ hostname: '127.0.0.1', port: 47999 }), @@ -435,6 +459,7 @@ describe('OpencodeServeManager HTTP client', () => { return jsonResponse({}, { status: 404 }) }) const manager = new OpencodeServeManager({ + env: {}, spawnFn: spawnFn as any, fetchFn: fetchFn as any, allocatePort: vi.fn() diff --git a/test/unit/server/fresh-agent/runtime-manager.test.ts b/test/unit/server/fresh-agent/runtime-manager.test.ts index a1fed14a..4dee74cb 100644 --- a/test/unit/server/fresh-agent/runtime-manager.test.ts +++ b/test/unit/server/fresh-agent/runtime-manager.test.ts @@ -394,6 +394,40 @@ describe('FreshAgentRuntimeManager', () => { expect(opencodeAdapter.compact).toHaveBeenCalledWith('opencode-restored-1', { instructions: 'keep decisions' }) }) + it('recovers (attaches) a not-yet-registered FreshOpenCode session on subscribe instead of throwing (materialization race)', async () => { + const listener = vi.fn() + const off = vi.fn() + const opencodeAdapter = { + create: vi.fn().mockResolvedValue({ sessionId: 'freshopencode-req-1' }), + attach: vi.fn().mockResolvedValue({ sessionId: 'ses_materialized_1' }), + subscribe: vi.fn().mockReturnValue(off), + send: vi.fn().mockResolvedValue(undefined), + } + const registry = createFreshAgentProviderRegistry([{ + sessionType: 'freshopencode', + runtimeProvider: 'opencode', + adapter: opencodeAdapter as any, + }]) + const manager = new FreshAgentRuntimeManager({ registry }) + + // Simulate the materialization race: the real session id is not yet registered + // with the runtime manager (adapter.send hasn't resolved) when subscribe is + // called for the materialized real id. This must recover via attach rather + // than throwing "is not tracked" (which would leak to the client as an error). + await expect(manager.subscribe( + { sessionId: 'ses_materialized_1', sessionType: 'freshopencode', provider: 'opencode', cwd: '/repo/safe' }, + listener, + )).resolves.toBe(off) + + expect(opencodeAdapter.attach).toHaveBeenCalledWith(expect.objectContaining({ + sessionId: 'ses_materialized_1', + sessionType: 'freshopencode', + provider: 'opencode', + cwd: '/repo/safe', + })) + expect(opencodeAdapter.subscribe).toHaveBeenCalledWith('ses_materialized_1', listener) + }) + it('recovers a missing FreshOpenCode durable session with cwd before mutation', async () => { const opencodeAdapter = { create: vi.fn(),