diff --git a/server/lib/http/ws.ts b/server/lib/http/ws.ts index 460191a..fa435bd 100644 --- a/server/lib/http/ws.ts +++ b/server/lib/http/ws.ts @@ -9,7 +9,7 @@ import { getChatById } from "@/db/queries/chats.ts"; import { log } from "@/lib/utils/logger.ts"; import { isChatWsMessage, handleChatMessage } from "@/lib/http/ws-chat.ts"; import { subscribeBrowser, unsubscribeBrowser } from "@/lib/http/ws-browser.ts"; -import { applyAndPersist } from "@/db/queries/canvas.ts"; +import { applyAndPersist, getCanvasDoc } from "@/db/queries/canvas.ts"; import type { CanvasOp } from "@/lib/canvas/doc.ts"; import type { PiEventEnvelope } from "@/lib/pi/run-turn.ts"; import { @@ -504,6 +504,10 @@ async function handleJoin(ws: WebSocket, meta: ConnectionMeta, projectId: string wsLog.debug("user joined project", { userId: meta.userId, projectId }); send(ws, { type: "presence", users: getPresence(projectId) }); + // Push the full board immediately so a (re)joining client renders the + // authoritative state at once — the same snapshot it'll receive on every + // subsequent change. This is what makes reconnects self-heal. + send(ws, { type: "canvas.state", doc: getCanvasDoc(projectId) }); broadcastPresence(projectId); } @@ -559,9 +563,32 @@ function handleTyping(ws: WebSocket, meta: ConnectionMeta, chatId: string) { // // The canvas is project-scoped, so it rides the existing project room. // A client must have joined the project (membership checked in -// `handleJoin`) before its ops are accepted. Ops are applied to the -// canonical doc and rebroadcast to the room; the `origin` tag lets the -// sender ignore its own echo. Cursors are ephemeral and never persisted. +// `handleJoin`) before its commands are accepted. Each command is applied +// to the canonical doc, then the server broadcasts the *entire* board as a +// `canvas.state` snapshot. Clients render the snapshot directly, so a missed +// message, reconnect, or race can never leave anyone diverged — the next +// snapshot is the truth and overwrites them. Snapshots are coalesced on a +// short trailing delay so a burst of commands collapses into one broadcast. +// Cursors are ephemeral and never persisted. + +const CANVAS_SNAPSHOT_DELAY = 40; +const canvasSnapshotTimers = new Map>(); + +/** + * Schedule a coalesced full-state snapshot broadcast for a project. Multiple + * commands within the delay window produce a single broadcast that reads the + * latest doc at fire time, so it always reflects current state. + */ +function scheduleCanvasBroadcast(projectId: string) { + if (canvasSnapshotTimers.has(projectId)) return; + const timer = setTimeout(() => { + canvasSnapshotTimers.delete(projectId); + const room = projectRooms.get(projectId); + if (!room || room.size === 0) return; + broadcastToProject(projectId, { type: "canvas.state", doc: getCanvasDoc(projectId) }); + }, CANVAS_SNAPSHOT_DELAY); + canvasSnapshotTimers.set(projectId, timer); +} function handleCanvasOp(meta: ConnectionMeta, msg: any) { // Route by the room the client actually joined (membership was checked @@ -599,23 +626,24 @@ function handleCanvasCursor(meta: ConnectionMeta, msg: any) { } /** - * Apply a canvas op to the project's doc, persist it, and broadcast to - * everyone in the project room. Exported so the agent-facing CLI handler - * can push edits through the exact same path — humans then see the agent - * draw live. `origin` distinguishes the author (e.g. "agent"). + * Apply a canvas command to the project's doc, persist it, and schedule a + * full-state snapshot broadcast to the room. Exported so the agent-facing CLI + * handler can push edits through the exact same path — humans then see the + * agent's changes appear. `origin` is accepted for call-site compatibility but + * no longer used: snapshots are author-agnostic. */ export function applyCanvasOpAndBroadcast( projectId: string, op: CanvasOp, origin: string, ): { changed: boolean; doc: import("@/lib/canvas/doc.ts").CanvasDoc } { + void origin; const result = applyAndPersist(projectId, op); if (result.changed) { - broadcastToProject(projectId, { type: "canvas.op", origin, op }); + scheduleCanvasBroadcast(projectId); } wsLog.info("canvas.broadcast", { projectId, - origin, kind: (op as any).kind, changed: result.changed, roomSize: projectRooms.get(projectId)?.size ?? 0, diff --git a/web/src/pages/CanvasPage.tsx b/web/src/pages/CanvasPage.tsx index c2f95c8..8dda24b 100644 --- a/web/src/pages/CanvasPage.tsx +++ b/web/src/pages/CanvasPage.tsx @@ -26,13 +26,7 @@ import { } from "@/components/ui/alert-dialog"; import { useAuthStore } from "@/stores/auth"; import { subscribe, sendCanvasOp, sendCanvasCursor } from "@/lib/ws"; -import { - useCanvas, - applyOpToShapes, - type CanvasShape, - type CanvasShapeType, - type CanvasOp, -} from "@/api/canvas"; +import { useCanvas, type CanvasShape, type CanvasShapeType, type CanvasOp } from "@/api/canvas"; // ── Palette ────────────────────────────────────────────────────────── @@ -103,8 +97,6 @@ interface Cursor { workingTs?: number; } -const AGENT_COLOR = "oklch(0.62 0.2 295)"; // violet — distinct from human cursors - // Handwritten font for the Excalidraw-style sketchy look. const HAND_FONT = "'Patrick Hand', 'Comic Sans MS', cursive"; @@ -130,8 +122,7 @@ function roundRectPath(x: number, y: number, w: number, h: number, r: number): s ); } -// Anchor point (top-left) + bounding box of a shape, for op-derived cursors -// and the new-shape flash highlight. +// Anchor point (top-left) + bounding box of a shape, used for marquee hit-testing. function shapeBox(s: CanvasShape) { if (s.type === "arrow") { const x2 = s.x2 ?? s.x; @@ -149,6 +140,28 @@ function defaultShape(type: CanvasShapeType, x: number, y: number, color: ColorK return { id, type, x, y, w: 170, h: 110, text: "", color }; // rect } +// Shallow field-equality for two shapes — used to decide when a server +// snapshot has "caught up" to a local optimistic override so it can be dropped. +function shapeEq(a: CanvasShape | undefined, b: CanvasShape | undefined): boolean { + if (!a || !b) return false; + return ( + a.type === b.type && + a.x === b.x && + a.y === b.y && + a.w === b.w && + a.h === b.h && + a.x2 === b.x2 && + a.y2 === b.y2 && + a.text === b.text && + a.color === b.color + ); +} + +// Safety backstop: if a committed override isn't confirmed by a snapshot within +// this window (e.g. another user edited the same shape concurrently, so the +// server value never matches ours), drop it anyway so we can't wedge. +const OVERRIDE_TTL = 800; + // ── Component ──────────────────────────────────────────────────────── export function CanvasPage() { @@ -159,7 +172,25 @@ export function CanvasPage() { const { data: doc } = useCanvas(pid); const origin = useRef(nanoid()); - const [shapes, setShapes] = useState>({}); + // ── Sync model: authoritative server state + a thin local override layer ── + // The server is the single source of truth and pushes the *entire* board as a + // `canvas.state` snapshot on every change (and on join). We render that + // snapshot directly, so a missed message, a reconnect, or a race can never + // leave us diverged — the next snapshot is the truth and overwrites us. + // + // `overrides` is the only local state: shapes the user is *actively* + // manipulating (so their own drag/type stays instant instead of waiting a + // round-trip) or has just committed but the snapshot hasn't confirmed yet. + // `shape: null` is an optimistic-delete tombstone. `committedAt === null` + // means "still being manipulated" (never auto-cleared); once committed, the + // override is dropped as soon as a snapshot confirms it (or after a timeout). + const [serverShapes, setServerShapes] = useState>({}); + const serverShapesRef = useRef(serverShapes); + serverShapesRef.current = serverShapes; + const overridesRef = useRef>({}); + const [renderTick, setRenderTick] = useState(0); + const bump = useCallback(() => setRenderTick((t) => (t + 1) & 0xffff), []); + const [tool, setTool] = useState("select"); const [color, setColor] = useState("yellow"); const [selectedIds, setSelectedIds] = useState([]); @@ -171,14 +202,17 @@ export function CanvasPage() { const [editing, setEditing] = useState(null); const [view, setView] = useState({ tx: 0, ty: 0, scale: 1 }); const [cursors, setCursors] = useState>({}); - // shapeId -> { ts, color } for the brief "just drawn/changed" flash. - const [flash, setFlash] = useState>({}); - // shapeId -> ts driving the scale/fade entrance of a remote-added shape. - const [entering, setEntering] = useState>({}); - // Remote ADD ops are queued and replayed one-at-a-time so a burst of - // parallel agent tool-calls reveals as a smooth drawing sequence (cursor - // leading) instead of every shape popping in at once. - const pendingAddsRef = useRef; key: string }>>([]); + + // Rendered board = the server snapshot with local overrides laid on top. + const shapes = useMemo(() => { + const merged: Record = { ...serverShapes }; + for (const [id, o] of Object.entries(overridesRef.current)) { + if (o.shape === null) delete merged[id]; + else merged[id] = o.shape; + } + return merged; + // renderTick bumps whenever overridesRef mutates. + }, [serverShapes, renderTick]); const svgRef = useRef(null); const shapesRef = useRef(shapes); @@ -188,6 +222,7 @@ export function CanvasPage() { const dragRef = useRef(null); const lastCursorRef = useRef(0); const lastMoveRef = useRef(0); + const lastTextRef = useRef(0); const hydratedRef = useRef(false); // Cursor position smoothing. `cursors` holds the *target* positions; a rAF // loop eases the *rendered* positions toward them so every contributor's @@ -198,91 +233,91 @@ export function CanvasPage() { const cursorPosRef = useRef>({}); const [, setCursorFrame] = useState(0); - // Hydrate once from the REST snapshot; the WS stream keeps it live after. + // Seed from the REST snapshot for instant first paint; the WS `canvas.state` + // stream (which also fires on join) takes over as the authority immediately. useEffect(() => { if (hydratedRef.current || !doc) return; const map: Record = {}; for (const s of doc.shapes) map[s.id] = s; - setShapes(map); + setServerShapes(map); hydratedRef.current = true; }, [doc]); - const applyLocal = useCallback((op: CanvasOp) => { - setShapes((s) => applyOpToShapes(s, op)); - }, []); - - // Move a contributor's cursor onto a shape + flash it. - const markActivity = useCallback((key: string, target: CanvasShape) => { - const isAgent = key === "agent"; - const box = shapeBox(target); - const col = isAgent ? AGENT_COLOR : originColor(key); - const now = Date.now(); - setCursors((c) => { - const prev = c[key]; - // Humans have a live pointer cursor; only the agent (no pointer) is - // anchored to the shape it just touched. - const pos = isAgent || !prev ? { x: box.x, y: box.y } : { x: prev.x, y: prev.y }; - return { - ...c, - [key]: { ...pos, name: isAgent ? "Agent" : prev?.name ?? "Someone", color: col, ts: now, workingTs: now }, - }; - }); - setFlash((f) => ({ ...f, [target.id]: { ts: now, color: col } })); - }, []); - - const revealAdd = useCallback( - (op: Extract, key: string) => { - applyLocal(op); - markActivity(key, op.shape); - setEntering((e) => ({ ...e, [op.shape.id]: Date.now() })); + // Apply an op to the local override layer. `committed=false` marks it as an + // in-progress gesture (rendered locally, never auto-cleared); `committed=true` + // means we've sent it to the server and it can be dropped once a snapshot + // confirms it. Reads the current rendered shape as the base for updates. + const localOp = useCallback( + (op: CanvasOp, committed: boolean) => { + const ov = overridesRef.current; + const at = committed ? Date.now() : null; + if (op.kind === "add") { + ov[op.shape.id] = { shape: op.shape, committedAt: at }; + } else if (op.kind === "update") { + const cur = ov[op.id]?.shape ?? serverShapesRef.current[op.id]; + if (cur) ov[op.id] = { shape: { ...cur, ...op.props }, committedAt: at }; + } else if (op.kind === "delete") { + ov[op.id] = { shape: null, committedAt: Date.now() }; + } + bump(); }, - [applyLocal, markActivity], + [bump], ); - // Drain queued remote adds one at a time — the "drawing playback". Reveal - // strictly one shape per tick so it reads as the agent placing them in - // sequence; only compress the gap when a big batch is backed up so a huge - // diagram doesn't take minutes. Self-scheduling so the delay can adapt. - useEffect(() => { - let timer: ReturnType; - const tick = () => { - const q = pendingAddsRef.current; - const item = q.shift(); - if (item) revealAdd(item.op, item.key); - const n = pendingAddsRef.current.length; - // Comfortable, watchable pace for normal diagrams; speed up under heavy - // backlog. Idle polling falls back to the slow cadence. - const delay = n > 40 ? 110 : n > 15 ? 240 : 420; - timer = setTimeout(tick, delay); - }; - timer = setTimeout(tick, 420); - return () => clearTimeout(timer); - }, [revealAdd]); + // Mark an in-progress override as committed (sent to the server) so the next + // confirming snapshot can retire it. + const markCommitted = useCallback((id: string) => { + const o = overridesRef.current[id]; + if (o) o.committedAt = Date.now(); + }, []); + // Apply locally (optimistically) + send to the server in one step. Used for + // instant, non-gesture mutations (color, delete, clear). const commit = useCallback( (op: CanvasOp) => { - setShapes((s) => applyOpToShapes(s, op)); + if (op.kind === "clear") { + // Optimistically tombstone everything currently shown; snapshots + // confirm the empty board. Tombstones survive a stale pre-clear + // snapshot and retire once the post-clear (empty) snapshot lands. + const ov: typeof overridesRef.current = {}; + const now = Date.now(); + for (const id of Object.keys(shapesRef.current)) ov[id] = { shape: null, committedAt: now }; + overridesRef.current = ov; + bump(); + } else { + localOp(op, true); + } sendCanvasOp(pid, op, origin.current); }, - [pid], + [pid, localOp, bump], ); - // Receive remote ops + cursors. + // Receive authoritative state snapshots + cursors. useEffect(() => { const unsub = subscribe((msg: any) => { - if (msg.type === "canvas.op" && msg.origin !== origin.current && msg.op) { - const op = msg.op as CanvasOp; - const key: string = msg.origin || "agent"; - if (op.kind === "add") { - // Queue for sequenced playback (drain timer above). - pendingAddsRef.current.push({ op, key }); - } else { - // Updates/deletes/clears apply immediately so live drags stay smooth. - const target = op.kind === "update" || op.kind === "delete" ? shapesRef.current[op.id] : undefined; - applyLocal(op); - if (op.kind === "update" && target) markActivity(key, target); + if (msg.type === "canvas.state" && msg.doc) { + const map: Record = {}; + for (const s of (msg.doc.shapes ?? []) as CanvasShape[]) map[s.id] = s; + // Retire committed overrides the snapshot has caught up to (or that have + // outlived the confirm window, so a concurrent edit can't wedge one). + // In-progress overrides (committedAt null) are kept until the gesture ends. + const ov = overridesRef.current; + const now = Date.now(); + let changed = false; + for (const [id, o] of Object.entries(ov)) { + if (o.committedAt === null) continue; + const confirmed = o.shape === null ? map[id] === undefined : shapeEq(map[id], o.shape); + if (confirmed || now - o.committedAt > OVERRIDE_TTL) { + delete ov[id]; + changed = true; + } } - } else if (msg.type === "canvas.cursor" && msg.origin !== origin.current) { + setServerShapes(map); + if (changed) bump(); + hydratedRef.current = true; + return; + } + if (msg.type === "canvas.cursor" && msg.origin !== origin.current) { const key = msg.origin || msg.userId; if (!key) return; if (!msg.cursor) { @@ -306,9 +341,9 @@ export function CanvasPage() { } }); return unsub; - }, [applyLocal, markActivity]); + }, []); - // Prune stale cursors + finished flashes. + // Prune stale cursors. useEffect(() => { const t = setInterval(() => { const now = Date.now(); @@ -324,24 +359,6 @@ export function CanvasPage() { } return changed ? next : c; }); - setFlash((f) => { - let changed = false; - const next: Record = {}; - for (const [k, v] of Object.entries(f)) { - if (now - v.ts < 1000) next[k] = v; - else changed = true; - } - return changed ? next : f; - }); - setEntering((e) => { - let changed = false; - const next: Record = {}; - for (const [k, v] of Object.entries(e)) { - if (now - v < 500) next[k] = v; - else changed = true; - } - return changed ? next : e; - }); }, 500); return () => clearInterval(t); }, []); @@ -427,10 +444,13 @@ export function CanvasPage() { drag.shapeType === "arrow" ? { x2: drag.def.x2, y2: drag.def.y2 } : { w: drag.def.w, h: drag.def.h }; - applyLocal({ kind: "update", id: drag.id, props }); + localOp({ kind: "update", id: drag.id, props }, false); shape = shapesRef.current[drag.id]; } - if (shape) sendCanvasOp(pid, { kind: "add", shape }, origin.current); + if (shape) { + sendCanvasOp(pid, { kind: "add", shape }, origin.current); + markCommitted(drag.id); + } } } else if (drag.mode === "move") { // Persist the final position of every shape in the (possibly multi-) drag. @@ -442,6 +462,7 @@ export function CanvasPage() { { kind: "update", id, props: { x: shape.x, y: shape.y, x2: shape.x2, y2: shape.y2 } }, origin.current, ); + markCommitted(id); } } } else if (drag.mode === "resize") { @@ -449,6 +470,7 @@ export function CanvasPage() { if (shape) { const props: Partial = { x: shape.x, y: shape.y, w: shape.w, h: shape.h }; sendCanvasOp(pid, { kind: "update", id: drag.id, props }, origin.current); + markCommitted(drag.id); } } else if (drag.mode === "marquee") { // Select every shape the final box touches. @@ -464,7 +486,7 @@ export function CanvasPage() { dragRef.current = null; window.removeEventListener("pointermove", onWindowMove); window.removeEventListener("pointerup", endGesture); - }, [pid, applyLocal]); + }, [pid, localOp, markCommitted]); const onWindowMove = useCallback( (e: PointerEvent) => { @@ -491,15 +513,18 @@ export function CanvasPage() { } if (drag.mode === "create") { if (drag.shapeType === "arrow") { - applyLocal({ kind: "update", id: drag.id, props: { x2: w.x, y2: w.y } }); + localOp({ kind: "update", id: drag.id, props: { x2: w.x, y2: w.y } }, false); } else if (drag.shapeType === "rect" || drag.shapeType === "ellipse") { const x = Math.min(drag.ox, w.x); const y = Math.min(drag.oy, w.y); - applyLocal({ - kind: "update", - id: drag.id, - props: { x, y, w: Math.abs(w.x - drag.ox), h: Math.abs(w.y - drag.oy) }, - }); + localOp( + { + kind: "update", + id: drag.id, + props: { x, y, w: Math.abs(w.x - drag.ox), h: Math.abs(w.y - drag.oy) }, + }, + false, + ); } return; } @@ -511,20 +536,23 @@ export function CanvasPage() { orig.type === "arrow" ? { x: orig.x + dx, y: orig.y + dy, x2: (orig.x2 ?? 0) + dx, y2: (orig.y2 ?? 0) + dy } : { x: orig.x + dx, y: orig.y + dy }; - applyLocal({ kind: "update", id, props }); + localOp({ kind: "update", id, props }, false); } return; } if (drag.mode === "resize") { const o = drag.orig as CanvasShape; - applyLocal({ - kind: "update", - id: drag.id, - props: { w: Math.max(24, w.x - o.x), h: Math.max(24, w.y - o.y) }, - }); + localOp( + { + kind: "update", + id: drag.id, + props: { w: Math.max(24, w.x - o.x), h: Math.max(24, w.y - o.y) }, + }, + false, + ); } }, - [toWorld, applyLocal], + [toWorld, localOp], ); const beginGesture = (drag: any) => { @@ -555,12 +583,12 @@ export function CanvasPage() { // avoids the "appears full-size then snaps small" flash. const shape: CanvasShape = tool === "arrow" ? { ...def, x2: w.x, y2: w.y } : { ...def, w: 0, h: 0 }; - applyLocal({ kind: "add", shape }); + localOp({ kind: "add", shape }, false); setSelectedIds([shape.id]); beginGesture({ mode: "create", id: shape.id, shapeType: tool, ox: w.x, oy: w.y, def }); } else { // text — placed at click, no drag sizing; commit immediately. - applyLocal({ kind: "add", shape: def }); + localOp({ kind: "add", shape: def }, true); setSelectedIds([def.id]); sendCanvasOp(pid, { kind: "add", shape: def }, origin.current); setTool("select"); @@ -644,11 +672,23 @@ export function CanvasPage() { }; const onEditText = (id: string, text: string) => { - applyLocal({ kind: "update", id, props: { text } }); + // Active override while editing (held until the editor closes), plus a + // throttled live send so others watch the text appear as it's typed. + localOp({ kind: "update", id, props: { text } }, false); + const now = Date.now(); + if (now - lastTextRef.current > 80) { + lastTextRef.current = now; + sendCanvasOp(pid, { kind: "update", id, props: { text } }, origin.current); + } }; const onEditTextCommit = (id: string) => { const s = shapesRef.current[id]; - if (s) sendCanvasOp(pid, { kind: "update", id, props: { text: s.text ?? "" } }, origin.current); + // Always send the final value — the last throttled keystroke may have been + // skipped — then let the next snapshot retire the override. + if (s) { + sendCanvasOp(pid, { kind: "update", id, props: { text: s.text ?? "" } }, origin.current); + markCommitted(id); + } }; // Download the server-rendered PNG of the current board. The endpoint streams @@ -760,7 +800,6 @@ export function CanvasPage() { shape={s} selected={selectedSet.has(s.id)} resizable={selectedIds.length === 1 && selectedSet.has(s.id)} - entering={entering[s.id] !== undefined} onPointerDown={(e) => onShapePointerDown(e, s.id)} onDoubleClick={() => { if (s.type !== "arrow") setEditing(s.id); @@ -785,37 +824,6 @@ export function CanvasPage() { /> )} - {/* Flash ring on shapes a remote contributor just drew/changed */} - {Object.entries(flash).map(([id, fl]) => { - const s = shapes[id]; - if (!s) return null; - const b = shapeBox(s); - const inv = 1 / view.scale; - const pad = 6 * inv; - return ( - - - - - ); - })} - {/* Live contributor cursors (humans by pointer, agent by op anchor) */} {Object.entries(cursors).map(([k, c]) => { const inv = 1 / view.scale; @@ -913,7 +921,6 @@ function ShapeView({ shape, selected, resizable, - entering, onPointerDown, onDoubleClick, onResizeDown, @@ -921,7 +928,6 @@ function ShapeView({ shape: CanvasShape; selected: boolean; resizable: boolean; - entering: boolean; onPointerDown: (e: React.PointerEvent) => void; onDoubleClick: () => void; onResizeDown: (e: React.PointerEvent) => void; @@ -972,17 +978,11 @@ function ShapeView({ // eslint-disable-next-line react-hooks/exhaustive-deps }, [shape.type, shape.x, shape.y, shape.w, shape.h, shape.x2, shape.y2, shape.color, seed]); - // Fade a remote-added shape in (plays once on mount via SMIL). - const enter = entering ? ( - - ) : null; - if (shape.type === "arrow") { const x2 = shape.x2 ?? shape.x + 100; const y2 = shape.y2 ?? shape.y; return ( - {enter} {/* fat invisible hit line */} {paths.map((p, i) => ( @@ -1006,7 +1006,6 @@ function ShapeView({ return ( - {enter} {/* invisible hit area so the whole bounding box is grabbable */} {paths.map((p, i) => (