From af850f68c009ddbab446a92e8c4bfa395e06cdd2 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 8 May 2026 18:28:21 -0700 Subject: [PATCH 1/4] fix(langgraph): restore messages$ and values$ from latest checkpoint on reconnect --- .../internals/stream-manager.bridge.spec.ts | 51 +++++++++++++++++++ .../lib/internals/stream-manager.bridge.ts | 26 ++++++++++ 2 files changed, 77 insertions(+) diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 1c3c1835a..04bb4dc5d 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -181,6 +181,57 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('populates messages$ and values$ from the latest checkpoint on initial connect', async () => { + const transport = new MockAgentTransport(); + transport.history = [ + { + values: { + messages: [ + { type: 'human', id: 'u-1', content: 'previous question', _getType: () => 'human' }, + { type: 'ai', id: 'a-1', content: 'previous answer', _getType: () => 'ai' }, + ], + model: 'gpt-5-mini', + reasoning_effort: 'medium', + }, + next: [], + checkpoint: { + thread_id: 'persisted-thread-1', + checkpoint_ns: '', + checkpoint_id: 'cp-1', + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-08T12:00:00.000Z', + parent_checkpoint: null, + tasks: [], + } as never, + ]; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-1'), + destroy$: destroy$.asObservable(), + }); + + // Wait for the refreshHistory promise chain to resolve. + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messages$.value.length).toBe(2); + expect((subjects.messages$.value[0] as { content: unknown }).content).toBe('previous question'); + expect((subjects.messages$.value[1] as { content: unknown }).content).toBe('previous answer'); + + const values = subjects.values$.value as Record; + expect(values['model']).toBe('gpt-5-mini'); + expect(values['reasoning_effort']).toBe('medium'); + expect(values).not.toHaveProperty('messages'); + + destroy$.next(); + }); + it('refreshes history after a stream completes', async () => { const firstHistory = [makeThreadState('checkpoint-1')]; const secondHistory = [ diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index 724a53328..7ff6558cb 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -154,6 +154,32 @@ export function createStreamManagerBridge[]); + + // Project the latest checkpoint into messages$ + values$ on first + // connect. The user expectation (per the canonical examples/chat + // demo spec) is that reloading mid-conversation reattaches to the + // existing thread and the history reappears in the chat UI. The + // chat composition reads messages$ (not history$), so this + // projection is the bridge between "we fetched the checkpoint" + // and "the user can see the conversation". + // + // Guard: only populate when messages$ is currently empty, so we + // don't overwrite optimistic local state if the user already + // submitted a message in the gap between threadId-set and + // history-fetched. + const latest = history[0] as + | { values?: { messages?: BaseMessage[] } & T } + | undefined; + if (latest?.values && subjects.messages$.value.length === 0) { + const restoredMessages = latest.values.messages ?? []; + const restoredValues = { ...(latest.values as T) }; + // Strip the `messages` field from values — messages$ is the + // canonical surface for them; keeping a duplicate in values$ + // would confuse downstream consumers reading both subjects. + delete (restoredValues as { messages?: unknown }).messages; + subjects.messages$.next(restoredMessages); + subjects.values$.next(restoredValues); + } } } catch (err) { if (!controller.signal.aborted && (err as Error)?.name !== 'AbortError') { From 81c40eddcd7fa238735a48dcb81d79a9dd9e4c89 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 8 May 2026 18:28:52 -0700 Subject: [PATCH 2/4] test(langgraph): pin race-guard for optimistic submit during history fetch --- .../internals/stream-manager.bridge.spec.ts | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 04bb4dc5d..b5333ab15 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -232,6 +232,71 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('does not clobber local optimistic messages if a submit beats the history fetch', async () => { + const historyFetched: ThreadState>[] = [ + { + values: { + messages: [ + { type: 'human', id: 'old-u', content: 'old prompt', _getType: () => 'human' }, + ], + }, + next: [], + checkpoint: { + thread_id: 'persisted-thread-2', + checkpoint_ns: '', + checkpoint_id: 'cp-old', + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-08T12:00:00.000Z', + parent_checkpoint: null, + tasks: [], + } as never, + ]; + + // Inline mock transport with a delayed getHistory so we can observe + // a state mutation between threadId-set and history resolution. + const transport: AgentTransport & { + getHistory: ( + threadId: string, + signal: AbortSignal, + ) => Promise>[]>; + } = { + async *stream() { + yield* []; + }, + async getHistory(_threadId, _signal) { + await new Promise(r => setTimeout(r, 50)); + return historyFetched; + }, + }; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-2'), + destroy$: destroy$.asObservable(), + }); + + // Synchronously simulate an optimistic local submit BEFORE history + // resolves: the user clicks Send during the 50ms history fetch. + subjects.messages$.next([ + { type: 'human', id: 'fresh', content: 'fresh prompt', _getType: () => 'human' }, + ] as never); + + // Wait past the history fetch delay. + await new Promise(r => setTimeout(r, 80)); + + // Local optimistic message preserved; history projection skipped. + expect(subjects.messages$.value.length).toBe(1); + expect((subjects.messages$.value[0] as { content: unknown }).content).toBe('fresh prompt'); + + destroy$.next(); + }); + it('refreshes history after a stream completes', async () => { const firstHistory = [makeThreadState('checkpoint-1')]; const secondHistory = [ From bcbe5d87cceb91964a32dc955b71981140f06a32 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 8 May 2026 18:23:50 -0700 Subject: [PATCH 3/4] docs(spec): @ngaf/langgraph thread restoration on reconnect Targets Finding D from the live smoke pass: after a page reload, the conversation is NOT restored even though the threadId is persisted in localStorage and passed to agent({threadId}). Root cause: refreshHistory() populates history$ but does not project the latest checkpoint into messages$ or values$. The chat composition reads messages$, so the user sees the welcome state. Fix: extend refreshHistory to project history[0].values into messages$ and values$ on first connect. Guarded by a "messages$ is empty" check so an optimistic local submit that beats history fetch is preserved. 12 LOC change in the bridge plus 2 unit tests pinning the restore path and the race-guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...-08-langgraph-thread-restoration-design.md | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md diff --git a/docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md b/docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md new file mode 100644 index 000000000..56bf1e383 --- /dev/null +++ b/docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md @@ -0,0 +1,214 @@ +# `@ngaf/langgraph` Thread Restoration on Reconnect + +**Date:** 2026-05-08 +**Status:** Approved +**Surfaced by:** live-Chrome smoke pass against `examples/chat` after PR #218 merged (`a97721f7` on `origin/main`). + +## Goal + +Make `agent({ threadId })` actually restore the full conversation state when reconnecting to an existing thread. The Phase 1 spec (PR #213) explicitly listed reload-mid-conversation as a feature. It currently does not work — only `history$` is populated; the chat composition reads `messages$`. + +## The bug + +**Reproduction (currently broken on `origin/main`):** + +1. Open `examples/chat` at `/embed`. +2. Send a message — the demo creates a server thread; `onThreadId` writes the thread id to `localStorage` under key `ngaf-chat-demo:palette`. +3. Reload the page (Cmd+R). +4. The demo shell rehydrates `threadIdSignal` from `localStorage` and passes the persisted id to `agent({ threadId: this.threadIdSignal })`. +5. The bridge sees the threadId and calls `refreshHistory()`, which fetches the checkpoint history into `subjects.history$`. +6. **But** `subjects.messages$` stays at `[]`, so the chat composition shows the welcome state. Conversation appears lost. + +Verified live just now: `localStorage` retained the threadId (`019e0a1c-2287-...`); `document.querySelectorAll('chat-message').length === 0`; the welcome heading "How can I help?" was visible. + +## Root cause + +`stream-manager.bridge.ts` `refreshHistory()`: + +```ts +async function refreshHistory(): Promise { + const getHistory = transport.getHistory?.bind(transport); + if (!currentThreadId || !getHistory) return; + + // ...AbortController setup... + + try { + const history = await getHistory(threadId, controller.signal); + if (!controller.signal.aborted && currentThreadId === threadId) { + subjects.history$.next(history as ThreadState[]); // ← only history$ + } + } catch (err) { /* ... */ } +} +``` + +Only `history$` is populated. The chat composition reads `messages$` (via `agent.messages()`), so the user sees nothing. `values$` is also unpopulated — meaning state fields beyond messages (e.g. `state.model`, `state.reasoning_effort`) wouldn't restore either, although the demo currently mirrors those in `localStorage` so the practical user impact is just the missing message bubbles. + +## Approach + +Extend `refreshHistory()` to project the most recent checkpoint into `messages$` and `values$` after the history fetch resolves. Guarded so it doesn't clobber local optimistic state if the user submitted in the gap before history fetched. + +### The fix + +```ts +async function refreshHistory(): Promise { + const getHistory = transport.getHistory?.bind(transport); + if (!currentThreadId || !getHistory) return; + + historyAbortController?.abort(); + const controller = new AbortController(); + historyAbortController = controller; + const threadId = currentThreadId; + subjects.isThreadLoading$.next(true); + + try { + const history = await getHistory(threadId, controller.signal); + if (!controller.signal.aborted && currentThreadId === threadId) { + subjects.history$.next(history as ThreadState[]); + + // Project the latest checkpoint into messages$ + values$ on first + // load. The user expectation (per the Phase 1 examples/chat demo + // spec) is that reloading mid-conversation reattaches to the + // existing thread and the history reappears. Guard: only populate + // when messages$ is currently empty, so we don't overwrite + // optimistic local state if the user already submitted a message + // in the gap between threadId-set and history-fetched. + const latest = history[0] as { values?: { messages?: BaseMessage[] } & T } | undefined; + if (latest?.values && subjects.messages$.value.length === 0) { + const restoredMessages = latest.values.messages ?? []; + const restoredValues = { ...(latest.values as T) }; + // Strip messages from values — messages$ is the canonical surface + // for them; keeping a duplicate in values$ would confuse downstream + // consumers that read both subjects. + delete (restoredValues as { messages?: unknown }).messages; + subjects.messages$.next(restoredMessages); + subjects.values$.next(restoredValues); + } + } + } catch (err) { + if (!controller.signal.aborted && (err as Error)?.name !== 'AbortError') { + subjects.error$.next(err); + } + } finally { + if (historyAbortController === controller) { + historyAbortController = null; + subjects.isThreadLoading$.next(false); + } + } +} +``` + +Net change: ~12 LOC added inside the existing `if (!controller.signal.aborted && currentThreadId === threadId)` block. No new functions, no new exports, no behaviour change to any other surface. + +### Race conditions (handled) + +Three things race during init when `threadId` is non-null: + +1. `threadId$` emits the persisted id → `setThreadId(id, false)` → triggers `refreshHistory()`. +2. The user clicks Send before history fetches → `submit()` mutates `messages$`. +3. History resolves → wants to project to `messages$`. + +The guard `subjects.messages$.value.length === 0` makes #3 only fire when #2 hasn't yet. Practical effect: #2 (a user click typically taking >300ms after page paint) usually fires AFTER #3 (refreshHistory typically <100ms), so the common case is #3 wins, restoring history. In the rare case where the user clicks send before history resolves, the local optimistic message stays, history is silently skipped, and the conversation continues from the user's local state. + +### Approaches considered and rejected + +- **Expose `agent.restoreFromHistory()` for consumers to call.** Breaks the abstraction. The Phase 1 spec already promised auto-restore as default behaviour. Rejected. +- **Opt-in via `agent({ restoreOnInit: true })`.** Adds a knob to a feature that should be the default. Nobody is relying on the broken behaviour. Rejected. + +## Tests + +Add to `libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts`: + +```ts +it('populates messages$ from the latest checkpoint on initial connect', async () => { + const transport = new MockAgentTransport(); + transport.history = [ + { values: { messages: [ + { type: 'human', id: 'u-1', content: 'previous question', _getType: () => 'human' }, + { type: 'ai', id: 'a-1', content: 'previous answer', _getType: () => 'ai' }, + ] } }, + ]; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-1'), + destroy$: destroy$.asObservable(), + }); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messages$.value.length).toBe(2); + expect((subjects.messages$.value[1] as { content: unknown }).content).toBe('previous answer'); +}); + +it('does not clobber local optimistic messages if a submit beats the history fetch', async () => { + const transport = new MockAgentTransport(); + // delay history so submit wins the race + transport.getHistoryDelayMs = 50; + transport.history = [ + { values: { messages: [ + { type: 'human', id: 'old', content: 'old', _getType: () => 'human' }, + ] } }, + ]; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-2'), + destroy$: destroy$.asObservable(), + }); + + // simulate optimistic local submit BEFORE history resolves + subjects.messages$.next([ + { type: 'human', id: 'fresh', content: 'fresh prompt', _getType: () => 'human' }, + ] as never); + + await new Promise(r => setTimeout(r, 60)); + + // local message preserved; history NOT projected because guard fired + expect(subjects.messages$.value.length).toBe(1); + expect((subjects.messages$.value[0] as { content: unknown }).content).toBe('fresh prompt'); +}); +``` + +The first test requires `MockAgentTransport.history` to be settable (likely already is via the `historyCalls`/`history` fixture pattern in the existing transport mock; if not, add a 1-line setter). The second test additionally needs `getHistoryDelayMs` — adding `await new Promise(r => setTimeout(r, this.getHistoryDelayMs ?? 0))` inside the mock's `getHistory()` is a small extension. + +The existing test at `it('loads history when initialized with a thread id', ...)` continues to pass — it asserts only that `history$` gets populated, which still happens. + +## Demo-side: nothing changes + +`examples/chat/angular/src/app/shell/demo-shell.component.ts` already initialises `threadIdSignal` from `persistence.read('threadId')` and passes it into `agent({ threadId: this.threadIdSignal })`. That code is correct as-is. The fix is entirely inside the adapter. + +## Out of scope (defer) + +- **Restoring custom palette state from server values.** The demo uses `state.model` and `state.reasoning_effort`. Both are mirrored in `localStorage` via `palette-persistence.service.ts`, so the user-visible behaviour (palette reflects the persisted choice) already works. Reading them back from `values$` after restore would be more correct philosophically, but adds plumbing in the demo for no observable user benefit. Defer. +- **Migrating the entire chat composition to consume `history$` directly.** That would be a different (and much larger) architectural change. +- **Multi-thread switcher.** The reload case is single-thread. Switching between persisted threads is a Phase 5+ feature. + +## Files touched + +| Path | Change | +|---|---| +| `libs/langgraph/src/lib/internals/stream-manager.bridge.ts` | extend `refreshHistory` to populate `messages$` + `values$` from the latest checkpoint (~12 LOC) | +| `libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts` | +2 unit tests pinning the restore path and the race-guard | +| `libs/langgraph/src/lib/transport/mock-stream.transport.ts` (if needed) | tiny extension: `history` setter and optional `getHistoryDelayMs` (≤5 LOC) | + +Total ≈ 65 LOC. + +## Definition of done + +1. PR merged. +2. CI green: `nx run langgraph:test` shows the 2 new tests passing alongside the existing 53. +3. Live smoke (manual, against the workspace `examples/chat` demo): send a message → reload → conversation reappears with the prior user/assistant exchange visible. No flash of welcome state. +4. Cross-mode reload still works: send in `/embed`, reload, switch to `/popup`, open popup — the prior conversation shows in the popup. + +## Risks + +- **Race with optimistic submit beating history fetch.** Mitigated by the `messages$.value.length === 0` guard. Pinned by the second unit test. +- **History fetch fails silently.** Existing behaviour preserved — error goes to `error$`, `messages$` stays empty, welcome state shows. User can retry by sending a new message. +- **Thread exists but has no messages (just metadata).** `latest.values.messages` is `[]`. Guard still passes; `messages$.next([])` is a no-op. Welcome state remains. Acceptable. +- **Latest checkpoint has stale messages relative to a concurrently running stream.** The bridge's existing in-flight stream logic handles this: any active stream's incoming chunks merge through `mergeMessages` against `messages$`. If we just restored an empty messages$ then a streaming chunk arrives, mergeMessages handles it as "first chunk for this AI". If we restored 2 messages and a new chunk arrives for a 3rd, the merge appends correctly. No new race introduced. From 50c9dd496d9c3069fc481a497f9a79ff3ee228b8 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 8 May 2026 18:25:57 -0700 Subject: [PATCH 4/4] docs(plan): @ngaf/langgraph thread restoration plan Five-phase plan: branch, TDD failing restore test, implementation (extend refreshHistory to project latest checkpoint into messages$ + values$, guarded by messages$ empty check), race-guard test, verification + PR. ~92 LOC, 2-3 commits. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...2026-05-08-langgraph-thread-restoration.md | 418 ++++++++++++++++++ 1 file changed, 418 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-08-langgraph-thread-restoration.md diff --git a/docs/superpowers/plans/2026-05-08-langgraph-thread-restoration.md b/docs/superpowers/plans/2026-05-08-langgraph-thread-restoration.md new file mode 100644 index 000000000..b409cf7f6 --- /dev/null +++ b/docs/superpowers/plans/2026-05-08-langgraph-thread-restoration.md @@ -0,0 +1,418 @@ +# `@ngaf/langgraph` Thread Restoration on Reconnect — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** When `agent({ threadId })` is created with a non-null thread id, restore the conversation: project the latest checkpoint's `values.messages` into `messages$` and the rest of `values` into `values$`. Reload-mid-conversation in the canonical demo will then reattach to the existing thread and the prior exchange will reappear. + +**Architecture:** Single-function extension to `refreshHistory()` inside the bridge — after the existing `subjects.history$.next(history)` line, project the most recent checkpoint into `subjects.messages$` and `subjects.values$`. Guarded by `subjects.messages$.value.length === 0` so an optimistic local submit that beats history fetch is preserved. + +**Tech Stack:** TypeScript (`libs/langgraph` + vitest), `@langchain/langgraph-sdk` ThreadState types. + +**Spec:** `docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md` + +**Branch:** `claude/langgraph-thread-restoration`, branched from `origin/main`. + +**Hard constraint:** Never reference hashbrown / copilotkit / chatgpt / chatbot-kit / claude in code, commits, or PR titles/bodies. + +--- + +## File Structure + +``` +libs/langgraph/src/lib/internals/ +├── stream-manager.bridge.ts # +12 LOC inside refreshHistory() +└── stream-manager.bridge.spec.ts # +2 unit tests (~80 LOC including helpers) +``` + +Total ≈ 92 LOC. ~3 commits. + +`MockAgentTransport` already has a public mutable `history: ThreadState[] = []` field (line 20 of `mock-stream.transport.ts`) and an implemented `getHistory()` method that returns it. We can either extend the mock with an optional `getHistoryDelayMs` field for the race test, or use the established inline-mock pattern (custom transport with a delayed `getHistory`) — the plan uses the inline-mock to keep the existing `MockAgentTransport` untouched. + +--- + +## Phase 0 — Branch creation + +### Task 0.1: Create implementation branch + +- [ ] **Step 1: Branch from origin/main** + +```bash +cd /Users/blove/repos/angular-agent-framework +git fetch origin main +git checkout -b claude/langgraph-thread-restoration origin/main +git rev-parse --abbrev-ref HEAD # must echo claude/langgraph-thread-restoration +git log --oneline -1 # must be on origin/main HEAD +``` + +--- + +## Phase 1 — TDD: failing restore test + +### Task 1.1: Add the failing test + +**Files:** +- Modify: `libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts` + +The spec file already imports `MockAgentTransport`, `makeSubjects()`, `makeThreadState()`, `BehaviorSubject`/`Subject`/`of` from rxjs, and `createStreamManagerBridge`. Locate the `describe('createStreamManagerBridge', () => {...})` block (around line 61) and the existing `it('loads history when initialized with a thread id', ...)` test inside it. + +- [ ] **Step 1: Add a new test directly after the existing history test** + +Insert this new `it` block immediately AFTER the existing `it('loads history when initialized with a thread id', ...)` test: + +```ts + it('populates messages$ and values$ from the latest checkpoint on initial connect', async () => { + const transport = new MockAgentTransport(); + transport.history = [ + { + values: { + messages: [ + { type: 'human', id: 'u-1', content: 'previous question', _getType: () => 'human' }, + { type: 'ai', id: 'a-1', content: 'previous answer', _getType: () => 'ai' }, + ], + model: 'gpt-5-mini', + reasoning_effort: 'medium', + }, + next: [], + checkpoint: { + thread_id: 'persisted-thread-1', + checkpoint_ns: '', + checkpoint_id: 'cp-1', + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-08T12:00:00.000Z', + parent_checkpoint: null, + tasks: [], + } as never, + ]; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-1'), + destroy$: destroy$.asObservable(), + }); + + // Wait one microtask for the refreshHistory promise chain to resolve. + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messages$.value.length).toBe(2); + expect((subjects.messages$.value[0] as { content: unknown }).content).toBe('previous question'); + expect((subjects.messages$.value[1] as { content: unknown }).content).toBe('previous answer'); + + // values$ contains the rest of the thread state — but NOT a duplicate + // `messages` field, since messages$ is the canonical surface. + const values = subjects.values$.value as Record; + expect(values['model']).toBe('gpt-5-mini'); + expect(values['reasoning_effort']).toBe('medium'); + expect(values).not.toHaveProperty('messages'); + + destroy$.next(); + }); +``` + +- [ ] **Step 2: Run test to verify it FAILS** + +```bash +cd /Users/blove/repos/angular-agent-framework +npx nx run langgraph:test --skip-nx-cache 2>&1 | tail -15 +``` + +Expected: 1 test fails — `populates messages$ and values$ from the latest checkpoint on initial connect`. The pre-existing `loads history when initialized with a thread id` still passes because it asserts only on `history$`. Pre-existing 53 tests still pass. + +The failure should be `expected 0 to be 2` for `subjects.messages$.value.length`. + +Do NOT commit yet — Phase 2 commits the test + implementation together. + +--- + +## Phase 2 — Implement the `refreshHistory` extension + +### Task 2.1: Project latest checkpoint into messages$ + values$ + +**Files:** +- Modify: `libs/langgraph/src/lib/internals/stream-manager.bridge.ts` + +Locate `refreshHistory()` (around line 143). The current relevant block: + +```ts + try { + const history = await getHistory(threadId, controller.signal); + if (!controller.signal.aborted && currentThreadId === threadId) { + subjects.history$.next(history as ThreadState[]); + } + } catch (err) { +``` + +- [ ] **Step 1: Add the projection inside the success branch** + +Replace the inner `if` block with: + +```ts + try { + const history = await getHistory(threadId, controller.signal); + if (!controller.signal.aborted && currentThreadId === threadId) { + subjects.history$.next(history as ThreadState[]); + + // Project the latest checkpoint into messages$ + values$ on first + // connect. The user expectation (per the canonical examples/chat + // demo spec) is that reloading mid-conversation reattaches to the + // existing thread and the history reappears in the chat UI. The + // chat composition reads messages$ (not history$), so this + // projection is the bridge between "we fetched the checkpoint" + // and "the user can see the conversation". + // + // Guard: only populate when messages$ is currently empty, so we + // don't overwrite optimistic local state if the user already + // submitted a message in the gap between threadId-set and + // history-fetched. + const latest = history[0] as + | { values?: { messages?: BaseMessage[] } & T } + | undefined; + if (latest?.values && subjects.messages$.value.length === 0) { + const restoredMessages = latest.values.messages ?? []; + const restoredValues = { ...(latest.values as T) }; + // Strip the `messages` field from values — messages$ is the + // canonical surface for them; keeping a duplicate in values$ + // would confuse downstream consumers reading both subjects. + delete (restoredValues as { messages?: unknown }).messages; + subjects.messages$.next(restoredMessages); + subjects.values$.next(restoredValues); + } + } + } catch (err) { +``` + +- [ ] **Step 2: Run the test — must now PASS** + +```bash +cd /Users/blove/repos/angular-agent-framework +npx nx run langgraph:test --skip-nx-cache 2>&1 | tail -10 +``` + +Expected: all langgraph tests pass — the previously-failing restore test now passes (54 total). Pre-existing tests still green. + +- [ ] **Step 3: Lint** + +```bash +npx nx run langgraph:lint --skip-nx-cache 2>&1 | tail -5 +``` + +Expected: 0 errors. + +- [ ] **Step 4: Commit fix + restore test together** + +```bash +git add libs/langgraph/src/lib/internals/stream-manager.bridge.ts \ + libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +git commit -m "fix(langgraph): restore messages$ and values$ from latest checkpoint on reconnect" +``` + +--- + +## Phase 3 — Race-guard test + +### Task 3.1: Pin the optimistic-submit-beats-history-fetch behavior + +**Files:** +- Modify: `libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts` + +This test pins the guard added in Phase 2: if `messages$` already has content when history resolves, the projection is silently skipped. Uses an inline mock transport with a delayed `getHistory()` so the test can observe a state mutation between `threadId$` emission and history resolution. + +- [ ] **Step 1: Add the test directly after the restore test from Phase 1** + +```ts + it('does not clobber local optimistic messages if a submit beats the history fetch', async () => { + const historyFetched: ThreadState>[] = [ + { + values: { + messages: [ + { type: 'human', id: 'old-u', content: 'old prompt', _getType: () => 'human' }, + ], + }, + next: [], + checkpoint: { + thread_id: 'persisted-thread-2', + checkpoint_ns: '', + checkpoint_id: 'cp-old', + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-08T12:00:00.000Z', + parent_checkpoint: null, + tasks: [], + } as never, + ]; + + // Inline mock transport with a delayed getHistory so we can observe + // a state mutation between threadId-set and history resolution. + const transport: AgentTransport & { + getHistory: ( + threadId: string, + signal: AbortSignal, + ) => Promise>[]>; + } = { + async *stream() { + yield* []; + }, + async getHistory(_threadId, _signal) { + await new Promise(r => setTimeout(r, 50)); + return historyFetched; + }, + }; + + const subjects = makeSubjects(); + const destroy$ = new Subject(); + + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'chat', transport }, + subjects, + threadId$: of('persisted-thread-2'), + destroy$: destroy$.asObservable(), + }); + + // Synchronously simulate an optimistic local submit BEFORE history + // resolves: the user clicks Send during the 50ms history fetch. + subjects.messages$.next([ + { type: 'human', id: 'fresh', content: 'fresh prompt', _getType: () => 'human' }, + ] as never); + + // Wait past the history fetch delay. + await new Promise(r => setTimeout(r, 80)); + + // Local optimistic message preserved; history projection skipped. + expect(subjects.messages$.value.length).toBe(1); + expect((subjects.messages$.value[0] as { content: unknown }).content).toBe('fresh prompt'); + + destroy$.next(); + }); +``` + +- [ ] **Step 2: Run the test — must PASS** + +```bash +cd /Users/blove/repos/angular-agent-framework +npx nx run langgraph:test --skip-nx-cache 2>&1 | tail -5 +``` + +Expected: all langgraph tests pass (55 total — 53 existing + restore + race-guard). The race-guard test passes immediately because the implementation from Phase 2 already has the `messages$.value.length === 0` guard. + +If the test fails, the guard is broken — review Task 2.1 Step 1 and confirm the `if (latest?.values && subjects.messages$.value.length === 0)` guard exists exactly as written. + +- [ ] **Step 3: Lint** + +```bash +npx nx run langgraph:lint --skip-nx-cache 2>&1 | tail -5 +``` + +Expected: 0 errors. + +- [ ] **Step 4: Commit** + +```bash +git add libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +git commit -m "test(langgraph): pin race-guard for optimistic submit during history fetch" +``` + +--- + +## Phase 4 — Verification + PR + +### Task 4.1: Full local sweep + +- [ ] **Step 1: Lint** + +```bash +cd /Users/blove/repos/angular-agent-framework +npx nx run langgraph:lint --skip-nx-cache 2>&1 | tail -5 +``` + +Expected: 0 errors. + +- [ ] **Step 2: Test** + +```bash +npx nx run langgraph:test --skip-nx-cache 2>&1 | tail -10 +``` + +Expected: 55 tests pass (53 existing + 2 new). + +- [ ] **Step 3: Confirm commit count** + +```bash +git rev-list --count origin/main..HEAD +``` + +Expected: 2 commits. + +### Task 4.2: Push + open PR + +- [ ] **Step 1: Push** + +```bash +git push -u origin claude/langgraph-thread-restoration 2>&1 | tail -3 +``` + +- [ ] **Step 2: Open PR** + +```bash +gh pr create --title "fix(langgraph): restore conversation on reconnect to existing thread" --body "$(cat <<'EOF' +## Summary + +Targets Finding D from the live smoke pass: after a page reload, the conversation is NOT restored even though the threadId is persisted in localStorage and passed to \`agent({ threadId })\`. + +## Root cause + +\`stream-manager.bridge.ts\` \`refreshHistory()\` populates \`history$\` from the transport's checkpoint history but does not project the latest checkpoint's \`values.messages\` into \`messages$\`. The chat composition reads \`messages$\` (not \`history$\`), so the user sees the welcome state instead of their prior conversation. + +## Fix + +After the existing \`subjects.history$.next(history)\` line, project the most recent checkpoint into \`messages$\` and \`values$\`. Guarded by \`messages$.value.length === 0\` so an optimistic local submit that beats the history fetch is preserved. + +\`\`\`ts +const latest = history[0]; +if (latest?.values && subjects.messages$.value.length === 0) { + const restoredMessages = latest.values.messages ?? []; + const restoredValues = { ...latest.values }; + delete restoredValues.messages; + subjects.messages$.next(restoredMessages); + subjects.values$.next(restoredValues); +} +\`\`\` + +The \`messages\` field is stripped from \`values$\` so consumers reading both subjects don't see duplicates — \`messages$\` is the canonical surface. + +## Test plan + +### Verified locally +- [x] \`nx run langgraph:lint\` — 0 errors +- [x] \`nx run langgraph:test\` — 55 tests pass (53 existing + 2 new) +- [x] New test 1: populates messages$ and values$ from latest checkpoint on initial connect +- [x] New test 2: does not clobber local optimistic messages if a submit beats the history fetch + +### Pending visual verification +- [ ] After merge: live smoke against the workspace examples/chat demo. Send a message → reload → conversation reappears with prior user/assistant exchange visible. No flash of welcome state. + +Spec: \`docs/superpowers/specs/2026-05-08-langgraph-thread-restoration-design.md\` +Plan: \`docs/superpowers/plans/2026-05-08-langgraph-thread-restoration.md\` +EOF +)" +``` + +- [ ] **Step 3: Note the PR URL.** + +- [ ] **Step 4: Wait for CI; address failures.** + +- [ ] **Step 5: Merge once green.** + +--- + +## Definition of done + +1. PR merged. +2. CI green: `nx run langgraph:lint` and `nx run langgraph:test` (55 tests). +3. Live smoke (manual, post-merge): reload mid-conversation in the workspace `examples/chat` demo → conversation reappears with the prior user/assistant exchange. +4. The 2 new bridge unit tests pin both the restore path and the race-guard.