diff --git a/src/hooks/useContractState.ts b/src/hooks/useContractState.ts new file mode 100644 index 0000000..beba195 --- /dev/null +++ b/src/hooks/useContractState.ts @@ -0,0 +1,154 @@ +"use client"; + +import { useCallback, useEffect, useRef } from "react"; +import { LamportClock } from "@/utils/lamportClock"; +import { mergeEvents } from "@/utils/crdtMerge"; +import { + crdtStore, + runStalenessWatch, + useCrdtState, + type CrdtState, +} from "@/store/slices/crdtSlice"; +import { + RECONCILE_TIMEOUT_MS, + type ChainId, + type CrdtEvent, +} from "@/types/crdt"; +import type { + CrdtMergeRequest, + CrdtMergeResponse, +} from "@/workers/crdtMerge.worker"; + +/** + * Subscribes the dashboard to multi-chain Soroban events. Each ingested event + * is stamped with the originating chain's Lamport clock, batched, and merged + * (off-thread when a worker is available, otherwise inline). Merged diffs are + * applied to {@link crdtStore}, and a staleness watch flags any chain that has + * gone quiet past the reconcile timeout. + */ + +/** An event before it is Lamport-stamped (the clock supplies the timestamp). */ +export type RawCrdtEvent = Omit; + +export interface UseContractStateDeps { + createWorker?: () => Worker; + now?: () => number; + /** How often to flush the event batch (ms). @default 16 */ + flushIntervalMs?: number; + /** How often to run the staleness watch (ms). @default 5000 */ + stalenessIntervalMs?: number; + reconcileTimeoutMs?: number; + /** Called when a chain must be fully re-fetched. */ + onReconcile?: (chainId: ChainId) => void; +} + +export interface UseContractStateResult { + state: CrdtState; + /** Stamp and enqueue an event from a chain. */ + ingest: (chainId: ChainId, event: RawCrdtEvent) => void; + reconciling: ChainId[]; +} + +export function useContractState( + deps: UseContractStateDeps = {} +): UseContractStateResult { + const state = useCrdtState(); + + const clocksRef = useRef>(new Map()); + const batchRef = useRef([]); + const workerRef = useRef(null); + const batchSeqRef = useRef(0); + const depsRef = useRef(deps); + depsRef.current = deps; + + const clockFor = useCallback((chainId: ChainId): LamportClock => { + let clock = clocksRef.current.get(chainId); + if (!clock) { + clock = new LamportClock(chainId); + clocksRef.current.set(chainId, clock); + } + return clock; + }, []); + + const ingest = useCallback( + (chainId: ChainId, event: RawCrdtEvent) => { + const timestamp = clockFor(chainId).tick(); + batchRef.current.push({ ...event, timestamp } as CrdtEvent); + }, + [clockFor] + ); + + // Flush + merge loop, worker spawn, and staleness watch. + useEffect(() => { + const now = depsRef.current.now ?? Date.now; + const flushMs = depsRef.current.flushIntervalMs ?? 16; + const staleMs = depsRef.current.stalenessIntervalMs ?? 5000; + const reconcileTimeout = + depsRef.current.reconcileTimeoutMs ?? RECONCILE_TIMEOUT_MS; + + // Try to run the merge off the main thread. + const createWorker = depsRef.current.createWorker; + if (createWorker) { + try { + const worker = createWorker(); + worker.onmessage = (e: MessageEvent) => { + const { diffs, chainSeen } = e.data; + crdtStore.dispatch({ + type: "APPLY_DIFFS", + payload: { diffs, chainSeen, at: now() }, + }); + }; + workerRef.current = worker; + } catch { + workerRef.current = null; + } + } + + const flush = () => { + if (batchRef.current.length === 0) return; + const events = batchRef.current; + batchRef.current = []; + + const worker = workerRef.current; + if (worker) { + const request: CrdtMergeRequest = { + type: "merge", + batchId: ++batchSeqRef.current, + events, + }; + worker.postMessage(request); + } else { + // Inline fallback: merge against the current store state. + const snapshot = crdtStore.getState(); + const { diffs, chainSeen } = mergeEvents( + snapshot.resources, + snapshot.vectorClocks, + events + ); + crdtStore.dispatch({ + type: "APPLY_DIFFS", + payload: { diffs, chainSeen, at: now() }, + }); + } + }; + + const flushTimer = setInterval(flush, flushMs); + const staleTimer = setInterval(() => { + const flagged = runStalenessWatch(now(), reconcileTimeout); + for (const chainId of flagged) depsRef.current.onReconcile?.(chainId); + }, staleMs); + + return () => { + clearInterval(flushTimer); + clearInterval(staleTimer); + const worker = workerRef.current; + if (worker) { + worker.postMessage({ type: "reset" } satisfies CrdtMergeRequest); + worker.terminate(); + workerRef.current = null; + } + }; + }, []); + + return { state, ingest, reconciling: state.reconciling }; +} diff --git a/src/store/slices/crdtSlice.ts b/src/store/slices/crdtSlice.ts new file mode 100644 index 0000000..7bfc4c9 --- /dev/null +++ b/src/store/slices/crdtSlice.ts @@ -0,0 +1,170 @@ +"use client"; + +import { useSyncExternalStore } from "react"; +import { orSetValues } from "@/utils/crdtMerge"; +import { + RECONCILE_TIMEOUT_MS, + type ChainId, + type ResourceDiff, + type ResourceState, + type VectorClock, +} from "@/types/crdt"; + +/** + * Store holding the merged CRDT state, per-resource vector clocks, and the last + * time each chain produced an update. A staleness watcher flags any chain that + * has been silent past the reconcile timeout for a full state re-fetch. + * + * Custom singleton store, matching the codebase pattern. + */ + +export interface CrdtState { + resources: Record; + vectorClocks: Record; + /** Wall-clock ms each chain was last seen. */ + lastSeen: Partial>; + /** Chains currently flagged for a full snapshot re-fetch. */ + reconciling: ChainId[]; +} + +export type CrdtAction = + | { + type: "APPLY_DIFFS"; + payload: { + diffs: ResourceDiff[]; + chainSeen: Partial>; + at: number; + }; + } + | { type: "CHAIN_SEEN"; payload: { chainId: ChainId; at: number } } + | { type: "RECONCILE"; payload: { chainId: ChainId } } + | { type: "RECONCILE_DONE"; payload: { chainId: ChainId } } + | { type: "RESET" }; + +const initialState: CrdtState = { + resources: {}, + vectorClocks: {}, + lastSeen: {}, + reconciling: [], +}; + +/** Chains whose last update is older than the reconcile timeout. */ +export function findStaleChains( + lastSeen: Partial>, + now: number, + timeoutMs: number = RECONCILE_TIMEOUT_MS +): ChainId[] { + const stale: ChainId[] = []; + for (const [chainId, seenAt] of Object.entries(lastSeen) as [ChainId, number][]) { + if (now - seenAt > timeoutMs) stale.push(chainId); + } + return stale; +} + +type Listener = (state: CrdtState) => void; + +class CrdtStore { + private state: CrdtState = initialState; + private listeners = new Set(); + + getState = (): Readonly => this.state; + + subscribe = (listener: Listener): (() => void) => { + this.listeners.add(listener); + return () => this.listeners.delete(listener); + }; + + dispatch(action: CrdtAction): void { + const next = this.reducer(this.state, action); + if (next !== this.state) { + this.state = next; + this.notify(); + } + } + + private reducer(state: CrdtState, action: CrdtAction): CrdtState { + switch (action.type) { + case "APPLY_DIFFS": { + const { diffs, chainSeen, at } = action.payload; + if (diffs.length === 0 && Object.keys(chainSeen).length === 0) return state; + const resources = { ...state.resources }; + const vectorClocks = { ...state.vectorClocks }; + for (const diff of diffs) { + resources[diff.resourceId] = diff.state; + vectorClocks[diff.resourceId] = diff.vectorClock; + } + const lastSeen = { ...state.lastSeen }; + for (const chainId of Object.keys(chainSeen) as ChainId[]) { + lastSeen[chainId] = at; + } + // Any chain we just heard from is no longer stale. + const reconciling = state.reconciling.filter( + (c) => !(c in chainSeen) + ); + return { ...state, resources, vectorClocks, lastSeen, reconciling }; + } + case "CHAIN_SEEN": + return { + ...state, + lastSeen: { ...state.lastSeen, [action.payload.chainId]: action.payload.at }, + reconciling: state.reconciling.filter((c) => c !== action.payload.chainId), + }; + case "RECONCILE": + if (state.reconciling.includes(action.payload.chainId)) return state; + return { + ...state, + reconciling: [...state.reconciling, action.payload.chainId], + }; + case "RECONCILE_DONE": + return { + ...state, + reconciling: state.reconciling.filter((c) => c !== action.payload.chainId), + }; + case "RESET": + return initialState; + default: + return state; + } + } + + private notify(): void { + for (const listener of this.listeners) listener(this.state); + } +} + +/** Shared singleton CRDT store. */ +export const crdtStore = new CrdtStore(); + +/** Flag any stale chains for reconciliation; returns the chains flagged. */ +export function runStalenessWatch( + now: number, + timeoutMs: number = RECONCILE_TIMEOUT_MS +): ChainId[] { + const stale = findStaleChains(crdtStore.getState().lastSeen, now, timeoutMs); + for (const chainId of stale) { + crdtStore.dispatch({ type: "RECONCILE", payload: { chainId } }); + } + return stale; +} + +// --- Selectors / bindings --------------------------------------------------- + +/** Materialised value of a resource (scalar for LWW, array for OR-set). */ +export function selectResourceValue( + state: CrdtState, + resourceId: string +): unknown { + const resource = state.resources[resourceId]; + if (!resource) return undefined; + return resource.kind === "lww" + ? resource.value + : orSetValues(resource as Parameters[0]); +} + +export function useCrdtState(): CrdtState { + return useSyncExternalStore( + crdtStore.subscribe, + crdtStore.getState, + crdtStore.getState + ); +} diff --git a/src/types/crdt.ts b/src/types/crdt.ts new file mode 100644 index 0000000..0997968 --- /dev/null +++ b/src/types/crdt.ts @@ -0,0 +1,113 @@ +/** + * Types for the multi-chain Soroban state synchronizer. + * + * The same logical resource (e.g. a meter reading) can be emitted by several + * chains at different latencies. CRDTs merge those concurrent updates + * deterministically so the local store always converges: + * - scalar readings → Last-Writer-Wins register + * - collections → Observed-Remove set (tombstone-based) + * + * Ordering uses a Lamport timestamp `(chainId, counter)` with chain priority + * (mainnet > testnet > futurenet) as the tie-breaker. + */ + +export type ChainId = "mainnet" | "testnet" | "futurenet"; + +/** Higher wins ties. */ +export const CHAIN_PRIORITY: Record = { + mainnet: 3, + testnet: 2, + futurenet: 1, +}; + +/** Lamport timestamp: a logical counter tagged with its originating chain. */ +export interface LamportTimestamp { + chainId: ChainId; + counter: number; +} + +/** Last-Writer-Wins register for a scalar value. */ +export interface LWWRegister { + kind: "lww"; + value: T; + timestamp: LamportTimestamp; +} + +/** + * Observed-Remove set state. Each element maps to the set of unique tags that + * added it; `tombstones` holds tags that have been removed. An element is + * present iff it has at least one add-tag not in `tombstones`. + */ +export interface ORSet { + kind: "or-set"; + /** element (serialized) → add tags. */ + adds: Record; + /** removed add-tags. */ + tombstones: string[]; + /** Original element values keyed by their serialized form. */ + values: Record; +} + +export type ResourceState = LWWRegister | ORSet; + +/** Per-resource vector clock: highest counter observed from each chain. */ +export type VectorClock = Partial>; + +// --- Events ----------------------------------------------------------------- + +interface BaseEvent { + resourceId: string; + timestamp: LamportTimestamp; +} + +/** Set a scalar register value. */ +export interface RegisterSetEvent extends BaseEvent { + type: "register-set"; + value: T; +} + +/** Add an element to an OR-set (carries the unique add-tag). */ +export interface OrSetAddEvent extends BaseEvent { + type: "or-set-add"; + element: T; + tag: string; +} + +/** + * Remove an element from an OR-set. Carries the exact add-tags the remover + * observed, so the operation is commutative regardless of merge order (only + * those tags are tombstoned; concurrent adds with fresh tags survive). + */ +export interface OrSetRemoveEvent extends BaseEvent { + type: "or-set-remove"; + element: T; + tags: string[]; +} + +export type CrdtEvent = + | RegisterSetEvent + | OrSetAddEvent + | OrSetRemoveEvent; + +// --- Diffs (worker → store) ------------------------------------------------- + +/** A merged resource patch produced by the merge worker. */ +export interface ResourceDiff { + resourceId: string; + state: ResourceState; + /** Vector clock after applying the batch. */ + vectorClock: VectorClock; +} + +export interface MergeResult { + diffs: ResourceDiff[]; + /** Highest counter seen per chain in this batch (for the staleness watcher). */ + chainSeen: Partial>; +} + +// --- Invariants ------------------------------------------------------------- + +/** Ledger closes (~5 s each) before a chain is force-reconciled. */ +export const MAX_DRIFT_LEDGERS = 12; +/** Approx wall-clock equivalent of {@link MAX_DRIFT_LEDGERS} (ms). */ +export const RECONCILE_TIMEOUT_MS = 60_000; diff --git a/src/utils/crdtMerge.ts b/src/utils/crdtMerge.ts new file mode 100644 index 0000000..0dfff3b --- /dev/null +++ b/src/utils/crdtMerge.ts @@ -0,0 +1,187 @@ +/** + * Pure CRDT merge engine. Shared by the merge worker and the tests. + * + * - LWW register: keep the value with the greater Lamport timestamp. + * - OR-set: tag-based add / tombstone remove; an element is present iff it has + * an add-tag that is not tombstoned. Both operations are commutative, + * associative and idempotent, so any interleaving of a batch converges. + */ + +import { compareTimestamp } from "@/utils/lamportClock"; +import { + type ChainId, + type CrdtEvent, + type LWWRegister, + type LamportTimestamp, + type MergeResult, + type ORSet, + type ResourceDiff, + type ResourceState, + type VectorClock, +} from "@/types/crdt"; + +const unique = (arr: T[]): T[] => [...new Set(arr)]; + +// --- LWW register ----------------------------------------------------------- + +export function mergeRegister( + a: LWWRegister, + b: LWWRegister +): LWWRegister { + return compareTimestamp(a.timestamp, b.timestamp) >= 0 ? a : b; +} + +// --- OR-set ----------------------------------------------------------------- + +export function emptyOrSet(): ORSet { + return { kind: "or-set", adds: {}, tombstones: [], values: {} }; +} + +export function orSetAdd( + set: ORSet, + element: T, + tag: string +): ORSet { + return { + kind: "or-set", + adds: { ...set.adds, [element]: unique([...(set.adds[element] ?? []), tag]) }, + tombstones: set.tombstones, + values: { ...set.values, [element]: element }, + }; +} + +/** The add-tags of `element` that are not yet tombstoned (what a remove sees). */ +export function observedTags( + set: ORSet, + element: T +): string[] { + const tombstoned = new Set(set.tombstones); + return (set.adds[element] ?? []).filter((t) => !tombstoned.has(t)); +} + +/** Tombstone a specific set of add-tags (commutative / idempotent). */ +export function orSetRemoveTags( + set: ORSet, + tags: string[] +): ORSet { + if (tags.length === 0) return set; + return { + kind: "or-set", + adds: set.adds, + tombstones: unique([...set.tombstones, ...tags]), + values: set.values, + }; +} + +/** Convenience: remove an element by tombstoning its currently-observed tags. */ +export function orSetRemove( + set: ORSet, + element: T +): ORSet { + return orSetRemoveTags(set, observedTags(set, element)); +} + +export function orSetMerge( + a: ORSet, + b: ORSet +): ORSet { + const adds: Record = { ...a.adds }; + for (const [el, tags] of Object.entries(b.adds)) { + adds[el] = unique([...(adds[el] ?? []), ...tags]); + } + return { + kind: "or-set", + adds, + tombstones: unique([...a.tombstones, ...b.tombstones]), + values: { ...a.values, ...b.values }, + }; +} + +/** Materialise the present elements of an OR-set. */ +export function orSetValues(set: ORSet): T[] { + const tombstoned = new Set(set.tombstones); + const out: T[] = []; + for (const [el, tags] of Object.entries(set.adds)) { + if (tags.some((t) => !tombstoned.has(t))) out.push(set.values[el]); + } + return out; +} + +// --- Vector clock ----------------------------------------------------------- + +function bumpClock(clock: VectorClock, ts: LamportTimestamp): VectorClock { + const current = clock[ts.chainId] ?? 0; + if (ts.counter <= current) return clock; + return { ...clock, [ts.chainId]: ts.counter }; +} + +// --- Event application ------------------------------------------------------- + +function applyEvent( + state: ResourceState | undefined, + event: CrdtEvent +): ResourceState { + switch (event.type) { + case "register-set": { + const incoming: LWWRegister = { + kind: "lww", + value: event.value, + timestamp: event.timestamp, + }; + if (state && state.kind === "lww") return mergeRegister(state, incoming); + return incoming; + } + case "or-set-add": { + const set = + state && state.kind === "or-set" + ? (state as ORSet) + : emptyOrSet(); + return orSetAdd(set, event.element, event.tag); + } + case "or-set-remove": { + const set = + state && state.kind === "or-set" + ? (state as ORSet) + : emptyOrSet(); + // Tombstone exactly the observed tags → order-independent merge. + return orSetRemoveTags(set, event.tags); + } + } +} + +/** + * Merge a batch of incoming events into the current state, grouped by resource. + * Returns the per-resource diffs (final state + vector clock) and the highest + * counter seen per chain (for the staleness watcher). Inputs are not mutated. + */ +export function mergeEvents( + states: Record, + clocks: Record, + events: CrdtEvent[] +): MergeResult { + const touched = new Map(); + const touchedClocks = new Map(); + const chainSeen: Partial> = {}; + + for (const event of events) { + const id = event.resourceId; + const current = touched.get(id) ?? states[id]; + touched.set(id, applyEvent(current, event)); + + const clock = touchedClocks.get(id) ?? clocks[id] ?? {}; + touchedClocks.set(id, bumpClock(clock, event.timestamp)); + + const { chainId, counter } = event.timestamp; + if ((chainSeen[chainId] ?? 0) < counter) chainSeen[chainId] = counter; + } + + const diffs: ResourceDiff[] = []; + for (const [resourceId, state] of touched) { + diffs.push({ + resourceId, + state, + vectorClock: touchedClocks.get(resourceId) ?? {}, + }); + } + return { diffs, chainSeen }; +} diff --git a/src/utils/lamportClock.ts b/src/utils/lamportClock.ts new file mode 100644 index 0000000..6c32582 --- /dev/null +++ b/src/utils/lamportClock.ts @@ -0,0 +1,78 @@ +/** + * Lamport timestamps with chain-priority tie-breaking. + * + * A timestamp is `(chainId, counter)`. Ordering is by `counter`, and ties (the + * "concurrent" case) are broken deterministically by chain priority + * (mainnet > testnet > futurenet) so every replica converges to the same winner. + */ + +import { + CHAIN_PRIORITY, + type ChainId, + type LamportTimestamp, +} from "@/types/crdt"; + +/** + * Total order over Lamport timestamps. Returns -1 / 0 / 1. Two timestamps are + * equal only when they share both chain and counter. + */ +export function compareTimestamp( + a: LamportTimestamp, + b: LamportTimestamp +): -1 | 0 | 1 { + if (a.counter !== b.counter) return a.counter < b.counter ? -1 : 1; + const pa = CHAIN_PRIORITY[a.chainId]; + const pb = CHAIN_PRIORITY[b.chainId]; + if (pa !== pb) return pa < pb ? -1 : 1; + return 0; +} + +/** True when `a` strictly dominates `b` in the total order. */ +export function dominates(a: LamportTimestamp, b: LamportTimestamp): boolean { + return compareTimestamp(a, b) === 1; +} + +/** The greater of two timestamps (ties → the higher-priority chain). */ +export function maxTimestamp( + a: LamportTimestamp, + b: LamportTimestamp +): LamportTimestamp { + return compareTimestamp(a, b) >= 0 ? a : b; +} + +/** Per-chain Lamport clock. */ +export class LamportClock { + constructor( + readonly chainId: ChainId, + private counter = 0 + ) {} + + /** Current counter value (without advancing). */ + get value(): number { + return this.counter; + } + + /** Advance the clock and stamp a new local event. */ + tick(): LamportTimestamp { + this.counter += 1; + return { chainId: this.chainId, counter: this.counter }; + } + + /** Merge in a remote timestamp (Lamport receive rule). */ + observe(remote: LamportTimestamp): void { + if (remote.counter > this.counter) this.counter = remote.counter; + } + + /** + * Observe a remote timestamp and immediately stamp a derived local event + * (receive-then-send): `counter = max(local, remote) + 1`. + */ + tickAfter(remote: LamportTimestamp): LamportTimestamp { + this.observe(remote); + return this.tick(); + } + + peek(): LamportTimestamp { + return { chainId: this.chainId, counter: this.counter }; + } +} diff --git a/src/workers/crdtMerge.worker.ts b/src/workers/crdtMerge.worker.ts new file mode 100644 index 0000000..7e92568 --- /dev/null +++ b/src/workers/crdtMerge.worker.ts @@ -0,0 +1,55 @@ +/** + * CRDT merge worker. Holds the authoritative merged state off the main thread + * and folds each incoming event batch into it, posting back the resulting + * resource diffs. Keeping the accumulated state here means a 150 ev/s burst is + * merged without blocking the render thread. + */ + +import { mergeEvents } from "@/utils/crdtMerge"; +import type { + CrdtEvent, + ResourceState, + VectorClock, +} from "@/types/crdt"; + +export type CrdtMergeRequest = + | { type: "merge"; batchId: number; events: CrdtEvent[] } + | { type: "reset" }; + +export interface CrdtMergeResponse { + type: "merged"; + batchId: number; + diffs: ReturnType["diffs"]; + chainSeen: ReturnType["chainSeen"]; +} + +const worker = self as unknown as Worker; + +let states: Record = {}; +let clocks: Record = {}; + +worker.addEventListener("message", (event: MessageEvent) => { + const msg = event.data; + + if (msg.type === "reset") { + states = {}; + clocks = {}; + return; + } + + const result = mergeEvents(states, clocks, msg.events); + + // Fold the diffs back into the worker's accumulated state. + for (const diff of result.diffs) { + states[diff.resourceId] = diff.state; + clocks[diff.resourceId] = diff.vectorClock; + } + + const response: CrdtMergeResponse = { + type: "merged", + batchId: msg.batchId, + diffs: result.diffs, + chainSeen: result.chainSeen, + }; + worker.postMessage(response); +}); diff --git a/tests/unit/crdtMerge.test.ts b/tests/unit/crdtMerge.test.ts new file mode 100644 index 0000000..b849019 --- /dev/null +++ b/tests/unit/crdtMerge.test.ts @@ -0,0 +1,148 @@ +import { describe, it, expect } from "vitest"; +import { + mergeRegister, + emptyOrSet, + orSetAdd, + orSetRemove, + orSetRemoveTags, + orSetMerge, + orSetValues, + observedTags, + mergeEvents, +} from "@/utils/crdtMerge"; +import type { + ChainId, + CrdtEvent, + LWWRegister, + LamportTimestamp, + ResourceState, + VectorClock, +} from "@/types/crdt"; + +const ts = (chainId: ChainId, counter: number): LamportTimestamp => ({ chainId, counter }); +const reg = (value: T, t: LamportTimestamp): LWWRegister => ({ + kind: "lww", + value, + timestamp: t, +}); + +describe("mergeRegister (LWW)", () => { + it("keeps the value with the greater timestamp", () => { + expect(mergeRegister(reg("a", ts("testnet", 1)), reg("b", ts("testnet", 2))).value).toBe("b"); + }); + + it("breaks concurrent ties by chain priority", () => { + const winner = mergeRegister(reg("t", ts("testnet", 5)), reg("m", ts("mainnet", 5))); + expect(winner.value).toBe("m"); + }); + + it("is commutative", () => { + const a = reg("a", ts("testnet", 5)); + const b = reg("b", ts("mainnet", 5)); + expect(mergeRegister(a, b)).toEqual(mergeRegister(b, a)); + }); +}); + +describe("OR-set", () => { + it("adds and materialises elements", () => { + let s = emptyOrSet(); + s = orSetAdd(s, "deviceA", "t1"); + s = orSetAdd(s, "deviceB", "t2"); + expect(orSetValues(s).sort()).toEqual(["deviceA", "deviceB"]); + }); + + it("removes by tombstoning observed tags", () => { + let s = orSetAdd(emptyOrSet(), "x", "t1"); + s = orSetRemove(s, "x"); + expect(orSetValues(s)).toEqual([]); + }); + + it("supports add → remove → re-add with a fresh tag", () => { + let s = orSetAdd(emptyOrSet(), "x", "t1"); + s = orSetRemove(s, "x"); // tombstones t1 + expect(orSetValues(s)).toEqual([]); + s = orSetAdd(s, "x", "t2"); // fresh tag survives + expect(orSetValues(s)).toEqual(["x"]); + }); + + it("concurrent add wins over a remove that didn't observe its tag", () => { + // Replica A adds with tag tA; replica B removes (observing only tB). + const base = orSetAdd(emptyOrSet(), "x", "tB"); + const a = orSetAdd(base, "x", "tA"); // A adds a fresh tag + const observed = observedTags(base, "x"); // B only saw [tB] + const b = orSetRemoveTags(base, observed); // B removes tB + const merged = orSetMerge(a, b); + // tA was never tombstoned → element survives (add-wins). + expect(orSetValues(merged)).toEqual(["x"]); + }); + + it("merge is commutative", () => { + const a = orSetAdd(orSetAdd(emptyOrSet(), "x", "t1"), "y", "t2"); + const b = orSetRemoveTags(orSetAdd(emptyOrSet(), "z", "t3"), ["t1"]); + expect(orSetValues(orSetMerge(a, b)).sort()).toEqual( + orSetValues(orSetMerge(b, a)).sort() + ); + }); +}); + +describe("mergeEvents convergence", () => { + const events: CrdtEvent[] = [ + { type: "register-set", resourceId: "meter:1", value: 10, timestamp: ts("testnet", 1) }, + { type: "register-set", resourceId: "meter:1", value: 20, timestamp: ts("mainnet", 3) }, + { type: "register-set", resourceId: "meter:1", value: 15, timestamp: ts("futurenet", 2) }, + { type: "or-set-add", resourceId: "devices", element: "d1", tag: "g1", timestamp: ts("testnet", 1) }, + { type: "or-set-add", resourceId: "devices", element: "d2", tag: "g2", timestamp: ts("mainnet", 2) }, + { type: "or-set-remove", resourceId: "devices", element: "d1", tags: ["g1"], timestamp: ts("mainnet", 4) }, + ]; + + function materialise(state: ResourceState | undefined): unknown { + if (!state) return undefined; + return state.kind === "lww" ? state.value : orSetValues(state).sort(); + } + + function runOrder(order: CrdtEvent[]): Record { + const { diffs } = mergeEvents({}, {}, order); + const out: Record = {}; + for (const d of diffs) out[d.resourceId] = materialise(d.state); + return out; + } + + it("converges to the same state under arbitrary interleavings", () => { + const original = runOrder(events); + // The LWW winner is the highest timestamp (mainnet,3) → 20. + expect(original["meter:1"]).toBe(20); + // d1 removed (g1 tombstoned), d2 present. + expect(original["devices"]).toEqual(["d2"]); + + // Several permutations must yield identical materialised state. + const reversed = runOrder([...events].reverse()); + const rotated = runOrder([...events.slice(3), ...events.slice(0, 3)]); + expect(reversed).toEqual(original); + expect(rotated).toEqual(original); + }); + + it("reports vector clocks and chain-seen counters", () => { + const { diffs, chainSeen } = mergeEvents({}, {}, events); + const meter = diffs.find((d) => d.resourceId === "meter:1")!; + const expectedClock: VectorClock = { testnet: 1, mainnet: 3, futurenet: 2 }; + expect(meter.vectorClock).toEqual(expectedClock); + expect(chainSeen).toEqual({ testnet: 1, mainnet: 4, futurenet: 2 }); + }); + + it("folds incrementally into prior state like the worker does", () => { + const first = mergeEvents({}, {}, events.slice(0, 3)); + const states: Record = {}; + const clocks: Record = {}; + for (const d of first.diffs) { + states[d.resourceId] = d.state; + clocks[d.resourceId] = d.vectorClock; + } + const second = mergeEvents(states, clocks, events.slice(3)); + const all: Record = {}; + for (const d of [...first.diffs, ...second.diffs]) { + all[d.resourceId] = materialise(d.state); + } + expect(all["meter:1"]).toBe(20); + expect(all["devices"]).toEqual(["d2"]); + }); +}); diff --git a/tests/unit/crdtSlice.test.ts b/tests/unit/crdtSlice.test.ts new file mode 100644 index 0000000..304760a --- /dev/null +++ b/tests/unit/crdtSlice.test.ts @@ -0,0 +1,89 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { + crdtStore, + findStaleChains, + runStalenessWatch, + selectResourceValue, +} from "@/store/slices/crdtSlice"; +import { RECONCILE_TIMEOUT_MS, type ResourceDiff } from "@/types/crdt"; + +beforeEach(() => crdtStore.dispatch({ type: "RESET" })); + +const registerDiff = (id: string, value: unknown): ResourceDiff => ({ + resourceId: id, + state: { kind: "lww", value, timestamp: { chainId: "mainnet", counter: 1 } }, + vectorClock: { mainnet: 1 }, +}); + +describe("findStaleChains", () => { + it("flags chains silent past the timeout", () => { + const lastSeen = { mainnet: 0, testnet: 1000 }; + expect(findStaleChains(lastSeen, RECONCILE_TIMEOUT_MS + 500, RECONCILE_TIMEOUT_MS)).toEqual([ + "mainnet", + ]); + }); + + it("returns nothing when all chains are fresh", () => { + expect(findStaleChains({ mainnet: 1000 }, 1500, RECONCILE_TIMEOUT_MS)).toEqual([]); + }); +}); + +describe("crdtStore", () => { + it("applies diffs and records the chains seen", () => { + crdtStore.dispatch({ + type: "APPLY_DIFFS", + payload: { + diffs: [registerDiff("meter:1", 42)], + chainSeen: { mainnet: 1 }, + at: 5000, + }, + }); + const state = crdtStore.getState(); + expect(selectResourceValue(state, "meter:1")).toBe(42); + expect(state.lastSeen.mainnet).toBe(5000); + }); + + it("materialises an OR-set resource as an array", () => { + crdtStore.dispatch({ + type: "APPLY_DIFFS", + payload: { + diffs: [ + { + resourceId: "devices", + state: { + kind: "or-set", + adds: { d1: ["t1"], d2: ["t2"] }, + tombstones: ["t2"], + values: { d1: "d1", d2: "d2" }, + }, + vectorClock: { mainnet: 2 }, + }, + ], + chainSeen: { mainnet: 2 }, + at: 1, + }, + }); + expect(selectResourceValue(crdtStore.getState(), "devices")).toEqual(["d1"]); + }); + + it("flags stale chains for reconciliation, then clears on next sighting", () => { + crdtStore.dispatch({ type: "CHAIN_SEEN", payload: { chainId: "testnet", at: 0 } }); + const flagged = runStalenessWatch(RECONCILE_TIMEOUT_MS + 1000, RECONCILE_TIMEOUT_MS); + expect(flagged).toEqual(["testnet"]); + expect(crdtStore.getState().reconciling).toEqual(["testnet"]); + + // A fresh sighting clears the reconcile flag. + crdtStore.dispatch({ type: "CHAIN_SEEN", payload: { chainId: "testnet", at: 999999 } }); + expect(crdtStore.getState().reconciling).toEqual([]); + }); + + it("applying a diff clears reconcile for chains heard from", () => { + crdtStore.dispatch({ type: "RECONCILE", payload: { chainId: "futurenet" } }); + expect(crdtStore.getState().reconciling).toContain("futurenet"); + crdtStore.dispatch({ + type: "APPLY_DIFFS", + payload: { diffs: [], chainSeen: { futurenet: 5 }, at: 10 }, + }); + expect(crdtStore.getState().reconciling).toEqual([]); + }); +}); diff --git a/tests/unit/lamportClock.test.ts b/tests/unit/lamportClock.test.ts new file mode 100644 index 0000000..e09646c --- /dev/null +++ b/tests/unit/lamportClock.test.ts @@ -0,0 +1,70 @@ +import { describe, it, expect } from "vitest"; +import { + LamportClock, + compareTimestamp, + maxTimestamp, + dominates, +} from "@/utils/lamportClock"; +import type { LamportTimestamp } from "@/types/crdt"; + +const ts = (chainId: LamportTimestamp["chainId"], counter: number): LamportTimestamp => ({ + chainId, + counter, +}); + +describe("compareTimestamp", () => { + it("orders by counter first", () => { + expect(compareTimestamp(ts("testnet", 1), ts("testnet", 2))).toBe(-1); + expect(compareTimestamp(ts("testnet", 3), ts("testnet", 2))).toBe(1); + }); + + it("breaks ties by chain priority (mainnet > testnet > futurenet)", () => { + expect(compareTimestamp(ts("mainnet", 5), ts("testnet", 5))).toBe(1); + expect(compareTimestamp(ts("futurenet", 5), ts("testnet", 5))).toBe(-1); + }); + + it("is equal only for the same chain and counter", () => { + expect(compareTimestamp(ts("mainnet", 5), ts("mainnet", 5))).toBe(0); + }); + + it("is a deterministic total order regardless of argument order", () => { + const a = ts("testnet", 5); + const b = ts("mainnet", 5); + expect(compareTimestamp(a, b)).toBe(-compareTimestamp(b, a)); + }); + + it("maxTimestamp and dominates agree with compare", () => { + expect(maxTimestamp(ts("futurenet", 4), ts("mainnet", 4))).toEqual( + ts("mainnet", 4) + ); + expect(dominates(ts("mainnet", 4), ts("testnet", 4))).toBe(true); + expect(dominates(ts("testnet", 4), ts("mainnet", 4))).toBe(false); + }); +}); + +describe("LamportClock", () => { + it("ticks monotonically", () => { + const clock = new LamportClock("testnet"); + expect(clock.tick()).toEqual(ts("testnet", 1)); + expect(clock.tick()).toEqual(ts("testnet", 2)); + expect(clock.value).toBe(2); + }); + + it("observes remote timestamps and advances past them", () => { + const clock = new LamportClock("testnet", 3); + clock.observe(ts("mainnet", 10)); + expect(clock.value).toBe(10); + expect(clock.tick()).toEqual(ts("testnet", 11)); + }); + + it("does not regress on a lower remote timestamp", () => { + const clock = new LamportClock("testnet", 7); + clock.observe(ts("mainnet", 2)); + expect(clock.value).toBe(7); + }); + + it("tickAfter implements receive-then-send", () => { + const clock = new LamportClock("testnet", 4); + expect(clock.tickAfter(ts("mainnet", 9))).toEqual(ts("testnet", 10)); + }); +});