Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type TokenOverride,
} from '@anarchitecture/summon/engine';
import {
createProtocolLineWriter,
resolveSurfaceGenerationPlan,
runSurfaceGeneration,
summarizeContractIssues,
Expand Down Expand Up @@ -436,8 +437,8 @@ app.get('/api/ghost-roots', (_req, res) => {
});

/**
* Streams LLM output as raw text — the client parses JSONL out of it. Each
* completed newline-terminated line should be one protocol message.
* Streams hardened Summon JSONL. Each completed newline-terminated line is one
* protocol message that has passed through the server generation lifecycle.
*/
app.post('/api/generate', async (req, res) => {
const prompt = typeof req.body?.prompt === 'string' ? req.body.prompt.trim() : '';
Expand Down Expand Up @@ -657,6 +658,16 @@ app.post('/api/generate', async (req, res) => {
});
}

const responseAbort = new AbortController();
res.once('close', () => {
if (!res.writableEnded) {
responseAbort.abort(new Error('client disconnected'));
}
});
const writeProtocolLine = createProtocolLineWriter(res, {
signal: responseAbort.signal,
});

await withConcurrencyCap(async () => {
try {
let usage: AnthropicUsageSnapshot | null = null;
Expand Down Expand Up @@ -689,12 +700,11 @@ app.post('/api/generate', async (req, res) => {
activeTokensCss: ghostContext?.tokenSource.css ?? direction?.tokensCss ?? null,
preludeLines,
repair,
signal: responseAbort.signal,
modelProvider: (request) => streamAnthropicGeneration(request, (nextUsage) => {
usage = nextUsage;
}),
}, (line) => {
res.write(`${JSON.stringify(line)}\n`);
});
}, writeProtocolLine);

if (ghostContext) {
const reviewLine: ProtocolLine = {
Expand All @@ -709,7 +719,7 @@ app.post('/api/generate', async (req, res) => {
prompt,
}),
};
res.write(`${JSON.stringify(reviewLine)}\n`);
await writeProtocolLine(reviewLine);
}
const finalUsage = usage ?? {
input_tokens: 0,
Expand All @@ -736,12 +746,18 @@ app.post('/api/generate', async (req, res) => {
` cache_read=${finalUsage.cache_read_input_tokens ?? 0}` +
` cache_write=${finalUsage.cache_creation_input_tokens ?? 0}`
);
res.end();
if (!res.writableEnded && !res.destroyed) res.end();
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error('[generate] error:', msg);
res.write(`${JSON.stringify({ op: 'meta', path: '/error', value: msg } satisfies ProtocolLine)}\n`);
res.end();
if (!res.writableEnded && !res.destroyed) {
try {
await writeProtocolLine({ op: 'meta', path: '/error', value: msg });
} catch {
// Response closed while reporting the error.
}
}
if (!res.writableEnded && !res.destroyed) res.end();
}
});
});
Expand Down
21 changes: 16 additions & 5 deletions docs/adoption/integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ streamed JSONL, optionally retry invalid sections, and emit diagnostics.

```ts
import {
createProtocolLineWriter,
runSurfaceGeneration,
type SummonModelProvider,
} from '@anarchitecture/summon-server';
Expand All @@ -156,6 +157,11 @@ const modelProvider: SummonModelProvider = async function* ({ prompt, promptBloc
yield* callYourModel({ prompt, promptBlocks });
};

const abortController = new AbortController();
// Wire this to your HTTP request/response close handling.
const signal = abortController.signal;
const writeProtocolLine = createProtocolLineWriter(response, { signal });

await runSurfaceGeneration({
prompt,
modelProvider,
Expand All @@ -170,11 +176,13 @@ await runSurfaceGeneration({
preludeLines: [
{ op: 'meta', path: '/shape', value: shape },
],
}, (line) => {
response.write(`${JSON.stringify(line)}\n`);
});
signal,
}, writeProtocolLine);
```

`createProtocolLineWriter()` serializes accepted Summon protocol lines as JSONL
and waits for writable backpressure before generation continues.

