169 changes: 169 additions & 0 deletions docs/adrs/025.server.channel.md
@@ -0,0 +1,169 @@
# ADR 025: Server — Session-scoped client integration channel

**SPEC:** [client-integration](../specs/client-integration.md)
**Status:** Accepted
**Date:** 2026-06-08

---

## Context

Agent tools and CLI scripts that produce structured output need a way to stream results to a browser UI in real-time. The pattern is simple: one publisher (a curl call or MCP tool), N subscribers (browser tabs).

The current workaround in the Fusion project is a standalone `sync-server.ts` — a separate Bun process on a separate port (`2347`) that must be started independently. This creates unnecessary friction: two processes to manage, a hardcoded port, and app-specific code that is really a generic utility.

webtty already runs an HTTP server when `webtty go <session>` is invoked. Adding publish/subscribe endpoints to that server eliminates the separate process entirely.

---

## Decision

Add a session-scoped integration channel to the existing webtty HTTP server.

### API

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/s/:id/publish` | Read body line by line; broadcast each valid JSON line to all current subscribers as a discrete WS text frame; `204` after publisher closes; `400` if `Content-Type` is not `application/json`; `404` if session does not exist; `409` if session exists but PTY is not running |
| `GET` | `/ws/:id/events` | WebSocket upgrade — joins `session.subscribers`; receives published payloads as JSON text frames; WS close `4001` if session does not exist; WS close `4002` if PTY is not running |

One channel per session. No separate channel name or creation step — the session ID is the channel. One-shot and streaming use the same endpoint: a one-shot publisher sends one line and closes; a streaming publisher keeps the connection open and writes lines over time.

### Channel lifecycle

The channel is only available while the session's PTY is running. This keeps the channel lifecycle consistent with the terminal session.

| Event | `session.clients` (PTY) | `session.subscribers` (channel) |
|-------|-------------------------|---------------------------------|
| PTY not yet spawned | n/a | publish → `409`; subscribe → WS close `4002` (session exists but PTY not running) |
| PTY running | connected normally | connected normally |
| Shell exits | closed; session deleted | closed; session deleted |
| `DELETE /api/sessions/:id` | closed | closed |
| Server stops | closed | closed |
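
The lifecycle table can be read as a small decision function. The sketch below is illustrative only: `SessionLike`, `channelState`, `publishRejection`, and `subscribeCloseCode` are hypothetical names, and only the status and close codes come from the API table above.

```typescript
// Hypothetical minimal shape: only the field this sketch needs.
interface SessionLike {
  pty: unknown | null; // null until a terminal client has spawned the PTY
}

type ChannelState = "missing" | "idle" | "active";

// Classify a session lookup result into one of the lifecycle states.
function channelState(session: SessionLike | undefined): ChannelState {
  if (session === undefined) return "missing";
  return session.pty === null ? "idle" : "active";
}

// HTTP status to reject POST /s/:id/publish with, or null to proceed.
function publishRejection(state: ChannelState): 404 | 409 | null {
  if (state === "missing") return 404;
  if (state === "idle") return 409;
  return null;
}

// WebSocket close code for /ws/:id/events, or null to accept the subscriber.
function subscribeCloseCode(state: ChannelState): 4001 | 4002 | null {
  if (state === "missing") return 4001;
  if (state === "idle") return 4002;
  return null;
}
```

Keeping both mappings next to each other makes it easy to check that publish and subscribe agree on when the channel is available.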

### Channel flow

```
Publisher                 webtty server                  Subscriber(s)
─────────────────────────────────────────────────────────────────────────
POST /s/:id/publish
  body line 1 ──────────► parse JSON
                          broadcast ───────────────────► ws.send(line1)
  body line 2 ──────────► parse JSON
                          broadcast ───────────────────► ws.send(line2)
  (invalid line) ───────► skip silently
  body line N ──────────► parse JSON
                          broadcast ───────────────────► ws.send(lineN)
  [close] ──────────────► respond 204
```

The `204` is sent only after the publisher finishes sending the request body, not before, because the server cannot know the stream is complete until the body ends. For one-shot publishers this is effectively immediate.
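
The parse-and-skip step in the flow above can be sketched as a small line framer. This is a minimal sketch, not the code in `routes.ts`: `LineFramer` and `isJsonLine` are hypothetical names, and it assumes each body chunk has already been decoded to a string.

```typescript
// Returns true when the line is non-empty and parses as JSON.
function isJsonLine(line: string): boolean {
  if (line === "") return false;
  try {
    JSON.parse(line);
    return true;
  } catch {
    return false;
  }
}

// Hypothetical helper: splits body chunks into complete lines and keeps any
// trailing partial line buffered until more data arrives.
class LineFramer {
  private buffer = "";

  // Feed one decoded chunk; returns the valid JSON lines it completed.
  push(chunk: string): string[] {
    this.buffer += chunk;
    const parts = this.buffer.split("\n");
    this.buffer = parts.pop() ?? ""; // last element is the incomplete tail
    return parts.map((p) => p.trim()).filter(isJsonLine);
  }

  // Call when the body ends: the final line may have no trailing newline,
  // which is the usual case for a one-shot publisher.
  flush(): string[] {
    const rest = this.buffer.trim();
    this.buffer = "";
    return isJsonLine(rest) ? [rest] : [];
  }
}
```

In this sketch, each string returned by `push` or `flush` would be handed to the broadcast step, and the `204` would be written after `flush`. Invalid lines never leave the framer, which matches the silent-skip behaviour.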

### Publisher usage

**One-shot** (single JSON object, close immediately):

```sh
curl -X POST http://localhost:2346/s/my-session/publish \
  -H 'Content-Type: application/json' \
  -d '{"type":"result","items":[...]}'
```

**Streaming** (pipe agent output line by line):

```sh
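# -T - uploads stdin as it arrives (chunked transfer encoding).
# --data-binary @- would read all of stdin to EOF before sending anything.
my-agent --stream | curl -X POST http://localhost:2346/s/my-session/publish \
  -H 'Content-Type: application/json' \
  -T -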
```

### Subscriber usage

```js
const ws = new WebSocket('ws://localhost:2346/ws/my-session/events');
ws.onmessage = (e) => {
  const event = JSON.parse(e.data);
  // render event in UI
};
```
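
A subscriber will usually also want to distinguish the two channel-specific close codes and guard against non-JSON frames. The helpers below are a hedged sketch: `classifyClose` and `parseEvent` are hypothetical names, and only the codes `4001` and `4002` come from the API table.

```typescript
type ChannelClose = "session-not-found" | "pty-not-running" | "other";

// Map a WebSocket close code to the channel-level reason from the API table.
function classifyClose(code: number): ChannelClose {
  if (code === 4001) return "session-not-found";
  if (code === 4002) return "pty-not-running";
  return "other";
}

type ParsedEvent = { ok: true; value: unknown } | { ok: false };

// Parse one frame payload; anything that is not a JSON string is rejected
// instead of throwing inside the message handler.
function parseEvent(data: unknown): ParsedEvent {
  if (typeof data !== "string") return { ok: false };
  try {
    return { ok: true, value: JSON.parse(data) };
  } catch {
    return { ok: false };
  }
}
```

In a browser these would be called from `ws.onmessage` with `e.data` and from `ws.onclose` with `e.code`. A client might retry after `pty-not-running`, since the channel becomes available once a terminal client connects, and give up on `session-not-found`.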

### Implementation — files to touch

1. **`src/server/session.ts`** — add `subscribers: Set<WS>` to the `Session` interface; initialise as `new Set()` in `createSession`
2. **`src/server/routes.ts`** — add `POST /s/:id/publish` branch: validate session exists, check `session.pty !== null` (→ `409` if not), stream body line by line via `on('data')`, buffer incomplete lines, call `broadcastToSubscribers` per valid JSON line, respond `204` on `end`
3. **`src/server/websocket.ts`**:
- Rename the existing PTY upgrade path from `/ws/:id` to `/ws/:id/pty`
- Widen the `upgrade` handler to also accept `/ws/:id/events`, routing to a new `handleSubscribe` function
- In `handleSubscribe(ws, id)` — look up session, check `session.pty !== null` (→ WS close `4002` "PTY not running" if not), add `ws` to `session.subscribers`, remove on close
- Add `broadcastToSubscribers(session, line)` — iterates `session.subscribers`, calls `ws.send(line)` for each open socket
- Update `closeSession` / `closeAllSessions` to close and clear `session.subscribers` alongside `session.clients`
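
The broadcast and cleanup items in the list above might look roughly like this. It is a sketch under assumptions: `SubscriberSocket` and `ChannelSession` are hypothetical structural types standing in for the real `WS` and `Session` types, and `closeSubscribers` is an illustrative helper rather than a function named in this ADR.

```typescript
// Minimal structural stand-in for the server's WebSocket type (assumption).
interface SubscriberSocket {
  readonly readyState: number;
  send(data: string): void;
  close(code?: number, reason?: string): void;
}

// Numeric value of the OPEN ready state in the WebSocket API.
const OPEN = 1;

// Only the Session field this sketch touches.
interface ChannelSession {
  subscribers: Set<SubscriberSocket>;
}

// Send one already-validated JSON line to every open subscriber.
// Returns how many sockets received it.
function broadcastToSubscribers(session: ChannelSession, line: string): number {
  let delivered = 0;
  for (const ws of session.subscribers) {
    if (ws.readyState !== OPEN) continue; // skip connecting/closing sockets
    ws.send(line);
    delivered += 1;
  }
  return delivered;
}

// Close every subscriber and empty the set, as closeSession would need to.
function closeSubscribers(session: ChannelSession): void {
  for (const ws of session.subscribers) ws.close();
  session.subscribers.clear();
}
```

Returning the delivery count is an addition for testability; the ADR does not specify a return value.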

---

## Reasons

### Channel requires an active PTY

The channel is only available when `session.pty !== null`. This enforces a clear mental model: the channel is a sidecar to a running terminal session, not a standalone message bus. It also simplifies lifecycle — the channel always closes with the PTY, so there are no orphaned subscribers or dangling publish requests to reason about.

### Separate subscriber set from PTY clients

`session.clients` holds WebSocket connections for PTY terminal output. `session.subscribers` holds connections for the integration channel. Keeping them as separate `Set<WS>` fields on `Session` ties both to the session's lifetime: `closeSession` closes and clears each set, and traffic on one set never reaches the other.

### Line framing — one WS frame per line

WebSocket is message-based, so the server needs a framing rule. Line-by-line (newline as delimiter) is the simplest framing that works for both one-shot and streaming publishers without a content-type distinction. The subscriber always receives one complete JSON object per frame regardless of how the publisher sent it.

### No content-type distinction between one-shot and streaming

Requiring `application/x-ndjson` for streaming adds friction with no benefit — the server reads line-by-line either way. Both use `Content-Type: application/json`.

### 204 after publisher closes

Holding the HTTP response until the publisher closes is correct for streaming: the server cannot know the publish is complete until the body stream ends. For one-shot publishers this is instantaneous; for streaming publishers the caller's `fetch` / `curl` naturally awaits the response after the pipe closes.

### Silent skip on invalid JSON lines

Closing the publish connection on a single malformed line would interrupt an otherwise healthy stream. Agents occasionally emit partial or diagnostic lines that are not JSON. Skipping silently is more robust.

### Session as the natural channel namespace

Both publisher and subscriber already share the session ID. Session-scoped routing removes the need for out-of-band channel name coordination.

---

## Considered Options

### Option A: Named channels (`/channel/:name/*`) instead of session-scoped

More flexible — multiple independent channels per instance — but adds a second naming dimension and requires coordination between publisher and subscriber beyond the session ID.

Rejected — one channel per session covers all current use cases. Named channels can be layered on top later.

### Option B: Separate standalone server (current `sync-server.ts`)

Works but requires a second process, a second port, and duplicates the same ~90-line pattern across projects.

Rejected — the code is generic enough to live in webtty once.

### Option C: Content-type distinction for streaming vs one-shot

`application/json` for one-shot, `application/x-ndjson` for streaming. Adds publisher friction and a server branch with no benefit.

Rejected — one endpoint, one content-type.

### Option D: SSE instead of WebSocket for subscribe

Simpler for one-way streaming and works natively with `EventSource`. However, webtty already has WebSocket infrastructure and browser clients already use WebSocket.

Rejected for now — SSE can be added as an additional endpoint later if needed.

---

## Consequences

- `apps/fusion/sync-server.ts` can be deleted; Fusion points its agent push at `POST /s/:id/publish` and its UI at `ws://host/ws/:id/events`.
- The PTY WebSocket path changes from `/ws/:id` to `/ws/:id/pty` — server and browser client ship together so this is an internal refactor, not a breaking change.
- No auth on publish — callers must be on the same host or behind the same network boundary as webtty (consistent with the existing session API).
- The `upgrade` handler in `websocket.ts` must be updated to route `/ws/:id/pty` (PTY) and `/ws/:id/events` (channel) separately; anything else is destroyed as before.
- `session.subscribers` must be closed and cleared in `closeSession` and `closeAllSessions` alongside the existing `session.clients` handling.
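
The routing split described in the consequences above can be sketched as a pure function over the request URL. `routeUpgrade` and `UpgradeRoute` are hypothetical names; the actual `upgrade` handler in `websocket.ts` may be structured differently.

```typescript
type UpgradeRoute =
  | { kind: "pty"; id: string }
  | { kind: "events"; id: string }
  | null;

// Decide which handler an upgrade request belongs to.
// Returns null for anything that should be destroyed, including the old
// bare `/ws/:id` path.
function routeUpgrade(rawUrl: string): UpgradeRoute {
  // The base is only needed so that a path-only URL can be parsed.
  const { pathname } = new URL(rawUrl, "http://localhost");
  const match = /^\/ws\/([^/]+)\/(pty|events)$/.exec(pathname);
  if (match === null) return null;
  const rawId = match[1];
  const kind = match[2];
  if (rawId === undefined || (kind !== "pty" && kind !== "events")) return null;
  try {
    return { kind, id: decodeURIComponent(rawId) };
  } catch {
    return null; // malformed percent-encoding in the session id
  }
}
```

Query parameters such as `cols` and `rows` are ignored here because only the path decides the route.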
69 changes: 69 additions & 0 deletions docs/specs/client-integration.md
@@ -0,0 +1,69 @@
# SPEC: Client Integration (CLI → Web)

**Last Updated:** 2026-06-08

---

## Description

A pattern for pushing live updates from a CLI tool or agent into a browser UI — without running a separate server process.

**Persona:** Developers building CLI agents or tools that produce structured output and want to surface that output in a browser UI in real-time.

webtty's HTTP server is already running whenever a session is open. The integration channel piggybacks on that server so any CLI process can publish structured events and any number of browser tabs receive them instantly — no extra port, no extra process.

---

## Architecture

```
┌─────────────────┐           ws /ws/:id/pty           ┌──────────────────────┐
│  Browser tab    │ ◄───────────────────────────────── │                      │
│  (terminal)     │ ─────────────────────────────────► │  session.clients     │
└─────────────────┘         keyboard / resize          └──────────────────────┘

┌─────────────────┐        POST /s/:id/publish         ┌──────────────────────┐
│  CLI / Agent    │ ─────────────────────────────────► │                      │
│  (publisher)    │                                    │  webtty server       │
└─────────────────┘                                    │                      │
                                                       │  session.subscribers │
┌─────────────────┐         ws /ws/:id/events          │                      │
│  Browser panel  │ ◄───────────────────────────────── │                      │
│  (subscriber)   │       one WS frame per event       │                      │
└─────────────────┘                                    └──────────────────────┘
```

Subscribers (integration channel) and terminal clients (PTY) are independent. A browser tab can be one, the other, or both.

---

## Use Cases

### Agent streaming results to a UI

An AI agent or search tool runs in the terminal and emits structured results — search hits, status updates, token streams — that a browser panel renders as they arrive. The agent publishes to the session channel; the browser subscribes.

### Replacing a bespoke sync server

Projects like Fusion today ship a standalone `sync-server.ts` on a separate port that must be started independently. The session channel replaces it: one webtty process, one port, zero extra setup.

---

## How It Works

1. `bunx webtty go my-session` — the only process to start
2. A terminal client connects to `/ws/:id/pty` — this spawns the PTY and activates the channel
3. Browser panels subscribe via `/ws/:id/events` (requires an active PTY)
4. CLI tools or agents POST JSON to `/s/:id/publish` — one-shot or as a stream of lines
5. Each JSON line is broadcast to all subscribers as a discrete WebSocket frame as it arrives

For interface details, channel flow, and API reference see [ADR 025](../adrs/025.server.channel.md).
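
Step 4 can also be done from a script instead of `curl`. The sketch below assumes a runtime with a global `fetch` (Bun or a recent Node.js); `publishUrl`, `toNdjson`, and `publishEvents` are hypothetical helper names, not part of webtty.

```typescript
// Build the publish endpoint for a session on a given webtty base URL.
function publishUrl(baseUrl: string, sessionId: string): string {
  return new URL(`/s/${encodeURIComponent(sessionId)}/publish`, baseUrl).toString();
}

// Serialise events as one JSON document per line. JSON.stringify escapes
// newlines inside strings, so each event stays on a single line.
function toNdjson(events: ReadonlyArray<Record<string, unknown>>): string {
  return events.map((e) => JSON.stringify(e) + "\n").join("");
}

// Publish a batch of events in one request; the server is expected to
// answer 204 once the whole body has been read.
async function publishEvents(
  baseUrl: string,
  sessionId: string,
  events: ReadonlyArray<Record<string, unknown>>,
): Promise<void> {
  const res = await fetch(publishUrl(baseUrl, sessionId), {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: toNdjson(events),
  });
  if (res.status !== 204) {
    throw new Error(`publish failed: HTTP ${res.status}`);
  }
}
```

For example, `publishEvents('http://localhost:2346', 'my-session', [{ type: 'status', text: 'started' }])` would deliver one frame to each subscriber, provided the PTY for `my-session` is running.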

---

## Features

| Feature | Description | ADR | Done? |
|---------|-------------|-----|-------|
| Session channel — publish | CLI tools POST JSON (one-shot or streaming) to the session; each event broadcast to subscribers in real-time | [ADR 025](../adrs/025.server.channel.md) | ✅ |
| Session channel — subscribe | Browser panels subscribe via WebSocket and receive one JSON object per frame | [ADR 025](../adrs/025.server.channel.md) | ✅ |
1 change: 1 addition & 0 deletions docs/specs/webtty.md
@@ -80,3 +80,4 @@ Session IDs appear directly in the URL path (`/s/:id`), so they must be valid UR
| Default session | `GET /` redirects to last-used session, or creates `main` if none exists | — | ✅ |
| Multi-client sessions | Multiple browser tabs can attach to the same session; PTY output broadcast to all; scrollback replayed on reconnect | [ADR 007](../adrs/007.webtty.session-client.md) | ✅ |
| Config file | Shell, port, font, theme from `~/.config/webtty/config.json`; hot-reload on tab reload | [ADR 008](../adrs/008.webtty.config.md) | ✅ |
| Client integration (CLI → Web) | `POST /s/:id/publish` (one-shot or streaming JSON) + `ws /ws/:id/events` subscribe; channel active only while PTY is running; no extra process or port | [ADR 025](../adrs/025.server.channel.md) | ✅ |
101 changes: 101 additions & 0 deletions src/cli/http.test.ts
@@ -177,6 +177,107 @@ describe('startServer', () => {
    configSpy.mockRestore();
  });

  test('uses node executor on win32 with Bun', async () => {
    const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true);
    const fakeChild = { unref: mock(() => {}), on: mock(() => {}) };
    const spawnMock = mock(() => fakeChild);

    globalThis.fetch = mock(
      async () => new Response('[]', { status: 200 }),
    ) as unknown as typeof fetch;

    const configModule = await import('../config');
    const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({
      ...configModule.DEFAULT_CONFIG,
    });

    const origPlatform = process.platform;
    Object.defineProperty(process, 'platform', { value: 'win32', configurable: true });

    const { startServer } = await import('./http');
    await startServer(10000, spawnMock as never);

    const [exec] = spawnMock.mock.calls[0] as unknown as [string, string[]];
    expect(exec).toBe('node');

    Object.defineProperty(process, 'platform', { value: origPlatform, configurable: true });
    existsSpy.mockRestore();
    configSpy.mockRestore();
  });

  test('logs error and exits when spawn emits error (non-windows hint)', async () => {
    const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true);
    const exitSpy = spyOn(process, 'exit').mockImplementation((() => {}) as () => never);
    const errorSpy = spyOn(console, 'error').mockImplementation(() => {});

    // Fake child process that fires the 'error' listener as soon as it is registered.
    const fakeChild = {
      unref: mock(() => {}),
      on: mock((event: string, cb: (err: Error) => void) => {
        if (event === 'error') cb(new Error('spawn ENOENT'));
      }),
    };
    const spawnMock = mock(() => fakeChild);

    globalThis.fetch = mock(
      async () => new Response('[]', { status: 200 }),
    ) as unknown as typeof fetch;

    const configModule = await import('../config');
    const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({
      ...configModule.DEFAULT_CONFIG,
    });

    const { startServer } = await import('./http');
    await startServer(10000, spawnMock as never);

    expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining('failed to start server'));
    // Assert that no call carried the win32 hint; matching on `expect.not.stringContaining`
    // would pass as soon as any single call lacked it.
    expect(errorSpy).not.toHaveBeenCalledWith(expect.stringContaining('Node.js must be on PATH'));
    expect(exitSpy).toHaveBeenCalledWith(1);

    existsSpy.mockRestore();
    exitSpy.mockRestore();
    errorSpy.mockRestore();
    configSpy.mockRestore();
  });

  test('logs error with win32 hint when spawn emits error on win32', async () => {
    const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true);
    const exitSpy = spyOn(process, 'exit').mockImplementation((() => {}) as () => never);
    const errorSpy = spyOn(console, 'error').mockImplementation(() => {});

    const fakeChild = {
      unref: mock(() => {}),
      on: mock((event: string, cb: (err: Error) => void) => {
        if (event === 'error') cb(new Error('spawn ENOENT'));
      }),
    };
    const spawnMock = mock(() => fakeChild);

    globalThis.fetch = mock(
      async () => new Response('[]', { status: 200 }),
    ) as unknown as typeof fetch;

    const configModule = await import('../config');
    const configSpy = spyOn(configModule, 'loadConfig').mockReturnValue({
      ...configModule.DEFAULT_CONFIG,
    });

    const origPlatform = process.platform;
    Object.defineProperty(process, 'platform', { value: 'win32', configurable: true });

    const { startServer } = await import('./http');
    await startServer(10000, spawnMock as never);

    expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining('Node.js must be on PATH'));
    expect(exitSpy).toHaveBeenCalledWith(1);

    Object.defineProperty(process, 'platform', { value: origPlatform, configurable: true });
    existsSpy.mockRestore();
    exitSpy.mockRestore();
    errorSpy.mockRestore();
    configSpy.mockRestore();
  });

  test('exits with error when server does not start within timeout', async () => {
    const existsSpy = spyOn(fs, 'existsSync').mockReturnValue(true);
    const spawnMock = mock(() => ({ unref: () => {}, on: () => {} }));
2 changes: 1 addition & 1 deletion src/client/index.ts
@@ -114,7 +114,7 @@ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
let ws: WebSocket;

function connect(): void {
-  const wsUrl = `${protocol}//${window.location.host}/ws/${sessionId}?cols=${term.cols}&rows=${term.rows}`;
+  const wsUrl = `${protocol}//${window.location.host}/ws/${sessionId}/pty?cols=${term.cols}&rows=${term.rows}`;
   ws = new WebSocket(wsUrl);

   const DIM = '\x1b[2m',