Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/hooks/useContractState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"use client";

import { useCallback, useEffect, useRef } from "react";
import { LamportClock } from "@/utils/lamportClock";
import { mergeEvents } from "@/utils/crdtMerge";
import {
crdtStore,
runStalenessWatch,
useCrdtState,
type CrdtState,
} from "@/store/slices/crdtSlice";
import {
RECONCILE_TIMEOUT_MS,
type ChainId,
type CrdtEvent,
} from "@/types/crdt";
import type {
CrdtMergeRequest,
CrdtMergeResponse,
} from "@/workers/crdtMerge.worker";

/**
* Subscribes the dashboard to multi-chain Soroban events. Each ingested event
* is stamped with the originating chain's Lamport clock, batched, and merged
* (off-thread when a worker is available, otherwise inline). Merged diffs are
* applied to {@link crdtStore}, and a staleness watch flags any chain that has
* gone quiet past the reconcile timeout.
*/

/** An event before it is Lamport-stamped (the clock supplies the timestamp). */
export type RawCrdtEvent = Omit<CrdtEvent, "timestamp">;

export interface UseContractStateDeps {
createWorker?: () => Worker;
now?: () => number;
/** How often to flush the event batch (ms). @default 16 */
flushIntervalMs?: number;
/** How often to run the staleness watch (ms). @default 5000 */
stalenessIntervalMs?: number;
reconcileTimeoutMs?: number;
/** Called when a chain must be fully re-fetched. */
onReconcile?: (chainId: ChainId) => void;
}

export interface UseContractStateResult {
state: CrdtState;
/** Stamp and enqueue an event from a chain. */
ingest: (chainId: ChainId, event: RawCrdtEvent) => void;
reconciling: ChainId[];
}

export function useContractState(
deps: UseContractStateDeps = {}
): UseContractStateResult {
const state = useCrdtState();

const clocksRef = useRef<Map<ChainId, LamportClock>>(new Map());
const batchRef = useRef<CrdtEvent[]>([]);
const workerRef = useRef<Worker | null>(null);
const batchSeqRef = useRef(0);
const depsRef = useRef(deps);
depsRef.current = deps;

const clockFor = useCallback((chainId: ChainId): LamportClock => {
let clock = clocksRef.current.get(chainId);
if (!clock) {
clock = new LamportClock(chainId);
clocksRef.current.set(chainId, clock);
}
return clock;
}, []);

const ingest = useCallback(
(chainId: ChainId, event: RawCrdtEvent) => {
const timestamp = clockFor(chainId).tick();
batchRef.current.push({ ...event, timestamp } as CrdtEvent);
},
[clockFor]
);

// Flush + merge loop, worker spawn, and staleness watch.
useEffect(() => {
const now = depsRef.current.now ?? Date.now;
const flushMs = depsRef.current.flushIntervalMs ?? 16;
const staleMs = depsRef.current.stalenessIntervalMs ?? 5000;
const reconcileTimeout =
depsRef.current.reconcileTimeoutMs ?? RECONCILE_TIMEOUT_MS;

// Try to run the merge off the main thread.
const createWorker = depsRef.current.createWorker;
if (createWorker) {
try {
const worker = createWorker();
worker.onmessage = (e: MessageEvent<CrdtMergeResponse>) => {
const { diffs, chainSeen } = e.data;
crdtStore.dispatch({
type: "APPLY_DIFFS",
payload: { diffs, chainSeen, at: now() },
});
};
workerRef.current = worker;
} catch {
workerRef.current = null;
}
}

const flush = () => {
if (batchRef.current.length === 0) return;
const events = batchRef.current;
batchRef.current = [];

const worker = workerRef.current;
if (worker) {
const request: CrdtMergeRequest = {
type: "merge",
batchId: ++batchSeqRef.current,
events,
};
worker.postMessage(request);
} else {
// Inline fallback: merge against the current store state.
const snapshot = crdtStore.getState();
const { diffs, chainSeen } = mergeEvents(
snapshot.resources,
snapshot.vectorClocks,
events
);
crdtStore.dispatch({
type: "APPLY_DIFFS",
payload: { diffs, chainSeen, at: now() },
});
}
};

const flushTimer = setInterval(flush, flushMs);
const staleTimer = setInterval(() => {
const flagged = runStalenessWatch(now(), reconcileTimeout);
for (const chainId of flagged) depsRef.current.onReconcile?.(chainId);
}, staleMs);

return () => {
clearInterval(flushTimer);
clearInterval(staleTimer);
const worker = workerRef.current;
if (worker) {
worker.postMessage({ type: "reset" } satisfies CrdtMergeRequest);
worker.terminate();
workerRef.current = null;
}
};
}, []);

return { state, ingest, reconciling: state.reconciling };
}
170 changes: 170 additions & 0 deletions src/store/slices/crdtSlice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"use client";