To enable validation retries, pass
`repair: { enabled: true, provider, maxAttempts, maxTargets }`. The provider
receives the compiled prompt blocks and a single replacement prompt; return one
Expand All @@ -183,8 +191,11 @@ replacement JSONL line for the same section path.
## 5. Render In The Sandbox

The client should let `@anarchitecture/summon` own chunk decoding, protocol
parsing, stream diagnostics, and render timing. Product hosts still own
fetching, aborts, request payloads, and product-specific meta interpretation.
parsing, stream diagnostics, and render timing for Summon-hardened JSONL
streams. Do not point `consumeSurfaceStream()` directly at raw model output;
the server runner is responsible for validation and hardening. Product hosts
still own fetching, aborts, request payloads, and product-specific meta
interpretation.

```ts
import { compileSurfacePolicy } from '@anarchitecture/summon';
Expand Down
16 changes: 11 additions & 5 deletions docs/adoption/package-consumption.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ import {
} from '@anarchitecture/summon/assets';
```

Use `consumeSurfaceStream()` to decode streamed chunks, parse accepted protocol
lines, maintain generated HTML, update stream diagnostics, and render through
the sandbox handle. Spawn the iframe with allowed host tools from host-owned
contracts.
Use `consumeSurfaceStream()` to decode Summon-hardened streamed chunks, parse
accepted protocol lines, maintain generated HTML, update stream diagnostics,
and render through the sandbox handle. Do not use it as a direct raw-model
parser; server-side hardening belongs in `runSurfaceGeneration()`. Spawn the
iframe with allowed host tools from host-owned contracts.

