Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/harness/src/session-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export interface ManagedSession {
telosTaskId?: string;
/** Active subagent task IDs — task_id → tool_use_id (parent_tool_use_id). */
activeTaskIds: Map<string, string>;
/** Agent name used for this session (e.g. 'mitzo-conversational'). */
agentName?: string;
/** Cached boot_context payload for replay on reconnect/switch. */
bootContext?: Record<string, unknown>;
}

export interface ActiveSessionInfo {
Expand Down
30 changes: 30 additions & 0 deletions packages/protocol/src/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ interface SessionRow {
last_speaker_at: number | null;
state: string | null;
last_state_change: number | null;
agent_name: string | null;
boot_context: string | null;
created_at: number;
updated_at: number;
}
Expand Down Expand Up @@ -122,6 +124,7 @@ export class EventStore {
this.migrateCloseTracking(db);
this.migrateAttentionTracking(db);
this.migrateSessionState(db);
this.migrateBootContext(db);

this.log.info('EventStore initialized', { dbPath });

Expand Down Expand Up @@ -281,6 +284,19 @@ export class EventStore {
}
}

private migrateBootContext(db: Database.Database): void {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: Migration method migrateBootContext also adds the agent_name column. Consider renaming to migrateBootContextAndAgentName or migrateSessionAgentFields to accurately reflect its scope. [fixable]

const columns = db.prepare("PRAGMA table_info('sessions')").all() as Array<{ name: string }>;
const columnNames = new Set(columns.map((c) => c.name));
if (!columnNames.has('agent_name')) {
db.exec('ALTER TABLE sessions ADD COLUMN agent_name TEXT');
this.log.info('migrated sessions table: added agent_name');
}
if (!columnNames.has('boot_context')) {
db.exec('ALTER TABLE sessions ADD COLUMN boot_context TEXT');
this.log.info('migrated sessions table: added boot_context');
}
}

close(): void {
if (this.db) {
this.db.close();
Expand Down Expand Up @@ -351,6 +367,14 @@ export class EventStore {
fields.push('closed_by = ?');
values.push(meta.closedBy);
}
if (meta.agentName !== undefined) {
fields.push('agent_name = ?');
values.push(meta.agentName);
}
if (meta.bootContext !== undefined) {
fields.push('boot_context = ?');
values.push(meta.bootContext);
}
if (meta.updatedAt !== undefined) {
fields.push('updated_at = ?');
values.push(meta.updatedAt);
Expand All @@ -374,6 +398,8 @@ export class EventStore {
'goal_id',
'telos_task_id',
'closed_by',
'agent_name',
'boot_context',
];
const vals: unknown[] = [
meta.sessionId,
Expand All @@ -387,6 +413,8 @@ export class EventStore {
meta.goalId ?? null,
meta.telosTaskId ?? null,
meta.closedBy ?? null,
meta.agentName ?? null,
meta.bootContext ?? null,
];
if (meta.updatedAt !== undefined) {
cols.push('updated_at');
Expand Down Expand Up @@ -653,6 +681,8 @@ function rowToSession(row: SessionRow): SessionMeta {
lastSpeakerAt: row.last_speaker_at ?? null,
state: (row.state as SessionMeta['state']) ?? null,
lastStateChange: row.last_state_change ?? null,
agentName: row.agent_name ?? null,
bootContext: row.boot_context ?? null,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
Expand Down
3 changes: 3 additions & 0 deletions packages/protocol/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ export interface SessionMeta {
lastSpeakerAt: number | null;
state: SessionState | null;
lastStateChange: number | null;
agentName: string | null;
/** Serialized JSON of the boot_context payload (sources, tokens, sections). */
bootContext: string | null;
createdAt: number;
updatedAt: number;
}
Expand Down
76 changes: 50 additions & 26 deletions server/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,9 @@ async function _startChatInner(
const mcpAllowed = buildMcpAllowedTools(clientId);
const extraTools = options.extraTools ? options.extraTools.split(',').map((t) => t.trim()) : [];

// Resolve agent name early — needed for registration, resume upsert, and boot context.
const agentName = options.agentName ?? DEFAULT_AGENT_NAME;

// Streaming-input queue — kept open for the session lifetime.
const inputQueue = new AsyncQueue<SDKUserMessage>();
inputQueue.push(makeUserMessage(fullPrompt, 'now'));
Expand All @@ -687,6 +690,7 @@ async function _startChatInner(
wtId,
sessionAllowList: new Set<string>(),
worktreePath,
agentName,
// Set sessionId early so pre-assistant events are persisted (iOS reconnect).
...(options.resume ? { sessionId: options.resume } : {}),
...(options.telosTaskId ? { telosTaskId: options.telosTaskId } : {}),
Expand Down Expand Up @@ -726,6 +730,7 @@ async function _startChatInner(
...(worktreePath ? { wtId } : {}),
...(options.telosTaskId ? { telosTaskId: options.telosTaskId } : {}),
...(existingMeta ? { updatedAt: existingMeta.updatedAt } : {}),
agentName,
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: The resume upsert at line 724–734 persists agentName but not bootContext. On resume, if the prior session had a cached bootContext in the EventStore, this upsert overwrites session metadata without preserving bootContext. The field won't be lost (SQL UPDATE only sets provided fields), but this is an asymmetry worth noting — the resume path reconstructs the boot context from scratch (the IIFE runs again), so the stale stored value will eventually be overwritten. No data loss, but if the boot context IIFE fails on resume, the old value persists, which is actually correct. No fix needed — noting for clarity.

});
}

Expand All @@ -741,7 +746,6 @@ async function _startChatInner(
// Build session env with worktree paths for the agent (all repos including primary)
const sessionEnv = sdkEnv();
sessionEnv.MITZO_SESSION_ID = wtId;
const agentName = options.agentName ?? DEFAULT_AGENT_NAME;
sessionEnv.MITZO_AGENT_NAME = agentName;
for (const [name, { path }] of repoWorktrees) {
sessionEnv[`MITZO_REPO_${name.toUpperCase()}`] = path;
Expand Down Expand Up @@ -777,16 +781,18 @@ async function _startChatInner(
} catch (importErr: unknown) {
const msg = importErr instanceof Error ? importErr.message : String(importErr);
log.info('contexgin not available, using fallback', { error: msg });
send(transport, {
type: 'boot_context',
source: 'local-fallback',
const fallback = {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: The fallback boot_context object is duplicated three times (lines 784–792, 830–845, 921–928) with identical shape but verbose inline type annotations ([] as Array<{ source: string; heading: string; tokens: number; content: string }>). Extract a helper like makeEmptyBootContext(tokenBudget: number) to reduce duplication and the risk of shape drift between the three copies. [fixable]

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: The fallback boot_context object is constructed identically in three places (import error at ~784, invalid compile result at ~828, and catch block at ~918), each with verbose inline type annotations like [] as Array<{ source: string; heading: string; tokens: number; content: string }>. Extract a shared makeFallbackBootContext(tokenBudget: number) helper to eliminate the repetition and reduce the diff noise. [fixable]

source: 'local-fallback' as const,
sourceCount: 0,
tokenCount: 0,
tokenBudget: DEFAULT_TOKEN_BUDGET,
sources: [],
included: [],
trimmed: [],
});
sources: [] as Array<{ path: string; kind: string }>,
included: [] as Array<{ source: string; heading: string; tokens: number; content: string }>,
trimmed: [] as Array<{ source: string; heading: string; tokens: number; content: string }>,
};
send(transport, { type: 'boot_context', ...fallback });
const s = registry.get(clientId);
if (s) s.bootContext = fallback;
return;
}

Expand Down Expand Up @@ -819,16 +825,28 @@ async function _startChatInner(
// Validate the compiled object shape
if (!compiled || typeof compiled !== 'object') {
log.warn('contexgin compile() returned unexpected shape', { compiled });
send(transport, {
type: 'boot_context',
source: 'local-fallback',
const fallback = {
source: 'local-fallback' as const,
sourceCount: 0,
tokenCount: 0,
tokenBudget: tokenBudget,
sources: [],
included: [],
trimmed: [],
});
sources: [] as Array<{ path: string; kind: string }>,
included: [] as Array<{
source: string;
heading: string;
tokens: number;
content: string;
}>,
trimmed: [] as Array<{
source: string;
heading: string;
tokens: number;
content: string;
}>,
};
send(transport, { type: 'boot_context', ...fallback });
const s = registry.get(clientId);
if (s) s.bootContext = fallback;
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: The pattern const s = registry.get(clientId); if (s) s.bootContext = fallbackPayload; is repeated three times in the boot context compilation block (lines ~795, ~849, ~929). This could be a single helper or moved after the try/catch to always set whichever payload was sent. [fixable]

return;
}

Expand Down Expand Up @@ -879,30 +897,36 @@ async function _startChatInner(
const trimmed = extractSections(rawTrimmed);
const fullMarkdown = typeof obj.bootPayload === 'string' ? obj.bootPayload : undefined;

send(transport, {
type: 'boot_context',
source: 'contexgin',
const bootPayload = {
source: 'contexgin' as const,
sourceCount: sources.length,
tokenCount: bootTokens,
tokenBudget: tokenBudget,
sources,
included,
trimmed,
fullMarkdown,
});
};
send(transport, { type: 'boot_context', ...bootPayload });

// Cache in ManagedSession for replay on reconnect/switch
const s = registry.get(clientId);
if (s) s.bootContext = bootPayload;
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
log.warn('boot context compilation failed', { error: msg });
send(transport, {
type: 'boot_context',
source: 'local-fallback',
const fallbackPayload = {
source: 'local-fallback' as const,
sourceCount: 0,
tokenCount: 0,
tokenBudget: tokenBudget,
sources: [],
included: [],
trimmed: [],
});
sources: [] as Array<{ path: string; kind: string }>,
included: [] as Array<{ source: string; heading: string; tokens: number; content: string }>,
trimmed: [] as Array<{ source: string; heading: string; tokens: number; content: string }>,
};
send(transport, { type: 'boot_context', ...fallbackPayload });
const s = registry.get(clientId);
if (s) s.bootContext = fallbackPayload;
}
})();
capturePromptComparison(wtId, cwd, systemPromptAppend, repoWorktrees).catch(() => {});
Expand Down
4 changes: 4 additions & 0 deletions server/query-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ async function _runQueryLoopInner(
...(currentSession.worktreePath ? { wtId: currentSession.wtId } : {}),
...(initialPrompt ? { initialPrompt } : {}),
...(currentSession.telosTaskId ? { telosTaskId: currentSession.telosTaskId } : {}),
...(currentSession.agentName ? { agentName: currentSession.agentName } : {}),
...(currentSession.bootContext
? { bootContext: JSON.stringify(currentSession.bootContext) }
: {}),
});
if (initialPrompt) {
// Store the initial prompt as a user_message event so
Expand Down
29 changes: 29 additions & 0 deletions server/ws-handler-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ export function handleReconnect(
running,
});

// Re-send cached boot_context so pills reappear after reconnect
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: Reconnect boot_context replay lacks sessionId. The reconnect handler loops over all tracked sessions and sends boot_context for every running session. These messages have no sessionId, so the client's event filter treats them as global and applies each to the current messages state (SET_BOOT_CONTEXT overwrites). If a client tracks multiple running sessions, a non-active session's boot_context will overwrite the active session's. Fix: add sessionId: entry.sessionId to the replayed boot_context message, matching how other per-session events are tagged. [fixable]

if (found && running && found.session?.bootContext) {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: handleReconnect only replays boot_context for running sessions (hot path). Non-running sessions have no cold path — unlike handleSwitchSession which falls back to EventStore's serialized bootContext. Since boot_context is sent via raw send() (not sendOrBuffer), it is NOT stored in the EventStore event stream, so reconnecting to an ended session will never show boot context pills. Add an else branch that reads ctx.eventStore.getSession(entry.sessionId)?.bootContext and parses it, mirroring the cold path in handleSwitchSession (lines 391–401). [fixable]

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: handleReconnect only replays boot_context from in-memory SessionRegistry (hot path) when running is true, but does NOT fall back to EventStore for ended sessions. In contrast, handleSwitchSession (line 391) correctly implements a cold path fallback via sessionMeta.bootContext. A user reconnecting after a session has ended will not see boot_context pills, even though the data is persisted in the EventStore. The fix would be to add an else branch that reads from ctx.eventStore.getSession(entry.sessionId)?.bootContext and parses it, mirroring the switch_session logic. [fixable]

ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...found.session.bootContext,
});
}

log.info('reconnect replay', {
connectionId,
sessionId: entry.sessionId,
Expand Down Expand Up @@ -371,6 +379,27 @@ export async function handleSwitchSession(
},
});

// Re-send boot_context so pills appear on session switch.
// Hot path: running session in SessionRegistry (in-memory cache).
// Cold path: ended session — read serialized JSON from EventStore.
const found = ctx.sessionRegistry.findBySessionId(msg.sessionId);
if (found?.session?.bootContext) {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: Same sessionId omission on switch_session boot_context replay. Both the hot path (in-memory) and cold path (JSON.parse) send boot_context without a sessionId. This works today because switchSession resets messages state before the reply arrives, but it's inconsistent with the multiplexed protocol where all session-scoped events should carry sessionId for client demuxing. [fixable]

ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...found.session.bootContext,
});
} else if (sessionMeta.bootContext) {
try {
const parsed = JSON.parse(sessionMeta.bootContext);
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 unsafe_assumptions: The cold path in handleSwitchSession does JSON.parse(sessionMeta.bootContext) and spreads the result into a WS message with no shape validation. If the stored JSON has a type key (which bootContext payloads don't, but could via corruption or future changes), it would overwrite the type: 'boot_context' set on line 394. Consider parsing into a variable and deleting parsed.type before spreading, or validating the parsed shape. [fixable]

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 unsafe_assumptions: JSON.parse of sessionMeta.bootContext could be exploited if the database value is tampered with or corrupted, resulting in arbitrary object shapes being spread into a WebSocket message. Consider validating the parsed shape (e.g., checking for expected keys like source, sourceCount) before sending to the client, or at minimum using a Zod schema. Low risk since the DB is local/trusted, but worth noting for defense-in-depth. [fixable]

ctx.connRegistry.get(connectionId)?.transport.send({
type: 'boot_context',
...parsed,
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 unsafe_assumptions: Cold-path JSON.parse has no shape validation. sessionMeta.bootContext is parsed and spread directly into the WS message. If the stored JSON was written by an older code version with a different shape, or is corrupted, the client receives an unvalidated payload. The protocol-parser on the client side is defensive (checks each field type), so this isn't exploitable, but a basic typeof check on parsed (must be a non-null object) would prevent spreading a string or array into the message. [fixable]

});
} catch {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: The catch block silently swallows JSON parse errors with no logging. Add log.warn('failed to parse stored bootContext', { sessionId: msg.sessionId }) for debuggability — corrupted bootContext would otherwise be invisible. [fixable]

// Invalid JSON — skip
}
}
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: Silent catch on JSON.parse — project conventions use log.warn() for recoverable error paths. Adding a warn log with sessionId and the error message would help diagnose boot_context replay failures. [fixable]


log.info('switch_session', { connectionId, sessionId: msg.sessionId });
},
);
Expand Down
Loading