diff --git a/packages/client/src/protocol-parser.ts b/packages/client/src/protocol-parser.ts index 01c666e8..612c6909 100644 --- a/packages/client/src/protocol-parser.ts +++ b/packages/client/src/protocol-parser.ts @@ -161,6 +161,14 @@ export function parseServerMessage( if (tokens) { callbacks.onTokensHydrated?.(tokens); } + // Restore running state from the server so the client UI matches + // the actual session state on reattach. Without this, the client + // defaults to running=false after switchSession resets state, + // causing the first send to go through the normal path even when + // the session is actively generating. + if (typeof msg.running === 'boolean') { + result.messagesActions.push({ type: 'SET_RUNNING', running: msg.running }); + } break; } diff --git a/server/__tests__/ws-handler-v2.test.ts b/server/__tests__/ws-handler-v2.test.ts index 6afe9867..347eb964 100644 --- a/server/__tests__/ws-handler-v2.test.ts +++ b/server/__tests__/ws-handler-v2.test.ts @@ -86,7 +86,7 @@ function mockEventStore() { return { getEventsAfter: vi.fn().mockReturnValue([]), getSession: vi.fn().mockReturnValue(null), - getSessionState: vi.fn().mockReturnValue(null), + getSessionState: vi.fn().mockReturnValue('ACTIVE'), setSessionState: vi.fn(), }; } @@ -462,6 +462,99 @@ describe('handleSwitchSession', () => { }), ); }); + + it('calls watch() before sending session_switched', async () => { + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + + const ctx = createContext({ + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + // watch() should have been called + expect(ctx.connRegistry.hasOpenWatchers('sess-1')).toBe(true); + }); + + it('returns running: true when session has active query loop', async () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + const resp = transport.sent[0]; + expect(resp).toHaveProperty('running', true); + }); + + it('returns running: false when store state is ENDED (zombie)', async () => { + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSession.mockReturnValue({ + sessionId: 'sess-1', + mode: 'agent', + cwd: '/test', + branch: 'main', + wtId: null, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheCreationTokens: 0, + totalCostUsd: 0, + }); + eventStore.getSessionState.mockReturnValue('ENDED'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + await handleSwitchSession('c1', { type: 'switch_session', sessionId: 'sess-1' }, ctx); + + const resp = transport.sent[0]; + expect(resp).toHaveProperty('running', false); + }); }); // ─── handleSetModeV2 ───────────────────────────────────────────────────────── @@ -1706,21 +1799,22 @@ describe('handleSendV2 connection ownership', () => { }); }); -// ─── stale session — EventStore cross-reference ───────────────────────────── +// ─── state-based routing (Phase 3) ────────────────────────────────────────── -describe('handleSendV2 stale session via EventStore', () => { - it('allows send when in-memory registry is stale but EventStore says inactive', () => { +describe('handleSendV2 state-based routing', () => { + it('aborts zombie and resumes when state is ENDED but registry still has session', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); sessionReg.isActive.mockReturnValue(true); - sessionReg.isAttached.mockReturnValue(true); // would normally reject + sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - // EventStore ground truth: session is NOT active (query loop ended) - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + // State machine says ENDED — query loop should be dead + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1736,17 +1830,15 @@ describe('handleSendV2 stale session via EventStore', () => { ctx, ); - // Should NOT get active_elsewhere error - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); - // Should fall through to resume path (startChat for resume) + // Should abort the zombie, not just remove + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); + // Should fall through to resume path expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); }); - it('takes over even when EventStore confirms session IS active', () => { + it('takes over when state is ACTIVE and different owner', () => { (sendToChat as ReturnType).mockClear(); (reattachChat as ReturnType).mockClear(); (isActive as ReturnType).mockReturnValueOnce(true); @@ -1761,7 +1853,7 @@ describe('handleSendV2 stale session via EventStore', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: true }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1780,30 +1872,22 @@ describe('handleSendV2 stale session via EventStore', () => { expect(oldTransport.sent).toContainEqual(expect.objectContaining({ type: 'session_takeover' })); expect(sendToChat).toHaveBeenCalled(); - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); (isActive as ReturnType).mockReturnValue(false); }); - it('does not remove session on rapid resume when EventStore shows isActive: true', () => { - // Regression test for repeated-prompt bug: when resuming a session, - // startChat should set isActive: true in EventStore immediately so that - // a second rapid send doesn't incorrectly trigger the stale removal path. + it('routes to sendToChat when state is ACTIVE and same owner', () => { (sendToChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); (isActive as ReturnType).mockReturnValueOnce(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); sessionReg.isActive.mockReturnValue(true); sessionReg.isAttached.mockReturnValue(true); - sessionReg.remove.mockClear(); const eventStore = mockEventStore(); - // EventStore correctly reflects that the session is active - // (because startChat set isActive: true on resume) - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: true }); + eventStore.getSessionState.mockReturnValue('ACTIVE'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1819,9 +1903,8 @@ describe('handleSendV2 stale session via EventStore', () => { ctx, ); - // Should NOT remove the session from registry (stale check should pass) - expect(sessionReg.remove).not.toHaveBeenCalled(); - // Should use the active session + // Should NOT abort — state says ACTIVE + expect(stopChat).not.toHaveBeenCalled(); expect(sendToChat).toHaveBeenCalledWith( 'c1:sess-1', 'hello again', @@ -1832,20 +1915,90 @@ describe('handleSendV2 stale session via EventStore', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('reattaches own detached session when state is DETACHED', () => { + (sendToChat as ReturnType).mockClear(); + (reattachChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValueOnce(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(false); // detached + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('DETACHED'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleSendV2( + 'c1', + transport, + { type: 'send' as const, sessionId: 'sess-1', prompt: 'back', clientMsgId: 'reattach-1' }, + ctx, + ); + + expect(reattachChat).toHaveBeenCalled(); + expect(sendToChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); + + it('treats CLOSING as zombie and resumes', () => { + (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (sendToChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('CLOSING'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleSendV2( + 'c1', + transport, + { type: 'send' as const, sessionId: 'sess-1', prompt: 'hi', clientMsgId: 'closing-1' }, + ctx, + ); + + expect(sendToChat).not.toHaveBeenCalled(); + expect(stopChat).toHaveBeenCalledWith('c1:sess-1'); + expect(startChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); -describe('handleInterruptV2 stale session via EventStore', () => { - it('resumes via startChat when EventStore says session is inactive', () => { +describe('handleInterruptV2 state-based routing', () => { + it('aborts zombie and resumes when state is ENDED', () => { (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); (interruptChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); - sessionReg.isAttached.mockReturnValue(true); // would normally reject + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -1861,12 +2014,8 @@ describe('handleInterruptV2 stale session via EventStore', () => { ctx, ); - expect(transport.sent).not.toContainEqual( - expect.objectContaining({ code: 'active_elsewhere' }), - ); - // Should NOT call interruptChat with dead key + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(interruptChat).not.toHaveBeenCalled(); - // Should resume via startChat with fresh sessionClientId expect(startChat).toHaveBeenCalledWith( transport, 'c1:sess-1', @@ -1876,6 +2025,74 @@ describe('handleInterruptV2 stale session via EventStore', () => { (isActive as ReturnType).mockReturnValue(false); }); + + it('routes to interruptChat when state is ACTIVE and same owner', () => { + (interruptChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValueOnce(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('ACTIVE'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleInterruptV2( + 'c1', + transport, + { type: 'interrupt', sessionId: 'sess-1', prompt: 'stop', clientMsgId: 'i-active' }, + ctx, + ); + + expect(stopChat).not.toHaveBeenCalled(); + expect(interruptChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); + + it('treats CLOSING as zombie and resumes', () => { + (startChat as ReturnType).mockClear(); + (stopChat as ReturnType).mockClear(); + (interruptChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); + + const sessionReg = mockSessionRegistry(); + sessionReg.findBySessionId.mockReturnValue({ clientId: 'c1:sess-1', session: {} }); + sessionReg.isActive.mockReturnValue(true); + sessionReg.isAttached.mockReturnValue(true); + + const eventStore = mockEventStore(); + eventStore.getSessionState.mockReturnValue('CLOSING'); + + const ctx = createContext({ + sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], + eventStore: eventStore as unknown as V2HandlerContext['eventStore'], + }); + const transport = mockTransport(); + ctx.connRegistry.register('c1', transport); + + handleInterruptV2( + 'c1', + transport, + { type: 'interrupt', sessionId: 'sess-1', prompt: 'stop', clientMsgId: 'closing-i' }, + ctx, + ); + + expect(interruptChat).not.toHaveBeenCalled(); + expect(stopChat).toHaveBeenCalledWith('c1:sess-1'); + expect(startChat).toHaveBeenCalled(); + + (isActive as ReturnType).mockReturnValue(false); + }); }); // ─── handleReconnect — ownership guard ────────────────────────────────────── @@ -2523,9 +2740,10 @@ describe('stale session cleanup removes registry entry', () => { expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); }); - it('handleSendV2 removes stale session from registry before resume', () => { + it('handleSendV2 aborts zombie session before resume', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); @@ -2533,7 +2751,7 @@ describe('stale session cleanup removes registry entry', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -2549,15 +2767,16 @@ describe('stale session cleanup removes registry entry', () => { ctx, ); - expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); }); - it('handleInterruptV2 removes stale session from registry before resume', () => { + it('handleInterruptV2 aborts zombie session before resume', () => { (startChat as ReturnType).mockClear(); - (isActive as ReturnType).mockReturnValueOnce(true); + (stopChat as ReturnType).mockClear(); + (isActive as ReturnType).mockReturnValue(true); const sessionReg = mockSessionRegistry(); sessionReg.findBySessionId.mockReturnValue({ clientId: 'old-conn:sess-1', session: {} }); @@ -2565,7 +2784,7 @@ describe('stale session cleanup removes registry entry', () => { sessionReg.isAttached.mockReturnValue(true); const eventStore = mockEventStore(); - eventStore.getSession.mockReturnValue({ sessionId: 'sess-1', isActive: false }); + eventStore.getSessionState.mockReturnValue('ENDED'); const ctx = createContext({ sessionRegistry: sessionReg as unknown as V2HandlerContext['sessionRegistry'], @@ -2581,7 +2800,7 @@ describe('stale session cleanup removes registry entry', () => { ctx, ); - expect(sessionReg.remove).toHaveBeenCalledWith('old-conn:sess-1'); + expect(stopChat).toHaveBeenCalledWith('old-conn:sess-1'); expect(startChat).toHaveBeenCalled(); (isActive as ReturnType).mockReturnValue(false); diff --git a/server/ws-handler-v2.ts b/server/ws-handler-v2.ts index 8906c71d..286d95ee 100644 --- a/server/ws-handler-v2.ts +++ b/server/ws-handler-v2.ts @@ -361,8 +361,22 @@ export async function handleSwitchSession( if (prev && prev !== msg.sessionId) { ctx.connRegistry.unwatch(connectionId, prev); } + // Watch the session immediately so events from a running query loop + // reach this client before the first send. Without this, the client + // sits in a blind spot between switch_session and the first send — + // any events emitted by the query loop during that window are lost. + ctx.connRegistry.watch(connectionId, msg.sessionId); ctx.connRegistry.setActive(connectionId, msg.sessionId); + // Cross-reference registry with durable state to avoid reporting + // a zombie query loop as running. + const found = ctx.sessionRegistry.findBySessionId(msg.sessionId); + const storeState = ctx.eventStore.getSessionState(msg.sessionId); + const running = + found && storeState !== 'ENDED' && storeState !== 'CLOSING' && storeState !== null + ? ctx.sessionRegistry.isActive(found.clientId) + : false; + ctx.connRegistry.get(connectionId)?.transport.send({ type: 'session_switched', sessionId: msg.sessionId, @@ -370,6 +384,7 @@ export async function handleSwitchSession( cwd: sessionMeta.cwd, branch: sessionMeta.branch, wtId: sessionMeta.wtId, + running, tokens: { input: sessionMeta.inputTokens, output: sessionMeta.outputTokens, @@ -382,7 +397,6 @@ export async function handleSwitchSession( // Re-send boot_context so pills appear on session switch. // Hot path: running session in SessionRegistry (in-memory cache). // Cold path: ended session — read serialized JSON from EventStore. - const found = ctx.sessionRegistry.findBySessionId(msg.sessionId); if (found?.session?.bootContext) { ctx.connRegistry.get(connectionId)?.transport.send({ type: 'boot_context', @@ -400,7 +414,7 @@ export async function handleSwitchSession( } } - log.info('switch_session', { connectionId, sessionId: msg.sessionId }); + log.info('switch_session', { connectionId, sessionId: msg.sessionId, running }); }, ); } @@ -484,62 +498,68 @@ export function handleSendV2( span.setAttribute('session.state_mismatch', mismatch.details ?? 'unknown'); } - if (found && isActive(found.clientId)) { - const storeMeta = ctx.eventStore.getSession(sessionId); - const staleInMemory = storeMeta && !storeMeta.isActive; + // State-based routing (Phase 3): durable state is the single source of truth. + if ( + found && + isActive(found.clientId) && + storeState !== 'ENDED' && + storeState !== 'CLOSING' && + storeState !== null + ) { + const ownerConnection = getOwnerConnection(found.clientId); + const isOwner = ownerConnection === connectionId; + const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); + + let activeClientId = found.clientId; + if (!isOwner) { + const oldTransport = found.session?.transport; + if (oldTransport?.isOpen()) { + oldTransport.send({ type: 'session_takeover', sessionId }); + } + ctx.connRegistry.unwatch(ownerConnection, sessionId); + denyPendingBySession(sessionId); - if (staleInMemory) { - log.info('removing stale session from registry (send)', { + reattachChat(found.clientId, transport); + const newClientId = `${connectionId}:${sessionId}`; + if (found.clientId !== newClientId) { + rekeyChat(found.clientId, newClientId); + activeClientId = newClientId; + } + log.info('takeover on send', { + connectionId, + sessionId, + oldOwner: ownerConnection, + newClientId: activeClientId, + storeState, + }); + } else if (isDetached) { + reattachChat(found.clientId, transport); + log.info('reattached own detached session on send', { connectionId, sessionId, - clientId: found.clientId, storeState, }); - ctx.sessionRegistry.remove(found.clientId); - } else { - const ownerConnection = getOwnerConnection(found.clientId); - const isOwner = ownerConnection === connectionId; - const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); - - let activeClientId = found.clientId; - if (!isOwner) { - const oldTransport = found.session?.transport; - if (oldTransport?.isOpen()) { - oldTransport.send({ type: 'session_takeover', sessionId }); - } - ctx.connRegistry.unwatch(ownerConnection, sessionId); - denyPendingBySession(sessionId); - - reattachChat(found.clientId, transport); - const newClientId = `${connectionId}:${sessionId}`; - if (found.clientId !== newClientId) { - rekeyChat(found.clientId, newClientId); - activeClientId = newClientId; - } - log.info('takeover on send', { - connectionId, - sessionId, - oldOwner: ownerConnection, - newClientId: activeClientId, - storeState, - }); - } else if (isDetached) { - reattachChat(found.clientId, transport); - log.info('reattached own detached session on send', { - connectionId, - sessionId, - storeState, - }); - } - applySkillPolicy(activeClientId); - sendToChat(activeClientId, prompt, msg.images, msg.contextBlocks, msg.clientMsgId); - ctx.connRegistry.watch(connectionId, sessionId); - ctx.connRegistry.setActive(connectionId, sessionId); - span.setAttribute('routing.decision', isOwner ? 'active' : 'takeover'); - return; } + applySkillPolicy(activeClientId); + ctx.connRegistry.watch(connectionId, sessionId); + ctx.connRegistry.setActive(connectionId, sessionId); + sendToChat(activeClientId, prompt, msg.images, msg.contextBlocks, msg.clientMsgId); + span.setAttribute('routing.decision', isOwner ? 'active' : 'takeover'); + return; } + // Zombie — abort before resume. + if (found && isActive(found.clientId)) { + log.info('aborting zombie session before resume', { + connectionId, + sessionId, + clientId: found.clientId, + storeState, + }); + stopChat(found.clientId); + } + + // Resume from history. const sessionClientId = `${connectionId}:${sessionId}`; ctx.connRegistry.watch(connectionId, sessionId); ctx.connRegistry.setActive(connectionId, sessionId); @@ -631,61 +651,66 @@ export function handleInterruptV2( }); } - if (isActive(found.clientId)) { - const storeMeta = ctx.eventStore.getSession(msg.sessionId); - const staleInMemory = storeMeta && !storeMeta.isActive; + // State-based routing (Phase 3): durable state is the single source of truth. + if ( + found && + isActive(found.clientId) && + storeState !== 'ENDED' && + storeState !== 'CLOSING' && + storeState !== null + ) { + const ownerConnection = getOwnerConnection(found.clientId); + const isOwner = ownerConnection === connectionId; + const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); - if (staleInMemory) { - log.info('removing stale session from registry (interrupt)', { + if (!isOwner) { + const oldTransport = found.session?.transport; + if (oldTransport?.isOpen()) { + oldTransport.send({ type: 'session_takeover', sessionId: msg.sessionId }); + } + ctx.connRegistry.unwatch(ownerConnection, msg.sessionId); + denyPendingBySession(msg.sessionId); + + reattachChat(found.clientId, transport); + const newClientId = `${connectionId}:${msg.sessionId}`; + if (found.clientId !== newClientId) { + rekeyChat(found.clientId, newClientId); + activeClientId = newClientId; + } + log.info('takeover on interrupt', { connectionId, sessionId: msg.sessionId, - clientId: found.clientId, + oldOwner: ownerConnection, + newClientId: activeClientId, storeState, }); - ctx.sessionRegistry.remove(found.clientId); - } else { - const ownerConnection = getOwnerConnection(found.clientId); - const isOwner = ownerConnection === connectionId; - const isDetached = !ctx.sessionRegistry.isAttached(found.clientId); + } else if (isDetached) { + reattachChat(found.clientId, transport); + } - if (!isOwner) { - const oldTransport = found.session?.transport; - if (oldTransport?.isOpen()) { - oldTransport.send({ type: 'session_takeover', sessionId: msg.sessionId }); - } - ctx.connRegistry.unwatch(ownerConnection, msg.sessionId); - denyPendingBySession(msg.sessionId); - - reattachChat(found.clientId, transport); - const newClientId = `${connectionId}:${msg.sessionId}`; - if (found.clientId !== newClientId) { - rekeyChat(found.clientId, newClientId); - activeClientId = newClientId; - } - log.info('takeover on interrupt', { - connectionId, - sessionId: msg.sessionId, - oldOwner: ownerConnection, - newClientId: activeClientId, - storeState, - }); - } else if (isDetached) { - reattachChat(found.clientId, transport); - } + ctx.connRegistry.watch(connectionId, msg.sessionId); + ctx.connRegistry.setActive(connectionId, msg.sessionId); + interruptChat( + activeClientId, + msg.prompt, + msg.images, + msg.contextBlocks, + msg.clientMsgId, + msg.model, + ); + log.info('interrupt', { connectionId, sessionId: msg.sessionId }); + return; + } - ctx.connRegistry.watch(connectionId, msg.sessionId); - ctx.connRegistry.setActive(connectionId, msg.sessionId); - interruptChat( - activeClientId, - msg.prompt, - msg.images, - msg.contextBlocks, - msg.clientMsgId, - msg.model, - ); - log.info('interrupt', { connectionId, sessionId: msg.sessionId }); - return; - } + // Zombie — abort before resume. + if (found && isActive(found.clientId)) { + log.info('aborting zombie session before resume (interrupt)', { + connectionId, + sessionId: msg.sessionId, + clientId: found.clientId, + storeState, + }); + stopChat(found.clientId); } const sessionClientId = `${connectionId}:${msg.sessionId}`;