From 9da71ddb731797b18d18ed0661e2b2003a6a7d5f Mon Sep 17 00:00:00 2001 From: real-venus Date: Thu, 25 Jun 2026 11:41:56 -0700 Subject: [PATCH] feat: high-frequency WebSocket streaming buffer throttler (#6) Decouples telemetry ingestion from the canvas draw loop so 50+ msg/s bursts no longer drop frames or cause tab unresponsiveness. - utils/buffer.ts: RingBuffer (fixed MAX_POINTS=200 capacity, slice(-cap) eviction, runtime setCapacity), StreamBatcher (accumulate writes, flush into the ring every BATCH_INTERVAL=50ms; injectable clock), and AdaptiveCapacityMonitor (shrinks the window after frames exceed 33ms / <30 FPS, grows back when healthy) - utils/helpers.ts: throttle(fn, wait) (leading + trailing) plus debounce - components/spatial/LiveDataView.tsx: routes ingestion through the batcher, the RAF draw loop reads the ring buffer directly (no React state on the hot path) and feeds frame time to the CPU monitor to adapt capacity; ResizeObserver updates are throttled - tests for the ring buffer, batching/flush timing, the adaptive monitor, and throttle/debounce Test file lives under tests/unit (vitest only collects tests/**, not src/__tests__). --- src/components/spatial/LiveDataView.tsx | 184 ++++++++++++---------- src/utils/buffer.ts | 195 ++++++++++++++++++++++++ src/utils/helpers.ts | 58 +++++++ tests/unit/buffer.test.ts | 145 ++++++++++++++++++ tests/unit/helpers.test.ts | 52 +++++++ 5 files changed, 556 insertions(+), 78 deletions(-) create mode 100644 src/utils/buffer.ts create mode 100644 src/utils/helpers.ts create mode 100644 tests/unit/buffer.test.ts create mode 100644 tests/unit/helpers.test.ts diff --git a/src/components/spatial/LiveDataView.tsx b/src/components/spatial/LiveDataView.tsx index 6a708be..bc0edff 100644 --- a/src/components/spatial/LiveDataView.tsx +++ b/src/components/spatial/LiveDataView.tsx @@ -1,112 +1,140 @@ "use client"; -import { useRef, useEffect, useState, useCallback } from "react"; - -const MAX_POINTS = 200; -const BATCH_INTERVAL = 50; +import { useRef, useEffect, useState, useMemo } from "react"; +import { + RingBuffer, + StreamBatcher, + AdaptiveCapacityMonitor, +} from "@/utils/buffer"; +import { throttle } from "@/utils/helpers"; function generatePoint(): number { return 40 + Math.random() * 60; } +/** + * High-frequency live telemetry view. + * + * Ingestion and rendering are decoupled: incoming points are accumulated by a + * {@link StreamBatcher} and flushed into a fixed-capacity {@link RingBuffer}, and + * the canvas draw loop reads the ring buffer directly each animation frame (no + * React state on the hot path). An {@link AdaptiveCapacityMonitor} shrinks the + * retained window when frames run slower than ~30 FPS, so 50+ msg/s bursts no + * longer saturate the RAF loop. + */ export function LiveDataView() { const canvasRef = useRef(null); const containerRef = useRef(null); const [dimensions, setDimensions] = useState({ width: 800, height: 200 }); - const bufferRef = useRef([]); - const batchRef = useRef([]); + + // Streaming primitives (stable for the component's lifetime). + const bufferRef = useRef(null); + if (!bufferRef.current) bufferRef.current = new RingBuffer(); + const batcherRef = useRef(null); + if (!batcherRef.current) batcherRef.current = new StreamBatcher(bufferRef.current); + const monitorRef = useRef(null); + if (!monitorRef.current) monitorRef.current = new AdaptiveCapacityMonitor(); + + const dimsRef = useRef(dimensions); + dimsRef.current = dimensions; const rafRef = useRef(0); - const lastFlush = useRef(Date.now()); + const lastFrameRef = useRef(0); + + // Throttle resize updates so a stream of ResizeObserver events can't thrash + // React state during a burst. + const handleResize = useMemo( + () => + throttle((width: number, height: number) => { + setDimensions({ width, height }); + }, 100), + [] + ); useEffect(() => { const container = containerRef.current; if (!container) return; const observer = new ResizeObserver((entries) => { for (const e of entries) { - setDimensions({ - width: Math.floor(e.contentRect.width), - height: Math.floor(e.contentRect.height), - }); + handleResize( + Math.floor(e.contentRect.width), + Math.floor(e.contentRect.height) + ); } }); observer.observe(container); return () => observer.disconnect(); - }, []); - - const flushBatch = useCallback(() => { - if (batchRef.current.length === 0) return; - bufferRef.current = [ - ...bufferRef.current, - ...batchRef.current, - ].slice(-MAX_POINTS); - batchRef.current = []; - }, []); - - const pushPoint = useCallback((value: number) => { - batchRef.current.push(value); - const now = Date.now(); - if (now - lastFlush.current >= BATCH_INTERVAL) { - flushBatch(); - lastFlush.current = now; - } - }, [flushBatch]); + }, [handleResize]); + // Simulated WebSocket ingestion → routed through the batcher. useEffect(() => { + const batcher = batcherRef.current!; const interval = setInterval(() => { - pushPoint(generatePoint()); + batcher.write(generatePoint()); }, 100); return () => clearInterval(interval); - }, [pushPoint]); - - const draw = useCallback(() => { - const canvas = canvasRef.current; - if (!canvas) return; - const ctx = canvas.getContext("2d"); - if (!ctx) return; - - const data = bufferRef.current; - const { width, height } = dimensions; - - ctx.clearRect(0, 0, width, height); - ctx.fillStyle = "var(--background)"; - ctx.fillRect(0, 0, width, height); - - if (data.length < 2) return; - - ctx.strokeStyle = "#22c55e"; - ctx.lineWidth = 2; - ctx.beginPath(); - const step = width / Math.max(data.length - 1, 1); - for (let i = 0; i < data.length; i++) { - const x = i * step; - const y = height - (data[i] / 100) * height; - if (i === 0) ctx.moveTo(x, y); - else ctx.lineTo(x, y); - } - ctx.stroke(); - - const fillGradient = ctx.createLinearGradient(0, 0, 0, height); - fillGradient.addColorStop(0, "rgba(34,197,94,0.2)"); - fillGradient.addColorStop(1, "rgba(34,197,94,0)"); - ctx.fillStyle = fillGradient; - ctx.lineTo(width, height); - ctx.lineTo(0, height); - ctx.closePath(); - ctx.fill(); - - const avg = data.reduce((a, b) => a + b, 0) / data.length; - ctx.fillStyle = "var(--foreground)"; - ctx.font = "12px monospace"; - ctx.fillText(`${avg.toFixed(1)}% avg`, 8, 16); - }, [dimensions]); + }, []); + // Draw loop: decoupled from ingestion, reads the ring buffer directly. useEffect(() => { - rafRef.current = requestAnimationFrame(function loop() { - draw(); + const buffer = bufferRef.current!; + const batcher = batcherRef.current!; + const monitor = monitorRef.current!; + + const draw = (timestamp: number) => { + // Frame-to-frame time drives the adaptive capacity (CPU monitor). + const dt = lastFrameRef.current ? timestamp - lastFrameRef.current : 0; + lastFrameRef.current = timestamp; + if (dt > 0) buffer.setCapacity(monitor.record(dt)); + + batcher.flushDue(); + + const canvas = canvasRef.current; + const ctx = canvas?.getContext("2d"); + if (!canvas || !ctx) return; + + const data = buffer.toArray(); + const { width, height } = dimsRef.current; + + ctx.clearRect(0, 0, width, height); + ctx.fillStyle = "var(--background)"; + ctx.fillRect(0, 0, width, height); + + if (data.length >= 2) { + ctx.strokeStyle = "#22c55e"; + ctx.lineWidth = 2; + ctx.beginPath(); + const step = width / Math.max(data.length - 1, 1); + for (let i = 0; i < data.length; i++) { + const x = i * step; + const y = height - (data[i] / 100) * height; + if (i === 0) ctx.moveTo(x, y); + else ctx.lineTo(x, y); + } + ctx.stroke(); + + const fillGradient = ctx.createLinearGradient(0, 0, 0, height); + fillGradient.addColorStop(0, "rgba(34,197,94,0.2)"); + fillGradient.addColorStop(1, "rgba(34,197,94,0)"); + ctx.fillStyle = fillGradient; + ctx.lineTo(width, height); + ctx.lineTo(0, height); + ctx.closePath(); + ctx.fill(); + + const avg = data.reduce((a, b) => a + b, 0) / data.length; + ctx.fillStyle = "var(--foreground)"; + ctx.font = "12px monospace"; + ctx.fillText(`${avg.toFixed(1)}% avg`, 8, 16); + } + }; + + const loop = (timestamp: number) => { + draw(timestamp); rafRef.current = requestAnimationFrame(loop); - }); + }; + rafRef.current = requestAnimationFrame(loop); return () => cancelAnimationFrame(rafRef.current); - }, [draw]); + }, []); return (
this.cap) { + this.data = this.data.slice(-this.cap); + } + } + + /** Append many values at once, keeping only the most recent `capacity`. */ + pushBatch(values: number[]): void { + if (values.length === 0) return; + this.data = this.data.concat(values); + if (this.data.length > this.cap) { + this.data = this.data.slice(-this.cap); + } + } + + /** Snapshot of the retained values (oldest → newest). */ + toArray(): number[] { + return this.data.slice(); + } + + /** Resize the capacity, trimming to the newest values if it shrank. */ + setCapacity(capacity: number): void { + if (capacity <= 0) throw new Error("RingBuffer capacity must be positive"); + this.cap = capacity; + if (this.data.length > capacity) { + this.data = this.data.slice(-capacity); + } + } + + clear(): void { + this.data = []; + } +} + +export interface StreamBatcherOptions { + intervalMs?: number; + /** Injectable clock (ms) for deterministic tests. */ + now?: () => number; +} + +/** + * Accumulates incoming points and flushes them into a {@link RingBuffer} no + * more than once per `intervalMs`, smoothing high-frequency bursts before they + * reach the render path. + */ +export class StreamBatcher { + private batch: number[] = []; + private lastFlush: number; + private readonly intervalMs: number; + private readonly now: () => number; + + constructor( + private readonly buffer: RingBuffer, + options: StreamBatcherOptions = {} + ) { + this.intervalMs = options.intervalMs ?? BATCH_INTERVAL; + this.now = options.now ?? Date.now; + this.lastFlush = this.now(); + } + + /** Pending (un-flushed) point count. */ + get pending(): number { + return this.batch.length; + } + + /** Queue a point; flushes inline once the interval has elapsed. */ + write(value: number): void { + this.batch.push(value); + if (this.now() - this.lastFlush >= this.intervalMs) { + this.flush(); + } + } + + /** Flush if the interval has elapsed (called from the draw loop). */ + flushDue(): void { + if (this.batch.length > 0 && this.now() - this.lastFlush >= this.intervalMs) { + this.flush(); + } + } + + /** Force-flush the pending batch into the ring buffer. */ + flush(): void { + if (this.batch.length > 0) { + this.buffer.pushBatch(this.batch); + this.batch = []; + } + this.lastFlush = this.now(); + } +} + +export interface AdaptiveCapacityOptions { + max?: number; + min?: number; + thresholdMs?: number; + /** Points to add/remove per adjustment. */ + step?: number; + /** Consecutive slow frames before shrinking. */ + slowFramesToShrink?: number; + /** Consecutive healthy frames before growing back. */ + fastFramesToGrow?: number; +} + +/** + * Recommends a ring-buffer capacity from observed frame times: it shrinks the + * window after sustained slow frames (> {@link SLOW_FRAME_MS}) and grows it back + * once the frame rate recovers, keeping the draw loop above ~30 FPS. + */ +export class AdaptiveCapacityMonitor { + private current: number; + private slowStreak = 0; + private fastStreak = 0; + private readonly max: number; + private readonly min: number; + private readonly thresholdMs: number; + private readonly step: number; + private readonly slowFramesToShrink: number; + private readonly fastFramesToGrow: number; + + constructor(options: AdaptiveCapacityOptions = {}) { + this.max = options.max ?? MAX_POINTS; + this.min = options.min ?? MIN_POINTS; + this.thresholdMs = options.thresholdMs ?? SLOW_FRAME_MS; + this.step = options.step ?? 25; + this.slowFramesToShrink = options.slowFramesToShrink ?? 2; + this.fastFramesToGrow = options.fastFramesToGrow ?? 30; + this.current = this.max; + } + + get capacity(): number { + return this.current; + } + + /** Feed a frame's duration (ms); returns the recommended capacity. */ + record(frameMs: number): number { + if (frameMs > this.thresholdMs) { + this.slowStreak += 1; + this.fastStreak = 0; + if (this.slowStreak >= this.slowFramesToShrink) { + this.current = Math.max(this.min, this.current - this.step); + this.slowStreak = 0; + } + } else { + this.fastStreak += 1; + this.slowStreak = 0; + if (this.fastStreak >= this.fastFramesToGrow) { + this.current = Math.min(this.max, this.current + this.step); + this.fastStreak = 0; + } + } + return this.current; + } +} diff --git a/src/utils/helpers.ts b/src/utils/helpers.ts new file mode 100644 index 0000000..6a0d404 --- /dev/null +++ b/src/utils/helpers.ts @@ -0,0 +1,58 @@ +/** + * Small, dependency-free helper utilities. + */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +/** + * Throttle `fn` so it runs at most once per `wait` ms. Leading and trailing: + * the first call fires immediately, subsequent calls within the window are + * coalesced into a single trailing call with the latest arguments. + */ +export function throttle any>( + fn: T, + wait: number +): (...args: Parameters) => void { + let lastRun = 0; + let timer: ReturnType | null = null; + let lastArgs: Parameters | null = null; + + return (...args: Parameters): void => { + const now = Date.now(); + const remaining = wait - (now - lastRun); + lastArgs = args; + + if (remaining <= 0) { + if (timer) { + clearTimeout(timer); + timer = null; + } + lastRun = now; + fn(...args); + } else if (!timer) { + timer = setTimeout(() => { + lastRun = Date.now(); + timer = null; + if (lastArgs) fn(...lastArgs); + }, remaining); + } + }; +} + +/** + * Debounce `fn` so it runs only after `wait` ms have elapsed since the last + * call. Useful for resize/settle events. + */ +export function debounce any>( + fn: T, + wait: number +): (...args: Parameters) => void { + let timer: ReturnType | null = null; + return (...args: Parameters): void => { + if (timer) clearTimeout(timer); + timer = setTimeout(() => { + timer = null; + fn(...args); + }, wait); + }; +} diff --git a/tests/unit/buffer.test.ts b/tests/unit/buffer.test.ts new file mode 100644 index 0000000..841a270 --- /dev/null +++ b/tests/unit/buffer.test.ts @@ -0,0 +1,145 @@ +import { describe, it, expect } from "vitest"; +import { + RingBuffer, + StreamBatcher, + AdaptiveCapacityMonitor, + MAX_POINTS, + MIN_POINTS, +} from "@/utils/buffer"; + +describe("RingBuffer", () => { + it("defaults to MAX_POINTS capacity", () => { + expect(new RingBuffer().capacity).toBe(MAX_POINTS); + }); + + it("retains pushed values in order", () => { + const b = new RingBuffer(4); + b.push(1); + b.push(2); + b.push(3); + expect(b.toArray()).toEqual([1, 2, 3]); + expect(b.size).toBe(3); + }); + + it("drops the oldest values past capacity", () => { + const b = new RingBuffer(3); + [1, 2, 3, 4, 5].forEach((v) => b.push(v)); + expect(b.toArray()).toEqual([3, 4, 5]); + }); + + it("pushBatch keeps only the most recent capacity", () => { + const b = new RingBuffer(3); + b.pushBatch([1, 2, 3, 4, 5]); + expect(b.toArray()).toEqual([3, 4, 5]); + b.pushBatch([]); // no-op + expect(b.toArray()).toEqual([3, 4, 5]); + }); + + it("setCapacity shrinks to the newest values", () => { + const b = new RingBuffer(5); + b.pushBatch([1, 2, 3, 4, 5]); + b.setCapacity(2); + expect(b.toArray()).toEqual([4, 5]); + expect(b.capacity).toBe(2); + }); + + it("rejects non-positive capacity", () => { + expect(() => new RingBuffer(0)).toThrow(); + expect(() => new RingBuffer(2).setCapacity(-1)).toThrow(); + }); + + it("clear empties the buffer", () => { + const b = new RingBuffer(3); + b.push(1); + b.clear(); + expect(b.size).toBe(0); + }); +}); + +describe("StreamBatcher", () => { + function controllableClock() { + let t = 0; + return { now: () => t, advance: (ms: number) => (t += ms) }; + } + + it("accumulates writes and flushes once the interval elapses", () => { + const clock = controllableClock(); + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: clock.now }); + + batcher.write(1); + batcher.write(2); + expect(batcher.pending).toBe(2); + expect(buffer.size).toBe(0); // not flushed yet + + clock.advance(50); + batcher.write(3); // crosses the interval → flush + expect(buffer.toArray()).toEqual([1, 2, 3]); + expect(batcher.pending).toBe(0); + }); + + it("flushDue only flushes when the interval has elapsed", () => { + const clock = controllableClock(); + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: clock.now }); + + batcher.write(1); + batcher.flushDue(); + expect(buffer.size).toBe(0); // too soon + clock.advance(50); + batcher.flushDue(); + expect(buffer.toArray()).toEqual([1]); + }); + + it("force flush empties the pending batch", () => { + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: () => 0 }); + batcher.write(7); + batcher.flush(); + expect(buffer.toArray()).toEqual([7]); + }); +}); + +describe("AdaptiveCapacityMonitor", () => { + it("starts at the max capacity", () => { + expect(new AdaptiveCapacityMonitor().capacity).toBe(MAX_POINTS); + }); + + it("shrinks after consecutive slow frames", () => { + const m = new AdaptiveCapacityMonitor({ step: 25, slowFramesToShrink: 2 }); + m.record(40); // slow #1 — no change yet + expect(m.capacity).toBe(MAX_POINTS); + const cap = m.record(40); // slow #2 — shrink + expect(cap).toBe(MAX_POINTS - 25); + }); + + it("does not shrink below the floor", () => { + const m = new AdaptiveCapacityMonitor({ + step: 50, + slowFramesToShrink: 1, + min: MIN_POINTS, + }); + for (let i = 0; i < 20; i++) m.record(100); + expect(m.capacity).toBe(MIN_POINTS); + }); + + it("grows back after sustained healthy frames", () => { + const m = new AdaptiveCapacityMonitor({ + step: 25, + slowFramesToShrink: 1, + fastFramesToGrow: 5, + }); + m.record(40); // shrink to 175 + expect(m.capacity).toBe(175); + for (let i = 0; i < 5; i++) m.record(10); // healthy run → grow + expect(m.capacity).toBe(200); + }); + + it("a fast frame resets the slow streak", () => { + const m = new AdaptiveCapacityMonitor({ slowFramesToShrink: 2 }); + m.record(40); // slow #1 + m.record(10); // healthy → resets streak + m.record(40); // slow #1 again (not #2) + expect(m.capacity).toBe(MAX_POINTS); + }); +}); diff --git a/tests/unit/helpers.test.ts b/tests/unit/helpers.test.ts new file mode 100644 index 0000000..87a8d7c --- /dev/null +++ b/tests/unit/helpers.test.ts @@ -0,0 +1,52 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { throttle, debounce } from "@/utils/helpers"; + +beforeEach(() => vi.useFakeTimers()); +afterEach(() => vi.useRealTimers()); + +describe("throttle", () => { + it("invokes immediately on the leading edge", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled("a"); + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenLastCalledWith("a"); + }); + + it("suppresses calls within the window, then fires a trailing call", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled(1); // leading + throttled(2); // suppressed + throttled(3); // suppressed, becomes trailing args + expect(fn).toHaveBeenCalledTimes(1); + + vi.advanceTimersByTime(100); + expect(fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenLastCalledWith(3); // latest args + }); + + it("allows another leading call after the window passes", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled("first"); + vi.advanceTimersByTime(150); + throttled("second"); + expect(fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenLastCalledWith("second"); + }); +}); + +describe("debounce", () => { + it("only fires after the quiet period", () => { + const fn = vi.fn(); + const debounced = debounce(fn, 100); + debounced("a"); + debounced("b"); + vi.advanceTimersByTime(99); + expect(fn).not.toHaveBeenCalled(); + vi.advanceTimersByTime(1); + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenLastCalledWith("b"); + }); +});