Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions server/lib/http/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { getChatById } from "@/db/queries/chats.ts";
import { log } from "@/lib/utils/logger.ts";
import { isChatWsMessage, handleChatMessage } from "@/lib/http/ws-chat.ts";
import { subscribeBrowser, unsubscribeBrowser } from "@/lib/http/ws-browser.ts";
import { applyAndPersist } from "@/db/queries/canvas.ts";
import { applyAndPersist, getCanvasDoc } from "@/db/queries/canvas.ts";
import type { CanvasOp } from "@/lib/canvas/doc.ts";
import type { PiEventEnvelope } from "@/lib/pi/run-turn.ts";
import {
Expand Down Expand Up @@ -504,6 +504,10 @@ async function handleJoin(ws: WebSocket, meta: ConnectionMeta, projectId: string

wsLog.debug("user joined project", { userId: meta.userId, projectId });
send(ws, { type: "presence", users: getPresence(projectId) });
// Push the full board immediately so a (re)joining client renders the
// authoritative state at once — the same snapshot it'll receive on every
// subsequent change. This is what makes reconnects self-heal.
send(ws, { type: "canvas.state", doc: getCanvasDoc(projectId) });
broadcastPresence(projectId);
}

Expand Down Expand Up @@ -559,9 +563,32 @@ function handleTyping(ws: WebSocket, meta: ConnectionMeta, chatId: string) {
//
// The canvas is project-scoped, so it rides the existing project room.
// A client must have joined the project (membership checked in
// `handleJoin`) before its ops are accepted. Ops are applied to the
// canonical doc and rebroadcast to the room; the `origin` tag lets the
// sender ignore its own echo. Cursors are ephemeral and never persisted.
// `handleJoin`) before its commands are accepted. Each command is applied
// to the canonical doc, then the server broadcasts the *entire* board as a
// `canvas.state` snapshot. Clients render the snapshot directly, so a missed
// message, reconnect, or race can never leave anyone diverged — the next
// snapshot is the truth and overwrites them. Snapshots are coalesced on a
// short trailing delay so a burst of commands collapses into one broadcast.
// Cursors are ephemeral and never persisted.

const CANVAS_SNAPSHOT_DELAY = 40;
const canvasSnapshotTimers = new Map<string, ReturnType<typeof setTimeout>>();

/**
* Schedule a coalesced full-state snapshot broadcast for a project. Multiple
* commands within the delay window produce a single broadcast that reads the
* latest doc at fire time, so it always reflects current state.
*/
function scheduleCanvasBroadcast(projectId: string) {
if (canvasSnapshotTimers.has(projectId)) return;
const timer = setTimeout(() => {
canvasSnapshotTimers.delete(projectId);
const room = projectRooms.get(projectId);
if (!room || room.size === 0) return;
broadcastToProject(projectId, { type: "canvas.state", doc: getCanvasDoc(projectId) });
}, CANVAS_SNAPSHOT_DELAY);
canvasSnapshotTimers.set(projectId, timer);
}

function handleCanvasOp(meta: ConnectionMeta, msg: any) {
// Route by the room the client actually joined (membership was checked
Expand Down Expand Up @@ -599,23 +626,24 @@ function handleCanvasCursor(meta: ConnectionMeta, msg: any) {
}

/**
* Apply a canvas op to the project's doc, persist it, and broadcast to
* everyone in the project room. Exported so the agent-facing CLI handler
* can push edits through the exact same path — humans then see the agent
* draw live. `origin` distinguishes the author (e.g. "agent").
* Apply a canvas command to the project's doc, persist it, and schedule a
* full-state snapshot broadcast to the room. Exported so the agent-facing CLI
* handler can push edits through the exact same path — humans then see the
* agent's changes appear. `origin` is accepted for call-site compatibility but
* no longer used: snapshots are author-agnostic.
*/
export function applyCanvasOpAndBroadcast(
projectId: string,
op: CanvasOp,
origin: string,
): { changed: boolean; doc: import("@/lib/canvas/doc.ts").CanvasDoc } {
void origin;
const result = applyAndPersist(projectId, op);
if (result.changed) {
broadcastToProject(projectId, { type: "canvas.op", origin, op });
scheduleCanvasBroadcast(projectId);
}
wsLog.info("canvas.broadcast", {
projectId,
origin,
kind: (op as any).kind,
changed: result.changed,
roomSize: projectRooms.get(projectId)?.size ?? 0,
Expand Down
Loading
Loading