`compileSurfacePolicy(surfacePolicy, catalogs)` gives the client the stream
mode and narrowed contracts that the server will enforce. Generation authority
Expand Down Expand Up @@ -188,6 +189,7 @@ await consumeSurfaceStream(response.body!, {

```ts
import {
createProtocolLineWriter,
runSurfaceGeneration,
type SummonModelProvider,
} from '@anarchitecture/summon-server';
Expand All @@ -197,10 +199,14 @@ import {
prompt blocks and returns text chunks. The runner applies the surface config,
validates streamed JSONL, optionally runs targeted validation retries, emits
accepted Summon lines and diagnostics, and returns a replay summary.
Use `createProtocolLineWriter()` when writing the stream to an HTTP response so
the server waits for writable backpressure and aborts cleanly when the response
signal aborts.

`generateSurfaceStream()` remains available for existing integrations that
consume an async generator, but new servers should prefer
`runSurfaceGeneration(input, emit)`.
`runSurfaceGeneration(input, emit)`. The generator compatibility path buffers
lines internally, so it is less suitable for high-throughput HTTP serving.

## Package Gate

Expand Down
35 changes: 32 additions & 3 deletions packages/host/src/surface-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface SurfaceStreamOptions {
accumulator?: SectionAccumulator;
streamGraph?: StreamGraph;
renderMode?: SurfaceStreamRenderMode;
cancelOnStop?: boolean;
shouldApplyLine?: (
line: ProtocolLine,
context: SurfaceStreamContext,
Expand Down Expand Up @@ -98,6 +99,7 @@ export async function consumeSurfaceStream(
let acceptedStructuralLines = 0;
let stopped = false;
let discarded = false;
const shouldCancelSource = () => stopped && options.cancelOnStop !== false;

const context = (
raw?: string,
Expand Down Expand Up @@ -178,7 +180,7 @@ export async function consumeSurfaceStream(
};

try {
for await (const chunk of chunksFromSource(source)) {
for await (const chunk of chunksFromSource(source, shouldCancelSource)) {
if (stopped) break;
buffer += decodeChunk(chunk, decoder);
let nl = buffer.indexOf('\n');
Expand All @@ -188,6 +190,7 @@ export async function consumeSurfaceStream(
if (stopped) break;
nl = buffer.indexOf('\n');
}
if (stopped) break;
}

if (!stopped) buffer += decoder.decode();
Expand Down Expand Up @@ -219,6 +222,7 @@ export async function consumeSurfaceStream(

async function* chunksFromSource(
source: SurfaceStreamSource,
shouldCancel: () => boolean,
): AsyncGenerator<SurfaceStreamChunk, void, void> {
if (isReadableStream(source)) {
const reader = source.getReader();
Expand All @@ -229,17 +233,42 @@ async function* chunksFromSource(
if (value !== undefined) yield value;
}
} finally {
if (shouldCancel()) {
await reader.cancel().catch(() => {});
}
reader.releaseLock();
}
return;
}

if (isAsyncIterable(source)) {
for await (const chunk of source) yield chunk;
const iterator = source[Symbol.asyncIterator]();
try {
while (true) {
const next = await iterator.next();
if (next.done) return;
yield next.value;
}
} finally {
if (shouldCancel()) {
await iterator.return?.();
}
}
return;
}

for (const chunk of source) yield chunk;
const iterator = source[Symbol.iterator]();
try {
while (true) {
const next = iterator.next();
if (next.done) return;
yield next.value;
}
} finally {
if (shouldCancel()) {
iterator.return?.();
}
}
}

function decodeChunk(chunk: SurfaceStreamChunk, decoder: TextDecoder): string {
Expand Down
80 changes: 80 additions & 0 deletions packages/host/test/surface-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,86 @@ test('consumeSurfaceStream can stop before applying a line', async () => {
assert.equal(result.html, '');
});

test('consumeSurfaceStream cancels a ReadableStream when stop is returned', async () => {
let canceled = false;
const source = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encoder.encode(
'{"op":"set","path":"/screen","value":{"sections":["hero"]}}\n' +
'{"op":"add","path":"/section/hero","html":"<p>Stop</p>"}\n' +
'{"op":"add","path":"/section/hero","html":"<p>Ignored</p>"}\n',
));
},
cancel() {
canceled = true;
},
});

const result = await consumeSurfaceStream(source, {
mode: 'static',
shouldApplyLine: (line) => line.op === 'add' ? 'stop' : 'apply',
});

assert.equal(result.stopped, true);
assert.equal(canceled, true);
});

test('consumeSurfaceStream can preserve source when cancelOnStop is false', async () => {
let canceled = false;
const source = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encoder.encode(
'{"op":"set","path":"/screen","value":{"sections":["hero"]}}\n' +
'{"op":"add","path":"/section/hero","html":"<p>Stop</p>"}\n',
));
},
cancel() {
canceled = true;
},
});

const result = await consumeSurfaceStream(source, {
mode: 'static',
cancelOnStop: false,
shouldApplyLine: (line) => line.op === 'add' ? 'stop' : 'apply',
});

assert.equal(result.stopped, true);
assert.equal(canceled, false);
});

test('consumeSurfaceStream calls async iterator return when stop is returned', async () => {
let returned = false;
const chunks = [
'{"op":"set","path":"/screen","value":{"sections":["hero"]}}\n' +
'{"op":"add","path":"/section/hero","html":"<p>Stop</p>"}\n',
'{"op":"add","path":"/section/hero","html":"<p>Ignored</p>"}\n',
];
const source: AsyncIterable<string> = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index >= chunks.length) return { done: true, value: undefined };
return { done: false, value: chunks[index++]! };
},
async return() {
returned = true;
return { done: true, value: undefined };
},
};
},
};

const result = await consumeSurfaceStream(source, {
mode: 'static',
shouldApplyLine: (line) => line.op === 'add' ? 'stop' : 'apply',
});

assert.equal(result.stopped, true);
assert.equal(returned, true);
});

test('consumeSurfaceStream can use supplied accumulator and graph instances', async () => {
const accumulator = new SectionAccumulator();
const streamGraph = new StreamGraph();
Expand Down
6 changes: 6 additions & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export { generateSurfaceStream } from './compat.js';
export { buildEditBlock } from './edit.js';
export { resolveSurfaceGenerationPlan } from './plan.js';
export { createProtocolLineWriter } from './protocol-line-writer.js';
export { runSurfaceGeneration } from './runner.js';
export { summarizeContractIssues } from './summary.js';

Expand All @@ -21,6 +22,11 @@ export type {
SurfaceGenerationSummary,
} from './types.js';

export type {
ProtocolLineWritableTarget,
ProtocolLineWriterOptions,
} from './protocol-line-writer.js';

export type {
ContractIssue,
ContractPromptBlock,
Expand Down
Loading