import { useSyncExternalStore } from "react";
import { orSetValues } from "@/utils/crdtMerge";
import {
RECONCILE_TIMEOUT_MS,
type ChainId,
type ResourceDiff,
type ResourceState,
type VectorClock,
} from "@/types/crdt";

/**
* Store holding the merged CRDT state, per-resource vector clocks, and the last
* time each chain produced an update. A staleness watcher flags any chain that
* has been silent past the reconcile timeout for a full state re-fetch.
*
* Custom singleton store, matching the codebase pattern.
*/

export interface CrdtState {
resources: Record<string, ResourceState>;
vectorClocks: Record<string, VectorClock>;
/** Wall-clock ms each chain was last seen. */
lastSeen: Partial<Record<ChainId, number>>;
/** Chains currently flagged for a full snapshot re-fetch. */
reconciling: ChainId[];
}

export type CrdtAction =
| {
type: "APPLY_DIFFS";
payload: {
diffs: ResourceDiff[];
chainSeen: Partial<Record<ChainId, number>>;
at: number;
};
}
| { type: "CHAIN_SEEN"; payload: { chainId: ChainId; at: number } }
| { type: "RECONCILE"; payload: { chainId: ChainId } }
| { type: "RECONCILE_DONE"; payload: { chainId: ChainId } }
| { type: "RESET" };

const initialState: CrdtState = {
resources: {},
vectorClocks: {},
lastSeen: {},
reconciling: [],
};

/** Chains whose last update is older than the reconcile timeout. */
export function findStaleChains(
lastSeen: Partial<Record<ChainId, number>>,
now: number,
timeoutMs: number = RECONCILE_TIMEOUT_MS
): ChainId[] {
const stale: ChainId[] = [];
for (const [chainId, seenAt] of Object.entries(lastSeen) as [ChainId, number][]) {
if (now - seenAt > timeoutMs) stale.push(chainId);
}
return stale;
}

type Listener = (state: CrdtState) => void;

class CrdtStore {
private state: CrdtState = initialState;
private listeners = new Set<Listener>();

getState = (): Readonly<CrdtState> => this.state;

subscribe = (listener: Listener): (() => void) => {
this.listeners.add(listener);
return () => this.listeners.delete(listener);
};

dispatch(action: CrdtAction): void {
const next = this.reducer(this.state, action);
if (next !== this.state) {
this.state = next;
this.notify();
}
}

private reducer(state: CrdtState, action: CrdtAction): CrdtState {
switch (action.type) {
case "APPLY_DIFFS": {
const { diffs, chainSeen, at } = action.payload;
if (diffs.length === 0 && Object.keys(chainSeen).length === 0) return state;
const resources = { ...state.resources };
const vectorClocks = { ...state.vectorClocks };
for (const diff of diffs) {
resources[diff.resourceId] = diff.state;
vectorClocks[diff.resourceId] = diff.vectorClock;
}
const lastSeen = { ...state.lastSeen };
for (const chainId of Object.keys(chainSeen) as ChainId[]) {
lastSeen[chainId] = at;
}
// Any chain we just heard from is no longer stale.
const reconciling = state.reconciling.filter(
(c) => !(c in chainSeen)
);
return { ...state, resources, vectorClocks, lastSeen, reconciling };
}
case "CHAIN_SEEN":
return {
...state,
lastSeen: { ...state.lastSeen, [action.payload.chainId]: action.payload.at },
reconciling: state.reconciling.filter((c) => c !== action.payload.chainId),
};
case "RECONCILE":
if (state.reconciling.includes(action.payload.chainId)) return state;
return {
...state,
reconciling: [...state.reconciling, action.payload.chainId],
};
case "RECONCILE_DONE":
return {
...state,
reconciling: state.reconciling.filter((c) => c !== action.payload.chainId),
};
case "RESET":
return initialState;
default:
return state;
}
}

private notify(): void {
for (const listener of this.listeners) listener(this.state);
}
}

/** Shared singleton CRDT store. */
export const crdtStore = new CrdtStore();

/** Flag any stale chains for reconciliation; returns the chains flagged. */
export function runStalenessWatch(
now: number,
timeoutMs: number = RECONCILE_TIMEOUT_MS
): ChainId[] {
const stale = findStaleChains(crdtStore.getState().lastSeen, now, timeoutMs);
for (const chainId of stale) {
crdtStore.dispatch({ type: "RECONCILE", payload: { chainId } });
}
return stale;
}

// --- Selectors / bindings ---------------------------------------------------

/** Materialised value of a resource (scalar for LWW, array for OR-set). */
export function selectResourceValue(
state: CrdtState,
resourceId: string
): unknown {
const resource = state.resources[resourceId];
if (!resource) return undefined;
return resource.kind === "lww"
? resource.value
: orSetValues(resource as Parameters<typeof orSetValues>[0]);
}

export function useCrdtState(): CrdtState {
return useSyncExternalStore(
crdtStore.subscribe,
crdtStore.getState,
crdtStore.getState
);
}
Loading
Loading