diff --git a/src/components/spatial/LiveDataView.tsx b/src/components/spatial/LiveDataView.tsx index 6a708be..bc0edff 100644 --- a/src/components/spatial/LiveDataView.tsx +++ b/src/components/spatial/LiveDataView.tsx @@ -1,112 +1,140 @@ "use client"; -import { useRef, useEffect, useState, useCallback } from "react"; - -const MAX_POINTS = 200; -const BATCH_INTERVAL = 50; +import { useRef, useEffect, useState, useMemo } from "react"; +import { + RingBuffer, + StreamBatcher, + AdaptiveCapacityMonitor, +} from "@/utils/buffer"; +import { throttle } from "@/utils/helpers"; function generatePoint(): number { return 40 + Math.random() * 60; } +/** + * High-frequency live telemetry view. + * + * Ingestion and rendering are decoupled: incoming points are accumulated by a + * {@link StreamBatcher} and flushed into a fixed-capacity {@link RingBuffer}, and + * the canvas draw loop reads the ring buffer directly each animation frame (no + * React state on the hot path). An {@link AdaptiveCapacityMonitor} shrinks the + * retained window when frames run slower than ~30 FPS, so 50+ msg/s bursts no + * longer saturate the RAF loop. + */ export function LiveDataView() { const canvasRef = useRef(null); const containerRef = useRef(null); const [dimensions, setDimensions] = useState({ width: 800, height: 200 }); - const bufferRef = useRef([]); - const batchRef = useRef([]); + + // Streaming primitives (stable for the component's lifetime). + const bufferRef = useRef(null); + if (!bufferRef.current) bufferRef.current = new RingBuffer(); + const batcherRef = useRef(null); + if (!batcherRef.current) batcherRef.current = new StreamBatcher(bufferRef.current); + const monitorRef = useRef(null); + if (!monitorRef.current) monitorRef.current = new AdaptiveCapacityMonitor(); + + const dimsRef = useRef(dimensions); + dimsRef.current = dimensions; const rafRef = useRef(0); - const lastFlush = useRef(Date.now()); + const lastFrameRef = useRef(0); + + // Throttle resize updates so a stream of ResizeObserver events can't thrash + // React state during a burst. + const handleResize = useMemo( + () => + throttle((width: number, height: number) => { + setDimensions({ width, height }); + }, 100), + [] + ); useEffect(() => { const container = containerRef.current; if (!container) return; const observer = new ResizeObserver((entries) => { for (const e of entries) { - setDimensions({ - width: Math.floor(e.contentRect.width), - height: Math.floor(e.contentRect.height), - }); + handleResize( + Math.floor(e.contentRect.width), + Math.floor(e.contentRect.height) + ); } }); observer.observe(container); return () => observer.disconnect(); - }, []); - - const flushBatch = useCallback(() => { - if (batchRef.current.length === 0) return; - bufferRef.current = [ - ...bufferRef.current, - ...batchRef.current, - ].slice(-MAX_POINTS); - batchRef.current = []; - }, []); - - const pushPoint = useCallback((value: number) => { - batchRef.current.push(value); - const now = Date.now(); - if (now - lastFlush.current >= BATCH_INTERVAL) { - flushBatch(); - lastFlush.current = now; - } - }, [flushBatch]); + }, [handleResize]); + // Simulated WebSocket ingestion → routed through the batcher. useEffect(() => { + const batcher = batcherRef.current!; const interval = setInterval(() => { - pushPoint(generatePoint()); + batcher.write(generatePoint()); }, 100); return () => clearInterval(interval); - }, [pushPoint]); - - const draw = useCallback(() => { - const canvas = canvasRef.current; - if (!canvas) return; - const ctx = canvas.getContext("2d"); - if (!ctx) return; - - const data = bufferRef.current; - const { width, height } = dimensions; - - ctx.clearRect(0, 0, width, height); - ctx.fillStyle = "var(--background)"; - ctx.fillRect(0, 0, width, height); - - if (data.length < 2) return; - - ctx.strokeStyle = "#22c55e"; - ctx.lineWidth = 2; - ctx.beginPath(); - const step = width / Math.max(data.length - 1, 1); - for (let i = 0; i < data.length; i++) { - const x = i * step; - const y = height - (data[i] / 100) * height; - if (i === 0) ctx.moveTo(x, y); - else ctx.lineTo(x, y); - } - ctx.stroke(); - - const fillGradient = ctx.createLinearGradient(0, 0, 0, height); - fillGradient.addColorStop(0, "rgba(34,197,94,0.2)"); - fillGradient.addColorStop(1, "rgba(34,197,94,0)"); - ctx.fillStyle = fillGradient; - ctx.lineTo(width, height); - ctx.lineTo(0, height); - ctx.closePath(); - ctx.fill(); - - const avg = data.reduce((a, b) => a + b, 0) / data.length; - ctx.fillStyle = "var(--foreground)"; - ctx.font = "12px monospace"; - ctx.fillText(`${avg.toFixed(1)}% avg`, 8, 16); - }, [dimensions]); + }, []); + // Draw loop: decoupled from ingestion, reads the ring buffer directly. useEffect(() => { - rafRef.current = requestAnimationFrame(function loop() { - draw(); + const buffer = bufferRef.current!; + const batcher = batcherRef.current!; + const monitor = monitorRef.current!; + + const draw = (timestamp: number) => { + // Frame-to-frame time drives the adaptive capacity (CPU monitor). + const dt = lastFrameRef.current ? timestamp - lastFrameRef.current : 0; + lastFrameRef.current = timestamp; + if (dt > 0) buffer.setCapacity(monitor.record(dt)); + + batcher.flushDue(); + + const canvas = canvasRef.current; + const ctx = canvas?.getContext("2d"); + if (!canvas || !ctx) return; + + const data = buffer.toArray(); + const { width, height } = dimsRef.current; + + ctx.clearRect(0, 0, width, height); + ctx.fillStyle = "var(--background)"; + ctx.fillRect(0, 0, width, height); + + if (data.length >= 2) { + ctx.strokeStyle = "#22c55e"; + ctx.lineWidth = 2; + ctx.beginPath(); + const step = width / Math.max(data.length - 1, 1); + for (let i = 0; i < data.length; i++) { + const x = i * step; + const y = height - (data[i] / 100) * height; + if (i === 0) ctx.moveTo(x, y); + else ctx.lineTo(x, y); + } + ctx.stroke(); + + const fillGradient = ctx.createLinearGradient(0, 0, 0, height); + fillGradient.addColorStop(0, "rgba(34,197,94,0.2)"); + fillGradient.addColorStop(1, "rgba(34,197,94,0)"); + ctx.fillStyle = fillGradient; + ctx.lineTo(width, height); + ctx.lineTo(0, height); + ctx.closePath(); + ctx.fill(); + + const avg = data.reduce((a, b) => a + b, 0) / data.length; + ctx.fillStyle = "var(--foreground)"; + ctx.font = "12px monospace"; + ctx.fillText(`${avg.toFixed(1)}% avg`, 8, 16); + } + }; + + const loop = (timestamp: number) => { + draw(timestamp); rafRef.current = requestAnimationFrame(loop); - }); + }; + rafRef.current = requestAnimationFrame(loop); return () => cancelAnimationFrame(rafRef.current); - }, [draw]); + }, []); return (
this.cap) { + this.data = this.data.slice(-this.cap); + } + } + + /** Append many values at once, keeping only the most recent `capacity`. */ + pushBatch(values: number[]): void { + if (values.length === 0) return; + this.data = this.data.concat(values); + if (this.data.length > this.cap) { + this.data = this.data.slice(-this.cap); + } + } + + /** Snapshot of the retained values (oldest → newest). */ + toArray(): number[] { + return this.data.slice(); + } + + /** Resize the capacity, trimming to the newest values if it shrank. */ + setCapacity(capacity: number): void { + if (capacity <= 0) throw new Error("RingBuffer capacity must be positive"); + this.cap = capacity; + if (this.data.length > capacity) { + this.data = this.data.slice(-capacity); + } + } + + clear(): void { + this.data = []; + } +} + +export interface StreamBatcherOptions { + intervalMs?: number; + /** Injectable clock (ms) for deterministic tests. */ + now?: () => number; +} + +/** + * Accumulates incoming points and flushes them into a {@link RingBuffer} no + * more than once per `intervalMs`, smoothing high-frequency bursts before they + * reach the render path. + */ +export class StreamBatcher { + private batch: number[] = []; + private lastFlush: number; + private readonly intervalMs: number; + private readonly now: () => number; + + constructor( + private readonly buffer: RingBuffer, + options: StreamBatcherOptions = {} + ) { + this.intervalMs = options.intervalMs ?? BATCH_INTERVAL; + this.now = options.now ?? Date.now; + this.lastFlush = this.now(); + } + + /** Pending (un-flushed) point count. */ + get pending(): number { + return this.batch.length; + } + + /** Queue a point; flushes inline once the interval has elapsed. */ + write(value: number): void { + this.batch.push(value); + if (this.now() - this.lastFlush >= this.intervalMs) { + this.flush(); + } + } + + /** Flush if the interval has elapsed (called from the draw loop). */ + flushDue(): void { + if (this.batch.length > 0 && this.now() - this.lastFlush >= this.intervalMs) { + this.flush(); + } + } + + /** Force-flush the pending batch into the ring buffer. */ + flush(): void { + if (this.batch.length > 0) { + this.buffer.pushBatch(this.batch); + this.batch = []; + } + this.lastFlush = this.now(); + } +} + +export interface AdaptiveCapacityOptions { + max?: number; + min?: number; + thresholdMs?: number; + /** Points to add/remove per adjustment. */ + step?: number; + /** Consecutive slow frames before shrinking. */ + slowFramesToShrink?: number; + /** Consecutive healthy frames before growing back. */ + fastFramesToGrow?: number; +} + +/** + * Recommends a ring-buffer capacity from observed frame times: it shrinks the + * window after sustained slow frames (> {@link SLOW_FRAME_MS}) and grows it back + * once the frame rate recovers, keeping the draw loop above ~30 FPS. + */ +export class AdaptiveCapacityMonitor { + private current: number; + private slowStreak = 0; + private fastStreak = 0; + private readonly max: number; + private readonly min: number; + private readonly thresholdMs: number; + private readonly step: number; + private readonly slowFramesToShrink: number; + private readonly fastFramesToGrow: number; + + constructor(options: AdaptiveCapacityOptions = {}) { + this.max = options.max ?? MAX_POINTS; + this.min = options.min ?? MIN_POINTS; + this.thresholdMs = options.thresholdMs ?? SLOW_FRAME_MS; + this.step = options.step ?? 25; + this.slowFramesToShrink = options.slowFramesToShrink ?? 2; + this.fastFramesToGrow = options.fastFramesToGrow ?? 30; + this.current = this.max; + } + + get capacity(): number { + return this.current; + } + + /** Feed a frame's duration (ms); returns the recommended capacity. */ + record(frameMs: number): number { + if (frameMs > this.thresholdMs) { + this.slowStreak += 1; + this.fastStreak = 0; + if (this.slowStreak >= this.slowFramesToShrink) { + this.current = Math.max(this.min, this.current - this.step); + this.slowStreak = 0; + } + } else { + this.fastStreak += 1; + this.slowStreak = 0; + if (this.fastStreak >= this.fastFramesToGrow) { + this.current = Math.min(this.max, this.current + this.step); + this.fastStreak = 0; + } + } + return this.current; + } +} diff --git a/src/utils/helpers.ts b/src/utils/helpers.ts new file mode 100644 index 0000000..6a0d404 --- /dev/null +++ b/src/utils/helpers.ts @@ -0,0 +1,58 @@ +/** + * Small, dependency-free helper utilities. + */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +/** + * Throttle `fn` so it runs at most once per `wait` ms. Leading and trailing: + * the first call fires immediately, subsequent calls within the window are + * coalesced into a single trailing call with the latest arguments. + */ +export function throttle any>( + fn: T, + wait: number +): (...args: Parameters) => void { + let lastRun = 0; + let timer: ReturnType | null = null; + let lastArgs: Parameters | null = null; + + return (...args: Parameters): void => { + const now = Date.now(); + const remaining = wait - (now - lastRun); + lastArgs = args; + + if (remaining <= 0) { + if (timer) { + clearTimeout(timer); + timer = null; + } + lastRun = now; + fn(...args); + } else if (!timer) { + timer = setTimeout(() => { + lastRun = Date.now(); + timer = null; + if (lastArgs) fn(...lastArgs); + }, remaining); + } + }; +} + +/** + * Debounce `fn` so it runs only after `wait` ms have elapsed since the last + * call. Useful for resize/settle events. + */ +export function debounce any>( + fn: T, + wait: number +): (...args: Parameters) => void { + let timer: ReturnType | null = null; + return (...args: Parameters): void => { + if (timer) clearTimeout(timer); + timer = setTimeout(() => { + timer = null; + fn(...args); + }, wait); + }; +} diff --git a/tests/unit/buffer.test.ts b/tests/unit/buffer.test.ts new file mode 100644 index 0000000..841a270 --- /dev/null +++ b/tests/unit/buffer.test.ts @@ -0,0 +1,145 @@ +import { describe, it, expect } from "vitest"; +import { + RingBuffer, + StreamBatcher, + AdaptiveCapacityMonitor, + MAX_POINTS, + MIN_POINTS, +} from "@/utils/buffer"; + +describe("RingBuffer", () => { + it("defaults to MAX_POINTS capacity", () => { + expect(new RingBuffer().capacity).toBe(MAX_POINTS); + }); + + it("retains pushed values in order", () => { + const b = new RingBuffer(4); + b.push(1); + b.push(2); + b.push(3); + expect(b.toArray()).toEqual([1, 2, 3]); + expect(b.size).toBe(3); + }); + + it("drops the oldest values past capacity", () => { + const b = new RingBuffer(3); + [1, 2, 3, 4, 5].forEach((v) => b.push(v)); + expect(b.toArray()).toEqual([3, 4, 5]); + }); + + it("pushBatch keeps only the most recent capacity", () => { + const b = new RingBuffer(3); + b.pushBatch([1, 2, 3, 4, 5]); + expect(b.toArray()).toEqual([3, 4, 5]); + b.pushBatch([]); // no-op + expect(b.toArray()).toEqual([3, 4, 5]); + }); + + it("setCapacity shrinks to the newest values", () => { + const b = new RingBuffer(5); + b.pushBatch([1, 2, 3, 4, 5]); + b.setCapacity(2); + expect(b.toArray()).toEqual([4, 5]); + expect(b.capacity).toBe(2); + }); + + it("rejects non-positive capacity", () => { + expect(() => new RingBuffer(0)).toThrow(); + expect(() => new RingBuffer(2).setCapacity(-1)).toThrow(); + }); + + it("clear empties the buffer", () => { + const b = new RingBuffer(3); + b.push(1); + b.clear(); + expect(b.size).toBe(0); + }); +}); + +describe("StreamBatcher", () => { + function controllableClock() { + let t = 0; + return { now: () => t, advance: (ms: number) => (t += ms) }; + } + + it("accumulates writes and flushes once the interval elapses", () => { + const clock = controllableClock(); + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: clock.now }); + + batcher.write(1); + batcher.write(2); + expect(batcher.pending).toBe(2); + expect(buffer.size).toBe(0); // not flushed yet + + clock.advance(50); + batcher.write(3); // crosses the interval → flush + expect(buffer.toArray()).toEqual([1, 2, 3]); + expect(batcher.pending).toBe(0); + }); + + it("flushDue only flushes when the interval has elapsed", () => { + const clock = controllableClock(); + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: clock.now }); + + batcher.write(1); + batcher.flushDue(); + expect(buffer.size).toBe(0); // too soon + clock.advance(50); + batcher.flushDue(); + expect(buffer.toArray()).toEqual([1]); + }); + + it("force flush empties the pending batch", () => { + const buffer = new RingBuffer(100); + const batcher = new StreamBatcher(buffer, { intervalMs: 50, now: () => 0 }); + batcher.write(7); + batcher.flush(); + expect(buffer.toArray()).toEqual([7]); + }); +}); + +describe("AdaptiveCapacityMonitor", () => { + it("starts at the max capacity", () => { + expect(new AdaptiveCapacityMonitor().capacity).toBe(MAX_POINTS); + }); + + it("shrinks after consecutive slow frames", () => { + const m = new AdaptiveCapacityMonitor({ step: 25, slowFramesToShrink: 2 }); + m.record(40); // slow #1 — no change yet + expect(m.capacity).toBe(MAX_POINTS); + const cap = m.record(40); // slow #2 — shrink + expect(cap).toBe(MAX_POINTS - 25); + }); + + it("does not shrink below the floor", () => { + const m = new AdaptiveCapacityMonitor({ + step: 50, + slowFramesToShrink: 1, + min: MIN_POINTS, + }); + for (let i = 0; i < 20; i++) m.record(100); + expect(m.capacity).toBe(MIN_POINTS); + }); + + it("grows back after sustained healthy frames", () => { + const m = new AdaptiveCapacityMonitor({ + step: 25, + slowFramesToShrink: 1, + fastFramesToGrow: 5, + }); + m.record(40); // shrink to 175 + expect(m.capacity).toBe(175); + for (let i = 0; i < 5; i++) m.record(10); // healthy run → grow + expect(m.capacity).toBe(200); + }); + + it("a fast frame resets the slow streak", () => { + const m = new AdaptiveCapacityMonitor({ slowFramesToShrink: 2 }); + m.record(40); // slow #1 + m.record(10); // healthy → resets streak + m.record(40); // slow #1 again (not #2) + expect(m.capacity).toBe(MAX_POINTS); + }); +}); diff --git a/tests/unit/helpers.test.ts b/tests/unit/helpers.test.ts new file mode 100644 index 0000000..87a8d7c --- /dev/null +++ b/tests/unit/helpers.test.ts @@ -0,0 +1,52 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { throttle, debounce } from "@/utils/helpers"; + +beforeEach(() => vi.useFakeTimers()); +afterEach(() => vi.useRealTimers()); + +describe("throttle", () => { + it("invokes immediately on the leading edge", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled("a"); + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenLastCalledWith("a"); + }); + + it("suppresses calls within the window, then fires a trailing call", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled(1); // leading + throttled(2); // suppressed + throttled(3); // suppressed, becomes trailing args + expect(fn).toHaveBeenCalledTimes(1); + + vi.advanceTimersByTime(100); + expect(fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenLastCalledWith(3); // latest args + }); + + it("allows another leading call after the window passes", () => { + const fn = vi.fn(); + const throttled = throttle(fn, 100); + throttled("first"); + vi.advanceTimersByTime(150); + throttled("second"); + expect(fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenLastCalledWith("second"); + }); +}); + +describe("debounce", () => { + it("only fires after the quiet period", () => { + const fn = vi.fn(); + const debounced = debounce(fn, 100); + debounced("a"); + debounced("b"); + vi.advanceTimersByTime(99); + expect(fn).not.toHaveBeenCalled(); + vi.advanceTimersByTime(1); + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenLastCalledWith("b"); + }); +});