From bd7162f288f35a2e92113ca34749785e63db8f38 Mon Sep 17 00:00:00 2001 From: dimakis Date: Sat, 30 May 2026 11:41:53 +0100 Subject: [PATCH 1/2] fix(server): state-based session routing to eliminate double-message bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the staleInMemory check (isActive boolean) with Phase 3 state-based routing using the durable getSessionState() column as the single source of truth. The old check could incorrectly kill a running query loop when EventStore and SessionRegistry diverged after navigation, causing the first message to be swallowed. Changes: - handleSwitchSession: add watch() so events reach the connection immediately; return `running` state in session_switched response - handleSendV2: reorder watch before send; replace staleInMemory with state-based routing (ACTIVE/DETACHED → route, ENDED → abort + resume) - handleInterruptV2: same Phase 3 fix as handleSendV2 - protocol-parser: dispatch SET_RUNNING on session_switched - Use stopChat() (abort signal) instead of registry.remove() for zombie sessions to prevent leaked query loops Co-Authored-By: Claude Opus 4.6 --- packages/client/src/protocol-parser.ts | 8 + server/__tests__/ws-handler-v2.test.ts | 150 ++++++++++----- server/ws-handler-v2.ts | 241 +++++++++++++++---------- 3 files changed, 254 insertions(+), 145 deletions(-) diff --git a/packages/client/src/protocol-parser.ts b/packages/client/src/protocol-parser.ts index 01c666e8..612c6909 100644 --- a/packages/client/src/protocol-parser.ts +++ b/packages/client/src/protocol-parser.ts @@ -161,6 +161,14 @@ export function parseServerMessage( if (tokens) { callbacks.onTokensHydrated?.(tokens); } + // Restore running state from the server so the client UI matches + // the actual session state on reattach. Without this, the client + // defaults to running=false after switchSession resets state, + // causing the first send to go through the normal path even when + // the session is actively generating. + if (typeof msg.running === 'boolean') { + result.messagesActions.push({ type: 'SET_RUNNING', running: msg.running }); + } break; } diff --git a/server/__tests__/ws-handler-v2.test.ts b/server/__tests__/ws-handler-v2.test.ts index 6afe9867..84a88731 100644 --- a/server/__tests__/ws-handler-v2.test.ts +++ b/server/__tests__/ws-handler-v2.test.ts @@ -86,7 +86,7 @@ function mockEventStore() { return { getEventsAfter: vi.fn().mockReturnValue([]), getSession: vi.fn().mockReturnValue(null), - getSessionState: vi.fn().mockReturnValue(null), + getSessionState: vi.fn().mockReturnValue('ACTIVE'), setSessionState: vi.fn(), }; } @@ -1706,21 +1706,22 @@ describe('handleSendV2 connection ownership', () => { }); }); -// ─── stale session — EventStore cross-reference ───────────────────────────── +// ─── state-based routing (Phase 3) ────────────────────────────────────────── -describe('handleSendV2 stale session via EventStore', () => { - it('allows send when in-memory registry is stale but EventStore says inactive', () => { +describe('handleSendV2 state-based routing', () => { + it('aborts zombie and resumes when state is ENDED but registry still has session', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); sessionReg.isActive.mockReturnValue(true); - sessionReg.isAttached.mockReturnValue(true); // would normally reject + sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - // EventStore ground truth: session is NOT active (query loop ended) - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + // State machine says ENDED — query loop should be dead + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1736,17 +1737,15 @@ describe('handleSendV2 stale session via EventStore', () => { ctx, ); - // Should NOT get active_elsewhere error - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); - // Should fall through to resume path (startChat for resume) + // Should abort the zombie, not just remove + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); + // Should fall through to resume path expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); }); - it('takes over even when EventStore confirms session IS active', () => { + it('takes over when state is ACTIVE and different owner', () => { (sendToChat as ReturnType).mockClear(); (reattachChat as ReturnType).mockClear(); (isActive as ReturnType).mockReturnValueOnce(true); @@ -1761,7 +1760,7 @@ describe('handleSendV2 stale session via EventStore', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: true }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1780,30 +1779,22 @@ describe('handleSendV2 stale session via EventStore', () => { expect(oldTransport.sent).toContainEqual(expect.objectContaining({ type: 'session_takeover' })); expect(sendToChat).toHaveBeenCalled(); - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); (isActive as ReturnType).mockReturnValue(false); }); - it('does not remove session on rapid resume when EventStore shows isActive: true', () => { - // Regression test for repeated-prompt bug: when resuming a session, - // startChat should set isActive: true in EventStore immediately so that - // a second rapid send doesn't incorrectly trigger the stale removal path. + it('routes to sendToChat when state is ACTIVE and same owner', () => { (sendToChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); (isActive as ReturnType).mockReturnValueOnce(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); sessionReg.isActive.mockReturnValue(true); sessionReg.isAttached.mockReturnValue(true); - sessionReg.remove.mockClear(); const eventStore = mockEventStore(); - // EventStore correctly reflects that the session is active - // (because startChat set isActive: true on resume) - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: true }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1819,9 +1810,8 @@ describe('handleSendV2 stale session via EventStore', () => { ctx, ); - // Should NOT remove the session from registry (stale check should pass) - expect(sessionReg.remove).not.toHaveBeenCalled(); - // Should use the active session + // Should NOT abort — state says ACTIVE + expect(stopChat).not.toHaveBeenCalled(); expect(sendToChat).toHaveBeenCalledWith( 'c1:sess-1', 'hello again', @@ -1832,20 +1822,55 @@ describe('handleSendV2 stale session via EventStore', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('reattaches own detached session when state is DETACHED', () => { + (sendToChat as ReturnType).mockClear(); + (reattachChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValueOnce(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(false); // detached + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('DETACHED'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleSendV2( + 'c1', + transport, + { type: 'send' as const, sessionId: 'sess-1', prompt: 'back', clientMsgId: 'reattach-1' }, + ctx, + ); + + expect(reattachChat).toHaveBeenCalled(); + expect(sendToChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); -describe('handleInterruptV2 stale session via EventStore', () => { - it('resumes via startChat when EventStore says session is inactive', () => { +describe('handleInterruptV2 state-based routing', () => { + it('aborts zombie and resumes when state is ENDED', () => { (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); (interruptChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); - sessionReg.isAttached.mockReturnValue(true); // would normally reject + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1861,12 +1886,8 @@ describe('handleInterruptV2 stale session via EventStore', () => { ctx, ); - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); - // Should NOT call interruptChat with dead key + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(interruptChat).not.toHaveBeenCalled(); - // Should resume via startChat with fresh sessionClientId expect(startChat).toHaveBeenCalledWith( transport, 'c1:sess-1', @@ -1876,6 +1897,39 @@ describe('handleInterruptV2 stale session via EventStore', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('routes to interruptChat when state is ACTIVE and same owner', () => { + (interruptChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValueOnce(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('ACTIVE'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleInterruptV2( + 'c1', + transport, + { type: 'interrupt', sessionId: 'sess-1', prompt: 'stop', clientMsgId: 'i-active' }, + ctx, + ); + + expect(stopChat).not.toHaveBeenCalled(); + expect(interruptChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); // ─── handleReconnect — ownership guard ────────────────────────────────────── @@ -2523,9 +2577,10 @@ describe('stale session cleanup removes registry entry', () => { expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); }); - it('handleSendV2 removes stale session from registry before resume', () => { + it('handleSendV2 aborts zombie session before resume', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); @@ -2533,7 +2588,7 @@ describe('stale session cleanup removes registry entry', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -2549,15 +2604,16 @@ describe('stale session cleanup removes registry entry', () => { ctx, ); - expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); }); - it('handleInterruptV2 removes stale session from registry before resume', () => { + it('handleInterruptV2 aborts zombie session before resume', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); @@ -2565,7 +2621,7 @@ describe('stale session cleanup removes registry entry', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -2581,7 +2637,7 @@ describe('stale session cleanup removes registry entry', () => { ctx, ); - expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); diff --git a/server/ws-handler-v2.ts b/server/ws-handler-v2.ts index 8906c71d..df27ee00 100644 --- a/server/ws-handler-v2.ts +++ b/server/ws-handler-v2.ts @@ -361,8 +361,21 @@ export async function handleSwitchSession( if (prev && prev !== msg.sessionId) { ctx.connRegistry.unwatch(connectionId, prev); } + // Watch the session immediately so events from a running query loop + // reach this client before the first send. Without this, the client + // sits in a blind spot between switch_session and the first send — + // any events emitted by the query loop during that window are lost. + ctx.connRegistry.watch(connectionId, msg.sessionId); ctx.connRegistry.setActive(connectionId, msg.sessionId); + // Determine running state so the client can restore its UI correctly. + // Without this, the client shows an idle input after switching to a + // session whose query loop is still generating — leading to the + // "double message" bug where the first send queues behind a stale + // running=false state. + const found = ctx.sessionRegistry.findBySessionId(msg.sessionId); + const running = found ? ctx.sessionRegistry.isActive(found.clientId) : false; + ctx.connRegistry.get(connectionId)?.transport.send({ type: 'session_switched', sessionId: msg.sessionId, @@ -370,6 +383,7 @@ export async function handleSwitchSession( cwd: sessionMeta.cwd, branch: sessionMeta.branch, wtId: sessionMeta.wtId, + running, tokens: { input: sessionMeta.inputTokens, output: sessionMeta.outputTokens, @@ -382,7 +396,6 @@ export async function handleSwitchSession( // Re-send boot_context so pills appear on session switch. // Hot path: running session in SessionRegistry (in-memory cache). // Cold path: ended session — read serialized JSON from EventStore. - const found = ctx.sessionRegistry.findBySessionId(msg.sessionId); if (found?.session?.bootContext) { ctx.connRegistry.get(connectionId)?.transport.send({ type: 'boot_context', @@ -400,7 +413,7 @@ export async function handleSwitchSession( } } - log.info('switch_session', { connectionId, sessionId: msg.sessionId }); + log.info('switch_session', { connectionId, sessionId: msg.sessionId, running }); }, ); } @@ -484,62 +497,83 @@ export function handleSendV2( span.setAttribute('session.state_mismatch', mismatch.details ?? 'unknown'); } - if (found && isActive(found.clientId)) { - const storeMeta = ctx.eventStore.getSession(sessionId); - const staleInMemory = storeMeta && !storeMeta.isActive; + // --- State-based routing (Phase 3) --- + // + // Use the durable state column as the single source of truth + // for routing decisions. This replaces the old staleInMemory + // check that used the isActive boolean — that check could + // incorrectly kill a running query loop when the EventStore + // and SessionRegistry diverged, causing the "double message" + // bug on reattach. + + // Case 1: Registry has the session AND state says it should + // be running (ACTIVE, DETACHED, SUSPENDED, STARTING, CREATED). + // Route the message to the existing query loop. + if ( + found && + isActive(found.clientId) && + storeState !== 'ENDED' && + storeState !== null + ) { + const ownerConnection = getOwnerConnection(found.clientId); + const isOwner = ownerConnection === connectionId; + const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); + + let activeClientId = found.clientId; + if (!isOwner) { + const oldTransport = found.session?.transport; + if (oldTransport?.isOpen()) { + oldTransport.send({ type: 'session_takeover', sessionId }); + } + ctx.connRegistry.unwatch(ownerConnection, sessionId); + denyPendingBySession(sessionId); - if (staleInMemory) { - log.info('removing stale session from registry (send)', { + reattachChat(found.clientId, transport); + const newClientId = `${connectionId}:${sessionId}`; + if (found.clientId !== newClientId) { + rekeyChat(found.clientId, newClientId); + activeClientId = newClientId; + } + log.info('takeover on send', { + connectionId, + sessionId, + oldOwner: ownerConnection, + newClientId: activeClientId, + storeState, + }); + } else if (isDetached) { + reattachChat(found.clientId, transport); + log.info('reattached own detached session on send', { connectionId, sessionId, - clientId: found.clientId, storeState, }); - ctx.sessionRegistry.remove(found.clientId); - } else { - const ownerConnection = getOwnerConnection(found.clientId); - const isOwner = ownerConnection === connectionId; - const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); - - let activeClientId = found.clientId; - if (!isOwner) { - const oldTransport = found.session?.transport; - if (oldTransport?.isOpen()) { - oldTransport.send({ type: 'session_takeover', sessionId }); - } - ctx.connRegistry.unwatch(ownerConnection, sessionId); - denyPendingBySession(sessionId); - - reattachChat(found.clientId, transport); - const newClientId = `${connectionId}:${sessionId}`; - if (found.clientId !== newClientId) { - rekeyChat(found.clientId, newClientId); - activeClientId = newClientId; - } - log.info('takeover on send', { - connectionId, - sessionId, - oldOwner: ownerConnection, - newClientId: activeClientId, - storeState, - }); - } else if (isDetached) { - reattachChat(found.clientId, transport); - log.info('reattached own detached session on send', { - connectionId, - sessionId, - storeState, - }); - } - applySkillPolicy(activeClientId); - sendToChat(activeClientId, prompt, msg.images, msg.contextBlocks, msg.clientMsgId); - ctx.connRegistry.watch(connectionId, sessionId); - ctx.connRegistry.setActive(connectionId, sessionId); - span.setAttribute('routing.decision', isOwner ? 'active' : 'takeover'); - return; } + applySkillPolicy(activeClientId); + // Watch BEFORE sending so any events emitted by the query + // loop reach this connection. The resume path (below) already + // does this correctly — match the ordering here. + ctx.connRegistry.watch(connectionId, sessionId); + ctx.connRegistry.setActive(connectionId, sessionId); + sendToChat(activeClientId, prompt, msg.images, msg.contextBlocks, msg.clientMsgId); + span.setAttribute('routing.decision', isOwner ? 'active' : 'takeover'); + return; + } + + // Case 2: Registry has the session but state says ENDED (or + // is missing). The query loop is a zombie — abort it cleanly + // so it doesn't leak, then fall through to resume. + if (found && isActive(found.clientId)) { + log.info('aborting zombie session before resume', { + connectionId, + sessionId, + clientId: found.clientId, + storeState, + }); + stopChat(found.clientId); } + // Case 3: No running query loop — resume from history. const sessionClientId = `${connectionId}:${sessionId}`; ctx.connRegistry.watch(connectionId, sessionId); ctx.connRegistry.setActive(connectionId, sessionId); @@ -631,61 +665,72 @@ export function handleInterruptV2( }); } - if (isActive(found.clientId)) { - const storeMeta = ctx.eventStore.getSession(msg.sessionId); - const staleInMemory = storeMeta && !storeMeta.isActive; + // --- State-based routing (Phase 3) --- + // + // Same logic as handleSendV2: use the durable state column as + // the single source of truth for routing decisions. + + // Case 1: Registry has the session AND state says it should + // be running. Route the interrupt to the existing query loop. + if ( + found && + isActive(found.clientId) && + storeState !== 'ENDED' && + storeState !== null + ) { + const ownerConnection = getOwnerConnection(found.clientId); + const isOwner = ownerConnection === connectionId; + const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); - if (staleInMemory) { - log.info('removing stale session from registry (interrupt)', { + if (!isOwner) { + const oldTransport = found.session?.transport; + if (oldTransport?.isOpen()) { + oldTransport.send({ type: 'session_takeover', sessionId: msg.sessionId }); + } + ctx.connRegistry.unwatch(ownerConnection, msg.sessionId); + denyPendingBySession(msg.sessionId); + + reattachChat(found.clientId, transport); + const newClientId = `${connectionId}:${msg.sessionId}`; + if (found.clientId !== newClientId) { + rekeyChat(found.clientId, newClientId); + activeClientId = newClientId; + } + log.info('takeover on interrupt', { connectionId, sessionId: msg.sessionId, - clientId: found.clientId, + oldOwner: ownerConnection, + newClientId: activeClientId, storeState, }); - ctx.sessionRegistry.remove(found.clientId); - } else { - const ownerConnection = getOwnerConnection(found.clientId); - const isOwner = ownerConnection === connectionId; - const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); + } else if (isDetached) { + reattachChat(found.clientId, transport); + } - if (!isOwner) { - const oldTransport = found.session?.transport; - if (oldTransport?.isOpen()) { - oldTransport.send({ type: 'session_takeover', sessionId: msg.sessionId }); - } - ctx.connRegistry.unwatch(ownerConnection, msg.sessionId); - denyPendingBySession(msg.sessionId); - - reattachChat(found.clientId, transport); - const newClientId = `${connectionId}:${msg.sessionId}`; - if (found.clientId !== newClientId) { - rekeyChat(found.clientId, newClientId); - activeClientId = newClientId; - } - log.info('takeover on interrupt', { - connectionId, - sessionId: msg.sessionId, - oldOwner: ownerConnection, - newClientId: activeClientId, - storeState, - }); - } else if (isDetached) { - reattachChat(found.clientId, transport); - } + ctx.connRegistry.watch(connectionId, msg.sessionId); + ctx.connRegistry.setActive(connectionId, msg.sessionId); + interruptChat( + activeClientId, + msg.prompt, + msg.images, + msg.contextBlocks, + msg.clientMsgId, + msg.model, + ); + log.info('interrupt', { connectionId, sessionId: msg.sessionId }); + return; + } - ctx.connRegistry.watch(connectionId, msg.sessionId); - ctx.connRegistry.setActive(connectionId, msg.sessionId); - interruptChat( - activeClientId, - msg.prompt, - msg.images, - msg.contextBlocks, - msg.clientMsgId, - msg.model, - ); - log.info('interrupt', { connectionId, sessionId: msg.sessionId }); - return; - } + // Case 2: Registry has the session but state says ENDED (or + // is missing). The query loop is a zombie — abort it cleanly. + if (found && isActive(found.clientId)) { + log.info('aborting zombie session before resume (interrupt)', { + connectionId, + sessionId: msg.sessionId, + clientId: found.clientId, + storeState, + }); + stopChat(found.clientId); } const sessionClientId = `${connectionId}:${msg.sessionId}`; From b2f449545beeced84a54d0e62c75987b4d6d597b Mon Sep 17 00:00:00 2001 From: dimakis Date: Sat, 30 May 2026 12:14:39 +0100 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20address=20Centaur=20review=20?= =?UTF-8?q?=E2=80=94=20CLOSING=20state,=20running=20cross-ref,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Exclude CLOSING from active routing (treats as zombie alongside ENDED) - Cross-reference eventStore.getSessionState() in handleSwitchSession running detection to avoid reporting zombie as running - Trim verbose per-case comments to one-liners - Add tests: watch() in handleSwitchSession, running field (true/false), CLOSING state routing for both send and interrupt handlers Co-Authored-By: Claude Opus 4.6 --- server/__tests__/ws-handler-v2.test.ts | 163 +++++++++++++++++++++++++ server/ws-handler-v2.ts | 48 +++----- 2 files changed, 177 insertions(+), 34 deletions(-) diff --git a/server/__tests__/ws-handler-v2.test.ts b/server/__tests__/ws-handler-v2.test.ts index 84a88731..347eb964 100644 --- a/server/__tests__/ws-handler-v2.test.ts +++ b/server/__tests__/ws-handler-v2.test.ts @@ -462,6 +462,99 @@ describe('handleSwitchSession', () => { }), ); }); + + it('calls watch() before sending session_switched', async () => { + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + + const ctx = createContext({ + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + // watch() should have been called + expect(ctx.connRegistry.hasOpenWatchers('sess-1')).toBe(true); + }); + + it('returns running: true when session has active query loop', async () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + const resp = transport.sent[0]; + expect(resp).toHaveProperty('running', true); + }); + + it('returns running: false when store state is ENDED (zombie)', async () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + eventStore.getSessionState.mockReturnValue('ENDED'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + const resp = transport.sent[0]; + expect(resp).toHaveProperty('running', false); + }); }); // ─── handleSetModeV2 ───────────────────────────────────────────────────────── @@ -1855,6 +1948,41 @@ describe('handleSendV2 state-based routing', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('treats CLOSING as zombie and resumes', () => { + (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (sendToChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('CLOSING'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleSendV2( + 'c1', + transport, + { type: 'send' as const, sessionId: 'sess-1', prompt: 'hi', clientMsgId: 'closing-1' }, + ctx, + ); + + expect(sendToChat).not.toHaveBeenCalled(); + expect(stopChat).toHaveBeenCalledWith('c1:sess-1'); + expect(startChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); describe('handleInterruptV2 state-based routing', () => { @@ -1930,6 +2058,41 @@ describe('handleInterruptV2 state-based routing', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('treats CLOSING as zombie and resumes', () => { + (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (interruptChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('CLOSING'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleInterruptV2( + 'c1', + transport, + { type: 'interrupt', sessionId: 'sess-1', prompt: 'stop', clientMsgId: 'closing-i' }, + ctx, + ); + + expect(interruptChat).not.toHaveBeenCalled(); + expect(stopChat).toHaveBeenCalledWith('c1:sess-1'); + expect(startChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); // ─── handleReconnect — ownership guard ────────────────────────────────────── diff --git a/server/ws-handler-v2.ts b/server/ws-handler-v2.ts index df27ee00..286d95ee 100644 --- a/server/ws-handler-v2.ts +++ b/server/ws-handler-v2.ts @@ -368,13 +368,14 @@ export async function handleSwitchSession( ctx.connRegistry.watch(connectionId, msg.sessionId); ctx.connRegistry.setActive(connectionId, msg.sessionId); - // Determine running state so the client can restore its UI correctly. - // Without this, the client shows an idle input after switching to a - // session whose query loop is still generating — leading to the - // "double message" bug where the first send queues behind a stale - // running=false state. + // Cross-reference registry with durable state to avoid reporting + // a zombie query loop as running. const found = ctx.sessionRegistry.findBySessionId(msg.sessionId); - const running = found ? ctx.sessionRegistry.isActive(found.clientId) : false; + const storeState = ctx.eventStore.getSessionState(msg.sessionId); + const running = + found && storeState !== 'ENDED' && storeState !== 'CLOSING' && storeState !== null + ? ctx.sessionRegistry.isActive(found.clientId) + : false; ctx.connRegistry.get(connectionId)?.transport.send({ type: 'session_switched', @@ -497,22 +498,12 @@ export function handleSendV2( span.setAttribute('session.state_mismatch', mismatch.details ?? 'unknown'); } - // --- State-based routing (Phase 3) --- - // - // Use the durable state column as the single source of truth - // for routing decisions. This replaces the old staleInMemory - // check that used the isActive boolean — that check could - // incorrectly kill a running query loop when the EventStore - // and SessionRegistry diverged, causing the "double message" - // bug on reattach. - - // Case 1: Registry has the session AND state says it should - // be running (ACTIVE, DETACHED, SUSPENDED, STARTING, CREATED). - // Route the message to the existing query loop. + // State-based routing (Phase 3): durable state is the single source of truth. if ( found && isActive(found.clientId) && storeState !== 'ENDED' && + storeState !== 'CLOSING' && storeState !== null ) { const ownerConnection = getOwnerConnection(found.clientId); @@ -550,9 +541,6 @@ export function handleSendV2( }); } applySkillPolicy(activeClientId); - // Watch BEFORE sending so any events emitted by the query - // loop reach this connection. The resume path (below) already - // does this correctly — match the ordering here. ctx.connRegistry.watch(connectionId, sessionId); ctx.connRegistry.setActive(connectionId, sessionId); sendToChat(activeClientId, prompt, msg.images, msg.contextBlocks, msg.clientMsgId); @@ -560,9 +548,7 @@ export function handleSendV2( return; } - // Case 2: Registry has the session but state says ENDED (or - // is missing). The query loop is a zombie — abort it cleanly - // so it doesn't leak, then fall through to resume. + // Zombie — abort before resume. if (found && isActive(found.clientId)) { log.info('aborting zombie session before resume', { connectionId, @@ -573,7 +559,7 @@ export function handleSendV2( stopChat(found.clientId); } - // Case 3: No running query loop — resume from history. + // Resume from history. const sessionClientId = `${connectionId}:${sessionId}`; ctx.connRegistry.watch(connectionId, sessionId); ctx.connRegistry.setActive(connectionId, sessionId); @@ -665,17 +651,12 @@ export function handleInterruptV2( }); } - // --- State-based routing (Phase 3) --- - // - // Same logic as handleSendV2: use the durable state column as - // the single source of truth for routing decisions. - - // Case 1: Registry has the session AND state says it should - // be running. Route the interrupt to the existing query loop. + // State-based routing (Phase 3): durable state is the single source of truth. if ( found && isActive(found.clientId) && storeState !== 'ENDED' && + storeState !== 'CLOSING' && storeState !== null ) { const ownerConnection = getOwnerConnection(found.clientId); @@ -721,8 +702,7 @@ export function handleInterruptV2( return; } - // Case 2: Registry has the session but state says ENDED (or - // is missing). The query loop is a zombie — abort it cleanly. + // Zombie — abort before resume. if (found && isActive(found.clientId)) { log.info('aborting zombie session before resume (interrupt)', { connectionId,