diff --git a/docs/adrs/025.server.channel.md b/docs/adrs/025.server.channel.md new file mode 100644 index 0000000..7066dc9 --- /dev/null +++ b/docs/adrs/025.server.channel.md @@ -0,0 +1,169 @@ +# ADR 025: Server — Session-scoped client integration channel + +**SPEC:** [client-integration](../specs/client-integration.md) +**Status:** Accepted +**Date:** 2026-06-08 + +--- + +## Context + +Agent tools and CLI scripts that produce structured output need a way to stream results to a browser UI in real-time. The pattern is simple: one publisher (a curl call or MCP tool), N subscribers (browser tabs). + +The current workaround in the Fusion project is a standalone `sync-server.ts` — a separate Bun process on a separate port (`2347`) that must be started independently. This creates unnecessary friction: two processes to manage, a hardcoded port, and app-specific code that is really a generic utility. + +webtty already runs an HTTP server when `webtty go ` is invoked. Adding publish/subscribe endpoints to that server eliminates the separate process entirely. + +--- + +## Decision + +Add a session-scoped integration channel to the existing webtty HTTP server. + +### API + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/s/:id/publish` | Read body line by line; broadcast each valid JSON line to all current subscribers as a discrete WS text frame; `204` after publisher closes; `400` if `Content-Type` is not `application/json`; `404` if session does not exist; `409` if session exists but PTY is not running | +| `GET` | `/ws/:id/events` | WebSocket upgrade — joins `session.subscribers`; receives published payloads as JSON text frames; WS close `4001` if session does not exist; WS close `4002` if PTY is not running | + +One channel per session. No separate channel name or creation step — the session ID is the channel. One-shot and streaming use the same endpoint: a one-shot publisher sends one line and closes; a streaming publisher keeps the connection open and writes lines over time. + +### Channel lifecycle + +The channel is only available while the session's PTY is running. This keeps the channel lifecycle consistent with the terminal session. + +| Event | `session.clients` (PTY) | `session.subscribers` (channel) | +|-------|-------------------------|---------------------------------| +| PTY not yet spawned | n/a | publish → `409`; subscribe → WS close `4002` (session exists but PTY not running) | +| PTY running | connected normally | connected normally | +| Shell exits | closed; session deleted | closed; session deleted | +| `DELETE /api/sessions/:id` | closed | closed | +| Server stops | closed | closed | + +### Channel flow + +``` +Publisher webtty server Subscriber(s) +───────────────────────────────────────────────────────────────────────── +POST /s/:id/publish + body line 1 ──────────► parse JSON + broadcast ───────────────────► ws.send(line1) + body line 2 ──────────► parse JSON + broadcast ───────────────────► ws.send(line2) + (invalid line) ────────► skip silently + body line N ──────────► parse JSON + broadcast ───────────────────► ws.send(lineN) + [close] ───────────────► respond 204 +``` + +The `204` is returned after the publisher closes the connection — not before — because the server cannot know the stream is complete until the body ends. For one-shot publishers this is instantaneous. + +### Publisher usage + +**One-shot** (single JSON object, close immediately): + +```sh +curl -X POST http://localhost:2346/s/my-session/publish \ + -H 'Content-Type: application/json' \ + -d '{"type":"result","items":[...]}' +``` + +**Streaming** (pipe agent output line by line): + +```sh +my-agent --stream | curl -X POST http://localhost:2346/s/my-session/publish \ + -H 'Content-Type: application/json' \ + --data-binary @- +``` + +### Subscriber usage + +```js +const ws = new WebSocket('ws://localhost:2346/ws/my-session/events'); +ws.onmessage = (e) => { + const event = JSON.parse(e.data); + // render event in UI +}; +``` + +### Implementation — files to touch + +1. **`src/server/session.ts`** — add `subscribers: Set` to the `Session` interface; initialise as `new Set()` in `createSession` +2. **`src/server/routes.ts`** — add `POST /s/:id/publish` branch: validate session exists, check `session.pty !== null` (→ `409` if not), stream body line by line via `on('data')`, buffer incomplete lines, call `broadcastToSubscribers` per valid JSON line, respond `204` on `end` +3. **`src/server/websocket.ts`**: + - Rename the existing PTY upgrade path from `/ws/:id` to `/ws/:id/pty` + - Widen the `upgrade` handler to also accept `/ws/:id/events`, routing to a new `handleSubscribe` function + - In `handleSubscribe(ws, id)` — look up session, check `session.pty !== null` (→ WS close `4002` "PTY not running" if not), add `ws` to `session.subscribers`, remove on close + - Add `broadcastToSubscribers(session, line)` — iterates `session.subscribers`, calls `ws.send(line)` for each open socket + - Update `closeSession` / `closeAllSessions` to close and clear `session.subscribers` alongside `session.clients` + +--- + +## Reasons + +### Channel requires an active PTY + +The channel is only available when `session.pty !== null`. This enforces a clear mental model: the channel is a sidecar to a running terminal session, not a standalone message bus. It also simplifies lifecycle — the channel always closes with the PTY, so there are no orphaned subscribers or dangling publish requests to reason about. + +### Separate subscriber set from PTY clients + +`session.clients` holds WebSocket connections for PTY terminal output. `session.subscribers` holds connections for the integration channel. Keeping them as separate `Set` fields on `Session` means cleanup on session delete is automatic and neither set interferes with the other. + +### Line framing — one WS frame per line + +WebSocket is message-based, so the server needs a framing rule. Line-by-line (newline as delimiter) is the simplest framing that works for both one-shot and streaming publishers without a content-type distinction. The subscriber always receives one complete JSON object per frame regardless of how the publisher sent it. + +### No content-type distinction between one-shot and streaming + +Requiring `application/x-ndjson` for streaming adds friction with no benefit — the server reads line-by-line either way. Both use `Content-Type: application/json`. + +### 204 after publisher closes + +Holding the HTTP response until the publisher closes is correct for streaming: the server cannot know the publish is complete until the body stream ends. For one-shot publishers this is instantaneous; for streaming publishers the caller's `fetch` / `curl` naturally awaits the response after the pipe closes. + +### Silent skip on invalid JSON lines + +Closing the publish connection on a single malformed line would interrupt an otherwise healthy stream. Agents occasionally emit partial or diagnostic lines that are not JSON. Skipping silently is more robust. + +### Session as the natural channel namespace + +Both publisher and subscriber already share the session ID. Session-scoped routing removes the need for out-of-band channel name coordination. + +--- + +## Considered Options + +### Option A: Named channels (`/channel/:name/*`) instead of session-scoped + +More flexible — multiple independent channels per instance — but adds a second naming dimension and requires coordination between publisher and subscriber beyond the session ID. + +Rejected — one channel per session covers all current use cases. Named channels can be layered on top later. + +### Option B: Separate standalone server (current `sync-server.ts`) + +Works but requires a second process, a second port, and duplicates the same ~90-line pattern across projects. + +Rejected — the code is generic enough to live in webtty once. + +### Option C: Content-type distinction for streaming vs one-shot + +`application/json` for one-shot, `application/x-ndjson` for streaming. Adds publisher friction and a server branch with no benefit. + +Rejected — one endpoint, one content-type. + +### Option D: SSE instead of WebSocket for subscribe + +Simpler for one-way streaming and works natively with `EventSource`. However, webtty already has WebSocket infrastructure and browser clients already use WebSocket. + +Rejected for now — SSE can be added as an additional endpoint later if needed. + +--- + +## Consequences + +- `apps/fusion/sync-server.ts` can be deleted; Fusion points its agent push at `POST /s/:id/publish` and its UI at `ws://host/ws/:id/events`. +- The PTY WebSocket path changes from `/ws/:id` to `/ws/:id/pty` — server and browser client ship together so this is an internal refactor, not a breaking change. +- No auth on publish — callers must be on the same host or behind the same network boundary as webtty (consistent with the existing session API). +- The `upgrade` handler in `websocket.ts` must be updated to route `/ws/:id/pty` (PTY) and `/ws/:id/events` (channel) separately; anything else is destroyed as before. +- `session.subscribers` must be closed and cleared in `closeSession` and `closeAllSessions` alongside the existing `session.clients` handling. diff --git a/docs/specs/client-integration.md b/docs/specs/client-integration.md new file mode 100644 index 0000000..25f75a3 --- /dev/null +++ b/docs/specs/client-integration.md @@ -0,0 +1,69 @@ +# SPEC: Client Integration (CLI → Web) + +**Last Updated:** 2026-06-08 + +--- + +## Description + +A pattern for pushing live updates from a CLI tool or agent into a browser UI — without running a separate server process. + +**Persona:** Developers building CLI agents or tools that produce structured output and want to surface that output in a browser UI in real-time. + +webtty's HTTP server is already running whenever a session is open. The integration channel piggybacks on that server so any CLI process can publish structured events and any number of browser tabs receive them instantly — no extra port, no extra process. + +--- + +## Architecture + +``` +┌─────────────────┐ ws /ws/:id/pty ┌──────────────────────┐ +│ Browser tab │ ◄───────────────────────────────── │ │ +│ (terminal) │ ─────────────────────────────────► │ session.clients │ +└─────────────────┘ keyboard / resize └──────────────────────┘ + +┌─────────────────┐ POST /s/:id/publish ┌──────────────────────┐ +│ CLI / Agent │ ─────────────────────────────────► │ │ +│ (publisher) │ │ webtty server │ +└─────────────────┘ │ │ + │ session.subscribers │ +┌─────────────────┐ ws /ws/:id/events │ │ +│ Browser panel │ ◄───────────────────────────────── │ │ +│ (subscriber) │ one WS frame per event │ │ +└─────────────────┘ └──────────────────────┘ +``` + +Subscribers (integration channel) and terminal clients (PTY) are independent. A browser tab can be one, the other, or both. + +--- + +## Use Cases + +### Agent streaming results to a UI + +An AI agent or search tool runs in the terminal and emits structured results — search hits, status updates, token streams — that a browser panel renders as they arrive. The agent publishes to the session channel; the browser subscribes. + +### Replacing a bespoke sync server + +Projects like Fusion today ship a standalone `sync-server.ts` on a separate port that must be started independently. The session channel replaces it: one webtty process, one port, zero extra setup. + +--- + +## How It Works + +1. `bunx webtty go my-session` — the only process to start +2. A terminal client connects to `/ws/:id/pty` — this spawns the PTY and activates the channel +3. Browser panels subscribe via `/ws/:id/events` (requires an active PTY) +4. CLI tools or agents POST JSON to `/s/:id/publish` — one-shot or as a stream of lines +5. Each JSON line is broadcast to all subscribers as a discrete WebSocket frame as it arrives + +For interface details, channel flow, and API reference see [ADR 025](../adrs/025.server.channel.md). + +--- + +## Features + +| Feature | Description | ADR | Done? | +|---------|-------------|-----|-------| +| Session channel — publish | CLI tools POST JSON (one-shot or streaming) to the session; each event broadcast to subscribers in real-time | [ADR 025](../adrs/025.server.channel.md) | ✅ | +| Session channel — subscribe | Browser panels subscribe via WebSocket and receive one JSON object per frame | [ADR 025](../adrs/025.server.channel.md) | ✅ | diff --git a/docs/specs/webtty.md b/docs/specs/webtty.md index 1b021b0..d3059fa 100644 --- a/docs/specs/webtty.md +++ b/docs/specs/webtty.md @@ -80,3 +80,4 @@ Session IDs appear directly in the URL path (`/s/:id`), so they must be valid UR | Default session | `GET /` redirects to last-used session, or creates `main` if none exists | — | ✅ | | Multi-client sessions | Multiple browser tabs can attach to the same session; PTY output broadcast to all; scrollback replayed on reconnect | [ADR 007](../adrs/007.webtty.session-client.md) | ✅ | | Config file | Shell, port, font, theme from `~/.config/webtty/config.json`; hot-reload on tab reload | [ADR 008](../adrs/008.webtty.config.md) | ✅ | +| Client integration (CLI → Web) | `POST /s/:id/publish` (one-shot or streaming JSON) + `ws /ws/:id/events` subscribe; channel active only while PTY is running; no extra process or port | [ADR 025](../adrs/025.server.channel.md) | ✅ | diff --git a/src/cli/http.test.ts b/src/cli/http.test.ts index d6bca24..f06cc9a 100644 --- a/src/cli/http.test.ts +++ b/src/cli/http.test.ts @@ -177,6 +177,107 @@ describe('startServer', () => { configSpy.mockRestore(); }); + test('uses node executor on win32 with Bun', async () => { + const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true); + const fakeChild = { unref: mock(() => {}), on: mock(() => {}) }; + const spawnMock = mock(() => fakeChild); + + globalThis.fetch = mock( + async () => new Response('[]', { status: 200 }), + ) as unknown as typeof fetch; + + const configModule = await import('../config'); + const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({ + ...configModule.DEFAULT_CONFIG, + }); + + const origPlatform = process.platform; + Object.defineProperty(process, 'platform', { value: 'win32', configurable: true }); + + const { startServer } = await import('./http'); + await startServer(10000, spawnMock as never); + + const [exec] = spawnMock.mock.calls[0] as unknown as [string, string[]]; + expect(exec).toBe('node'); + + Object.defineProperty(process, 'platform', { value: origPlatform, configurable: true }); + existsSpy.mockRestore(); + configSpy.mockRestore(); + }); + + test('logs error and exits when spawn emits error (non-windows hint)', async () => { + const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true); + const exitSpy = spyOn(process, 'exit').mockImplementation((() => {}) as () => never); + const errorSpy = spyOn(console, 'error').mockImplementation(() => {}); + + const fakeChild = { + unref: mock(() => {}), + on: mock((event: string, cb: (err: Error) => void) => { + if (event === 'error') cb(new Error('spawn ENOENT')); + }), + }; + const spawnMock = mock(() => fakeChild); + + globalThis.fetch = mock( + async () => new Response('[]', { status: 200 }), + ) as unknown as typeof fetch; + + const configModule = await import('../config'); + const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({ + ...configModule.DEFAULT_CONFIG, + }); + + const { startServer } = await import('./http'); + await startServer(10000, spawnMock as never); + + expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining('failed to start server')); + expect(errorSpy).toHaveBeenCalledWith(expect.not.stringContaining('Node.js must be on PATH')); + expect(exitSpy).toHaveBeenCalledWith(1); + + existsSpy.mockRestore(); + exitSpy.mockRestore(); + errorSpy.mockRestore(); + configSpy.mockRestore(); + }); + + test('logs error with win32 hint when spawn emits error on win32', async () => { + const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true); + const exitSpy = spyOn(process, 'exit').mockImplementation((() => {}) as () => never); + const errorSpy = spyOn(console, 'error').mockImplementation(() => {}); + + const fakeChild = { + unref: mock(() => {}), + on: mock((event: string, cb: (err: Error) => void) => { + if (event === 'error') cb(new Error('spawn ENOENT')); + }), + }; + const spawnMock = mock(() => fakeChild); + + globalThis.fetch = mock( + async () => new Response('[]', { status: 200 }), + ) as unknown as typeof fetch; + + const configModule = await import('../config'); + const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({ + ...configModule.DEFAULT_CONFIG, + }); + + const origPlatform = process.platform; + Object.defineProperty(process, 'platform', { value: 'win32', configurable: true }); + + const { startServer } = await import('./http'); + await startServer(10000, spawnMock as never); + + expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining('Node.js must be on PATH')); + expect(exitSpy).toHaveBeenCalledWith(1); + + Object.defineProperty(process, 'platform', { value: origPlatform, configurable: true }); + existsSpy.mockRestore(); + exitSpy.mockRestore(); + errorSpy.mockRestore(); + configSpy.mockRestore(); + }); + test('exits with error when server does not start within timeout', async () => { const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true); const spawnMock = mock(() => ({ unref: () => {}, on: () => {} })); diff --git a/src/client/index.ts b/src/client/index.ts index 8a4485f..142e480 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -114,7 +114,7 @@ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; let ws: WebSocket; function connect(): void { - const wsUrl = `${protocol}//${window.location.host}/ws/${sessionId}?cols=${term.cols}&rows=${term.rows}`; + const wsUrl = `${protocol}//${window.location.host}/ws/${sessionId}/pty?cols=${term.cols}&rows=${term.rows}`; ws = new WebSocket(wsUrl); const DIM = '\x1b[2m', diff --git a/src/server/routes.test.ts b/src/server/routes.test.ts index 496e488..8c3b8e7 100644 --- a/src/server/routes.test.ts +++ b/src/server/routes.test.ts @@ -195,6 +195,43 @@ describe('server — routes', () => { expect(res.status).toBe(404); }); + test('POST /s/:id/publish returns 404 for unknown session', async () => { + const res = await fetch(`${baseUrl}/s/no-such-session/publish`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ type: 'test' }), + }); + expect(res.status).toBe(404); + }); + + test('POST /s/:id/publish returns 400 for wrong content-type', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'publish-ct-test' }), + }); + const res = await fetch(`${baseUrl}/s/publish-ct-test/publish`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: 'hello', + }); + expect(res.status).toBe(400); + }); + + test('POST /s/:id/publish returns 409 when PTY not running', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'publish-no-pty' }), + }); + const res = await fetch(`${baseUrl}/s/publish-no-pty/publish`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ type: 'test' }), + }); + expect(res.status).toBe(409); + }); + test('POST /api/server/stop returns 200 and stops server', async () => { const res = await fetch(`${baseUrl}/api/server/stop`, { method: 'POST' }); expect(res.status).toBe(200); diff --git a/src/server/routes.ts b/src/server/routes.ts index 3da4a41..c9204a6 100644 --- a/src/server/routes.ts +++ b/src/server/routes.ts @@ -11,7 +11,7 @@ import { setLastUsedId, } from './session'; import { serveFile } from './static'; -import { closeSession } from './websocket'; +import { broadcastToSubscribers, closeSession } from './websocket'; const MAX_BODY = 64 * 1024; @@ -244,6 +244,67 @@ export async function handleRequest( return; } + const publishMatch = pathname.match(/^\/s\/([^/]+)\/publish$/); + if (req.method === 'POST' && publishMatch) { + const id = decodeId(publishMatch[1]); + if (!id) { + res.writeHead(400); + res.end('Bad Request'); + return; + } + if (!(req.headers['content-type'] ?? '').startsWith('application/json')) { + res.writeHead(400); + res.end('Bad Request'); + return; + } + const session = sessionRegistry.get(id); + if (!session) { + res.writeHead(404); + res.end('Not Found'); + return; + } + if (session.pty === null) { + res.writeHead(409); + res.end('PTY not running'); + return; + } + let buf = ''; + req.on('data', (chunk: Buffer) => { + buf += chunk.toString('utf8'); + const lines = buf.split('\n'); + buf = lines.pop() ?? ''; + if (buf.length > MAX_BODY) buf = ''; + for (const line of lines) { + const trimmed = line.replace(/\r$/, ''); + if (!trimmed) continue; + try { + JSON.parse(trimmed); + broadcastToSubscribers(session, trimmed); + } catch { + // skip invalid JSON lines + } + } + }); + req.on('end', () => { + const trimmed = buf.replace(/\r$/, ''); + if (trimmed) { + try { + JSON.parse(trimmed); + broadcastToSubscribers(session, trimmed); + } catch { + // skip invalid JSON + } + } + res.writeHead(204); + res.end(); + }); + req.on('error', () => { + res.writeHead(500); + res.end(); + }); + return; + } + const pidMatch = pathname.match(/^\/p\/(\d+)$/); if (req.method === 'GET' && pidMatch) { const pid = parseInt(pidMatch[1], 10); diff --git a/src/server/session.ts b/src/server/session.ts index 6fabaff..919d4cc 100644 --- a/src/server/session.ts +++ b/src/server/session.ts @@ -11,6 +11,8 @@ export interface Session { pty: PtyProcess | null; /** All currently connected WebSocket clients for this session. */ clients: Set; + /** All currently connected event subscribers for this session. */ + subscribers: Set; /** Accumulated PTY output retained for replay when a new client joins. */ scrollback: string; } @@ -61,6 +63,7 @@ export function createSession(id: string): Session { createdAt: Date.now(), pty: null, clients: new Set(), + subscribers: new Set(), scrollback: '', }; sessionRegistry.set(id, session); diff --git a/src/server/websocket.test.ts b/src/server/websocket.test.ts index 8cd61b8..7f05f9a 100644 --- a/src/server/websocket.test.ts +++ b/src/server/websocket.test.ts @@ -114,7 +114,7 @@ describe('websocket', () => { }); test('rejects connection for non-existent session with code 4001', async () => { - const ws = new WebSocket(`${wsBase}/ws/no-such-session`); + const ws = new WebSocket(`${wsBase}/ws/no-such-session/pty`); const code = await new Promise((resolve) => ws.on('close', (c) => resolve(c))); expect(code).toBe(4001); }); @@ -126,7 +126,7 @@ describe('websocket', () => { body: JSON.stringify({ id: 'ws-test-banner' }), }); - const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-banner?cols=80&rows=24`); + const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-banner/pty?cols=80&rows=24`); await waitForMessages(messages, 1); await closeWs(ws); @@ -141,13 +141,13 @@ describe('websocket', () => { }); const { ws: ws1, messages: m1 } = await connectWs( - `${wsBase}/ws/ws-test-replay?cols=80&rows=24`, + `${wsBase}/ws/ws-test-replay/pty?cols=80&rows=24`, ); await waitForMessages(m1, 1); await closeWs(ws1); const { ws: ws2, messages: m2 } = await connectWs( - `${wsBase}/ws/ws-test-replay?cols=80&rows=24`, + `${wsBase}/ws/ws-test-replay/pty?cols=80&rows=24`, ); await waitForMessages(m2, 1); await closeWs(ws2); @@ -165,12 +165,12 @@ describe('websocket', () => { }); const { ws: ws1, messages: m1 } = await connectWs( - `${wsBase}/ws/ws-test-fanout?cols=80&rows=24`, + `${wsBase}/ws/ws-test-fanout/pty?cols=80&rows=24`, ); await waitForMessages(m1, 1); const { ws: ws2, messages: m2 } = await connectWs( - `${wsBase}/ws/ws-test-fanout?cols=80&rows=24`, + `${wsBase}/ws/ws-test-fanout/pty?cols=80&rows=24`, ); await waitForMessages(m2, 1); @@ -194,7 +194,7 @@ describe('websocket', () => { body: JSON.stringify({ id: 'ws-test-exit' }), }); - const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-exit?cols=80&rows=24`); + const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-exit/pty?cols=80&rows=24`); await waitForMessages(messages, 1); const closeCode = new Promise((resolve) => ws.on('close', (code) => resolve(code))); @@ -212,7 +212,7 @@ describe('websocket', () => { body: JSON.stringify({ id: 'ws-test-resize' }), }); - const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-resize?cols=80&rows=24`); + const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-resize/pty?cols=80&rows=24`); await waitForMessages(messages, 1); await waitForPrompt(messages); @@ -232,7 +232,7 @@ describe('websocket', () => { body: JSON.stringify({ id: 'ws-test-pid-route' }), }); - const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-pid-route?cols=80&rows=24`); + const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-pid-route/pty?cols=80&rows=24`); await waitForMessages(messages, 1); await closeWs(ws); @@ -249,6 +249,130 @@ describe('websocket', () => { expect(res.headers.get('location')).toBe('/s/ws-test-pid-route'); }); + test('GET /ws/:id/events rejects with 4001 for non-existent session', async () => { + const ws = new WebSocket(`${wsBase}/ws/no-such-session/events`); + const code = await new Promise((resolve) => ws.on('close', (c) => resolve(c))); + expect(code).toBe(4001); + }); + + test('GET /ws/:id/events rejects with 4002 when PTY not running', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'events-no-pty' }), + }); + const ws = new WebSocket(`${wsBase}/ws/events-no-pty/events`); + const code = await new Promise((resolve) => ws.on('close', (c) => resolve(c))); + expect(code).toBe(4002); + }); + + test('subscriber receives published event', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'events-pubsub' }), + }); + const { ws: ptyWs, messages: ptyMessages } = await connectWs( + `${wsBase}/ws/events-pubsub/pty?cols=80&rows=24`, + ); + await waitForMessages(ptyMessages, 1); + + const { ws: subWs, messages: subMessages } = await connectWs( + `${wsBase}/ws/events-pubsub/events`, + ); + + await fetch(`${baseUrl}/s/events-pubsub/publish`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ type: 'test', value: 42 }), + }); + + await waitForMessages(subMessages, 1); + expect(JSON.parse(subMessages[0])).toEqual({ type: 'test', value: 42 }); + + await closeWs(subWs); + await closeWs(ptyWs); + }); + + test('multi-line publish delivers one frame per line', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'events-multiline' }), + }); + const { ws: ptyWs, messages: ptyMessages } = await connectWs( + `${wsBase}/ws/events-multiline/pty?cols=80&rows=24`, + ); + await waitForMessages(ptyMessages, 1); + + const { ws: subWs, messages: subMessages } = await connectWs( + `${wsBase}/ws/events-multiline/events`, + ); + + await fetch(`${baseUrl}/s/events-multiline/publish`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: [{ seq: 1 }, { seq: 2 }, { seq: 3 }].map((o) => JSON.stringify(o)).join('\n'), + }); + + await waitForMessages(subMessages, 3); + expect(JSON.parse(subMessages[0])).toEqual({ seq: 1 }); + expect(JSON.parse(subMessages[1])).toEqual({ seq: 2 }); + expect(JSON.parse(subMessages[2])).toEqual({ seq: 3 }); + + await closeWs(subWs); + await closeWs(ptyWs); + }); + + test('invalid JSON lines are silently skipped', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'events-skip-invalid' }), + }); + const { ws: ptyWs, messages: ptyMessages } = await connectWs( + `${wsBase}/ws/events-skip-invalid/pty?cols=80&rows=24`, + ); + await waitForMessages(ptyMessages, 1); + + const { ws: subWs, messages: subMessages } = await connectWs( + `${wsBase}/ws/events-skip-invalid/events`, + ); + + await fetch(`${baseUrl}/s/events-skip-invalid/publish`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: [JSON.stringify({ seq: 1 }), 'not valid json', JSON.stringify({ seq: 2 })].join('\n'), + }); + + await waitForMessages(subMessages, 2); + expect(JSON.parse(subMessages[0])).toEqual({ seq: 1 }); + expect(JSON.parse(subMessages[1])).toEqual({ seq: 2 }); + expect(subMessages.length).toBe(2); + + await closeWs(subWs); + await closeWs(ptyWs); + }); + + test('subscribers are closed with 4001 when shell exits', async () => { + await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ id: 'events-shell-exit' }), + }); + const { ws: ptyWs, messages: ptyMessages } = await connectWs( + `${wsBase}/ws/events-shell-exit/pty?cols=80&rows=24`, + ); + await waitForMessages(ptyMessages, 1); + await waitForPrompt(ptyMessages); + + const { ws: subWs } = await connectWs(`${wsBase}/ws/events-shell-exit/events`); + const closeCode = new Promise((resolve) => subWs.on('close', (c) => resolve(c))); + + ptyWs.send(`exit${NL}`); + expect(await closeCode).toBe(4001); + }); + test('server shuts down when last session exits', async () => { await fetch(`${baseUrl}/api/sessions`, { method: 'POST', @@ -263,7 +387,7 @@ describe('websocket', () => { } } - const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-last?cols=80&rows=24`); + const { ws, messages } = await connectWs(`${wsBase}/ws/ws-test-last/pty?cols=80&rows=24`); await waitForMessages(messages, 1); ws.send(`exit${NL}`); diff --git a/src/server/websocket.ts b/src/server/websocket.ts index b787c25..b1888b3 100644 --- a/src/server/websocket.ts +++ b/src/server/websocket.ts @@ -10,11 +10,14 @@ const WS_CLOSE = { SERVER_STOPPED: 1001 as const, // RFC 6455: server going away BAD_REQUEST: 1008 as const, // RFC 6455: policy violation / bad data SESSION_GONE: 4001 as const, // app-level: session deleted or shell exited + PTY_NOT_RUNNING: 4002 as const, // app-level: PTY not running } as const; function closeClients(session: Session, code: number, reason: string): void { session.pty?.kill(); for (const client of session.clients) client.close(code, reason); + for (const sub of session.subscribers) sub.close(code, reason); + session.subscribers.clear(); } /** @@ -33,6 +36,18 @@ export function closeAllSessions(): void { } } +/** + * Sends `line` to all open event subscribers of `session`. + * + * @param session - The session whose subscribers to notify. + * @param line - The line to send. + */ +export function broadcastToSubscribers(session: Session, line: string): void { + for (const sub of session.subscribers) { + if (sub.readyState === sub.OPEN) sub.send(line); + } +} + let onLastSessionClosed: (() => void) | null = null; /** @@ -87,9 +102,27 @@ function sessionBanner(): string { ].join(''); } +function handleSubscribe(ws: WS, id: string): void { + const session = sessionRegistry.get(id); + if (!session) { + ws.close(WS_CLOSE.SESSION_GONE, 'session deleted'); + return; + } + if (session.pty === null) { + ws.close(WS_CLOSE.PTY_NOT_RUNNING, 'PTY not running'); + return; + } + session.subscribers.add(ws); + ws.on('close', () => { + session.subscribers.delete(ws); + }); + ws.on('error', () => {}); +} + /** * Attaches a WebSocket server to `httpServer`, handling PTY I/O, session management, - * scrollback replay, and terminal resize for all `/ws/:id` connections. + * scrollback replay, and terminal resize for all `/ws/:id/pty` connections, + * and event subscriptions for `/ws/:id/events` connections. * * @param httpServer - The HTTP server to attach the WebSocket server to. * @returns The configured {@link WebSocketServer}. @@ -99,7 +132,10 @@ export function createWebSocketServer(httpServer: http.Server): WebSocketServer httpServer.on('upgrade', (req, socket, head) => { const url = new URL(req.url ?? '/', `http://${req.headers.host ?? '127.0.0.1'}`); - if (url.pathname.match(/^\/ws\/([^/]+)$/)) { + if ( + url.pathname.match(/^\/ws\/([^/]+)\/pty$/) || + url.pathname.match(/^\/ws\/([^/]+)\/events$/) + ) { wss.handleUpgrade(req, socket, head, (ws) => wss.emit('connection', ws, req)); } else { socket.destroy(); @@ -108,19 +144,28 @@ export function createWebSocketServer(httpServer: http.Server): WebSocketServer wss.on('connection', (ws: WS, req: http.IncomingMessage) => { const url = new URL(req.url ?? '/', `http://${req.headers.host ?? '127.0.0.1'}`); - const wsMatch = url.pathname.match(/^\/ws\/([^/]+)$/); - if (!wsMatch) { + const ptyMatch = url.pathname.match(/^\/ws\/([^/]+)\/pty$/); + const eventsMatch = url.pathname.match(/^\/ws\/([^/]+)\/events$/); + const match = ptyMatch ?? eventsMatch; + + if (!match) { ws.close(); return; } let id: string; try { - id = decodeURIComponent(wsMatch[1]); + id = decodeURIComponent(match[1]); } catch { ws.close(WS_CLOSE.BAD_REQUEST, 'Bad Request'); return; } + + if (eventsMatch) { + handleSubscribe(ws, id); + return; + } + const cols = Math.max( 1, Math.min(1000, Number.parseInt(url.searchParams.get('cols') ?? '80', 10) || 80), @@ -163,6 +208,12 @@ export function createWebSocketServer(httpServer: http.Server): WebSocketServer client.close(WS_CLOSE.SESSION_GONE, 'shell exited'); } } + for (const sub of session.subscribers) { + if (sub.readyState === sub.OPEN) { + sub.close(WS_CLOSE.SESSION_GONE, 'shell exited'); + } + } + session.subscribers.clear(); session.pty = null; if (sessionRegistry.size === 0) onLastSessionClosed?.(); });