From 8f344ec9795f22ec2ff5ab038f8d3136c2186ba3 Mon Sep 17 00:00:00 2001 From: Precious Igwealor Date: Sat, 27 Jun 2026 09:17:18 +0100 Subject: [PATCH] feat(indexer): add handler registry, stream lifecycle handlers, distribution event handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue #30 — indexer/common/src/handlers/ - HandlerRegistry interface with register/match/dispatch API - Filter by contractId, eventName, or both; empty filter matches all - EventHandlerRegistry implementation: sequential dispatch, error captured per handler - 15 unit tests: matching rules, ordering, error isolation, idempotency issue #35 — indexer/streams/src/handlers/ - StreamFunded, StreamWithdrawal, StreamCancelled handlers - Shared utils: parseEventData, requireStringField, requireTopic, getEventIdentity - Each handler: parse → validate → map → identity following streamCreated.ts pattern - 24 unit tests: valid parse, wrong topic, missing/empty fields, record shape, idempotency issue #38 — indexer/distributions/src/handlers/ - DistributionCreated, TokensClaimed, BatchPaused, BatchResumed handlers - requireIntField validator for recipientCount (integer, not string) - Record types align with DistributionBatch + ClaimAction entity schema (PR #44) - 26 unit tests: happy path, wrong topics, missing fields, identity uniqueness, idempotency closes #30 closes #35 closes #38 --- indexer/common/src/handlers/registry.test.ts | 180 +++++++++++ indexer/common/src/handlers/registry.ts | 58 ++++ indexer/common/src/handlers/types.ts | 79 +++++ indexer/common/src/index.ts | 11 + .../distributions/src/handlers/batchPause.ts | 88 ++++++ .../src/handlers/distribution-events.test.ts | 284 ++++++++++++++++++ .../src/handlers/distributionCreated.ts | 63 ++++ .../src/handlers/tokensClaimed.ts | 54 ++++ indexer/distributions/src/handlers/types.ts | 52 ++++ indexer/distributions/src/handlers/utils.ts | 50 +++ .../src/handlers/stream-lifecycle.test.ts | 240 +++++++++++++++ indexer/streams/src/handlers/streamCancel.ts | 51 ++++ indexer/streams/src/handlers/streamFunded.ts | 48 +++ .../streams/src/handlers/streamWithdrawal.ts | 51 ++++ indexer/streams/src/handlers/types.ts | 39 +++ indexer/streams/src/handlers/utils.ts | 37 +++ 16 files changed, 1385 insertions(+) create mode 100644 indexer/common/src/handlers/registry.test.ts create mode 100644 indexer/common/src/handlers/registry.ts create mode 100644 indexer/common/src/handlers/types.ts create mode 100644 indexer/distributions/src/handlers/batchPause.ts create mode 100644 indexer/distributions/src/handlers/distribution-events.test.ts create mode 100644 indexer/distributions/src/handlers/distributionCreated.ts create mode 100644 indexer/distributions/src/handlers/tokensClaimed.ts create mode 100644 indexer/distributions/src/handlers/types.ts create mode 100644 indexer/distributions/src/handlers/utils.ts create mode 100644 indexer/streams/src/handlers/stream-lifecycle.test.ts create mode 100644 indexer/streams/src/handlers/streamCancel.ts create mode 100644 indexer/streams/src/handlers/streamFunded.ts create mode 100644 indexer/streams/src/handlers/streamWithdrawal.ts create mode 100644 indexer/streams/src/handlers/types.ts create mode 100644 indexer/streams/src/handlers/utils.ts diff --git a/indexer/common/src/handlers/registry.test.ts b/indexer/common/src/handlers/registry.test.ts new file mode 100644 index 0000000..9268c00 --- /dev/null +++ b/indexer/common/src/handlers/registry.test.ts @@ -0,0 +1,180 @@ +import { describe, expect, test, vi } from "vitest"; + +import { EventHandlerRegistry, createHandlerRegistry } from "./registry.js"; +import type { HandlerResult, SorobanEvent } from "./types.js"; + +function makeEvent(overrides?: Partial): SorobanEvent { + return { + contractId: "CONTRACT_A", + ledger: 1000, + txHash: "TX_HASH_1", + eventIndex: 0, + topics: ["StreamCreated"], + data: JSON.stringify({ streamId: "s-1" }), + ...overrides, + }; +} + +const OK: HandlerResult = { success: true, summary: "ok" }; + +describe("EventHandlerRegistry (issue #30)", () => { + describe("registration and matching", () => { + test("a handler with no filter fields matches every event", async () => { + const registry = createHandlerRegistry(); + const handler = vi.fn().mockResolvedValue(OK); + + registry.register({}, handler); + + const eventA = makeEvent({ contractId: "A" }); + const eventB = makeEvent({ contractId: "B", topics: ["Transfer"] }); + + expect(registry.match(eventA)).toHaveLength(1); + expect(registry.match(eventB)).toHaveLength(1); + }); + + test("contractId filter only matches events from that contract", () => { + const registry = createHandlerRegistry(); + registry.register({ contractId: "CONTRACT_A" }, vi.fn().mockResolvedValue(OK)); + + expect(registry.match(makeEvent({ contractId: "CONTRACT_A" }))).toHaveLength(1); + expect(registry.match(makeEvent({ contractId: "CONTRACT_B" }))).toHaveLength(0); + }); + + test("eventName filter only matches events whose first topic equals it", () => { + const registry = createHandlerRegistry(); + registry.register({ eventName: "StreamCreated" }, vi.fn().mockResolvedValue(OK)); + + expect(registry.match(makeEvent({ topics: ["StreamCreated"] }))).toHaveLength(1); + expect(registry.match(makeEvent({ topics: ["StreamCancelled"] }))).toHaveLength(0); + }); + + test("both contractId and eventName must match", () => { + const registry = createHandlerRegistry(); + registry.register( + { contractId: "CONTRACT_A", eventName: "StreamCreated" }, + vi.fn().mockResolvedValue(OK), + ); + + expect( + registry.match(makeEvent({ contractId: "CONTRACT_A", topics: ["StreamCreated"] })), + ).toHaveLength(1); + + // contract matches but event name does not + expect( + registry.match(makeEvent({ contractId: "CONTRACT_A", topics: ["Transfer"] })), + ).toHaveLength(0); + + // event name matches but contract does not + expect( + registry.match(makeEvent({ contractId: "CONTRACT_B", topics: ["StreamCreated"] })), + ).toHaveLength(0); + }); + + test("multiple handlers can match the same event", () => { + const registry = createHandlerRegistry(); + registry.register({ eventName: "StreamCreated" }, vi.fn().mockResolvedValue(OK)); + registry.register({ contractId: "CONTRACT_A" }, vi.fn().mockResolvedValue(OK)); + registry.register({}, vi.fn().mockResolvedValue(OK)); + + expect(registry.match(makeEvent())).toHaveLength(3); + }); + + test("returns handlers in registration order", async () => { + const registry = createHandlerRegistry(); + const order: number[] = []; + + registry.register({}, async () => { order.push(1); return OK; }); + registry.register({}, async () => { order.push(2); return OK; }); + registry.register({}, async () => { order.push(3); return OK; }); + + await registry.dispatch(makeEvent()); + + expect(order).toEqual([1, 2, 3]); + }); + }); + + describe("dispatch", () => { + test("calls all matching handlers and collects results", async () => { + const registry = createHandlerRegistry(); + const h1 = vi.fn().mockResolvedValue({ success: true, summary: "h1" }); + const h2 = vi.fn().mockResolvedValue({ success: true, summary: "h2" }); + + registry.register({ eventName: "StreamCreated" }, h1); + registry.register({ eventName: "StreamCreated" }, h2); + + const results = await registry.dispatch(makeEvent()); + + expect(results).toEqual([ + { success: true, summary: "h1" }, + { success: true, summary: "h2" }, + ]); + expect(h1).toHaveBeenCalledOnce(); + expect(h2).toHaveBeenCalledOnce(); + }); + + test("catches a throwing handler and records success:false without stopping others", async () => { + const registry = createHandlerRegistry(); + const failing = vi.fn().mockRejectedValue(new Error("db connection lost")); + const succeeding = vi.fn().mockResolvedValue({ success: true }); + + registry.register({}, failing); + registry.register({}, succeeding); + + const results = await registry.dispatch(makeEvent()); + + expect(results[0]).toEqual({ success: false, summary: "db connection lost" }); + expect(results[1]).toEqual({ success: true }); + expect(succeeding).toHaveBeenCalledOnce(); + }); + + test("returns empty array when no handlers match", async () => { + const registry = createHandlerRegistry(); + registry.register({ eventName: "StreamCreated" }, vi.fn().mockResolvedValue(OK)); + + const results = await registry.dispatch(makeEvent({ topics: ["Transfer"] })); + + expect(results).toEqual([]); + }); + + test("passes the full event to each handler", async () => { + const registry = createHandlerRegistry(); + const handler = vi.fn().mockResolvedValue(OK); + + registry.register({}, handler); + + const event = makeEvent({ contractId: "SPECIFIC", ledger: 9999, txHash: "TX_XYZ" }); + await registry.dispatch(event); + + expect(handler).toHaveBeenCalledWith(event); + }); + + test("is idempotent — dispatching the same event twice calls handlers twice", async () => { + const registry = createHandlerRegistry(); + const handler = vi.fn().mockResolvedValue(OK); + registry.register({}, handler); + + const event = makeEvent(); + await registry.dispatch(event); + await registry.dispatch(event); + + expect(handler).toHaveBeenCalledTimes(2); + }); + }); + + describe("createHandlerRegistry factory", () => { + test("returns a fresh registry with no handlers", async () => { + const registry = createHandlerRegistry(); + expect(registry.match(makeEvent())).toHaveLength(0); + }); + + test("two registries are independent", () => { + const r1 = createHandlerRegistry(); + const r2 = createHandlerRegistry(); + + r1.register({}, vi.fn().mockResolvedValue(OK)); + + expect(r1.match(makeEvent())).toHaveLength(1); + expect(r2.match(makeEvent())).toHaveLength(0); + }); + }); +}); diff --git a/indexer/common/src/handlers/registry.ts b/indexer/common/src/handlers/registry.ts new file mode 100644 index 0000000..8e80af6 --- /dev/null +++ b/indexer/common/src/handlers/registry.ts @@ -0,0 +1,58 @@ +import type { + EventHandler, + HandlerEntry, + HandlerFilter, + HandlerRegistry, + HandlerResult, + SorobanEvent, +} from "./types.js"; + +function matchesFilter(filter: HandlerFilter, event: SorobanEvent): boolean { + if (filter.contractId !== undefined && filter.contractId !== event.contractId) { + return false; + } + if (filter.eventName !== undefined && filter.eventName !== event.topics[0]) { + return false; + } + return true; +} + +export class EventHandlerRegistry implements HandlerRegistry { + private readonly entries: HandlerEntry[] = []; + + register( + filter: HandlerFilter, + handler: EventHandler, + ): void { + this.entries.push({ filter, handler: handler as EventHandler }); + } + + match(event: SorobanEvent): ReadonlyArray { + return this.entries + .filter((e) => matchesFilter(e.filter, event)) + .map((e) => e.handler); + } + + async dispatch(event: SorobanEvent): Promise { + const handlers = this.match(event); + const results: HandlerResult[] = []; + + for (const handler of handlers) { + try { + const result = await handler(event); + results.push(result); + } catch (error) { + results.push({ + success: false, + summary: error instanceof Error ? error.message : String(error), + }); + } + } + + return results; + } +} + +export function createHandlerRegistry(): HandlerRegistry { + return new EventHandlerRegistry(); +} diff --git a/indexer/common/src/handlers/types.ts b/indexer/common/src/handlers/types.ts new file mode 100644 index 0000000..6512b11 --- /dev/null +++ b/indexer/common/src/handlers/types.ts @@ -0,0 +1,79 @@ +/** + * Core handler input/result types and registry interface for the Fundable + * Soroban event indexer (issue #30). + * + * Domain packages import these shared types so they can register event handlers + * without coupling to the poller internals. + */ + +/** Identity fields that uniquely identify any Soroban event. */ +export interface SorobanEventIdentity { + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +/** Minimal shape of a Soroban event delivered to handlers. */ +export interface SorobanEvent extends SorobanEventIdentity { + /** Array of XDR-encoded topics. The first element is conventionally the event name. */ + topics: string[]; + /** JSON-encoded event data. */ + data: string; +} + +/** What every handler must return. */ +export interface HandlerResult { + /** Whether the handler processed the event without error. */ + success: boolean; + /** Human-readable summary (logged at debug level). */ + summary?: string; +} + +/** The function signature every event handler must conform to. */ +export type EventHandler = ( + event: TEvent, +) => Promise; + +/** Criteria used to match events to registered handlers. */ +export interface HandlerFilter { + /** Match only events emitted by this contract address. Omit to match all. */ + contractId?: string; + /** Match only events whose first topic equals this value (the event name). */ + eventName?: string; +} + +/** A handler entry stored in the registry. */ +export interface HandlerEntry { + filter: HandlerFilter; + handler: EventHandler; +} + +/** Public API of the handler registry. */ +export interface HandlerRegistry { + /** + * Register a handler that will be called for events matching `filter`. + * Multiple handlers may match the same event; all are called in + * registration order. + */ + register( + filter: HandlerFilter, + handler: EventHandler, + ): void; + + /** + * Return all handlers whose filter matches the given event. + * Matching rules: + * - If `filter.contractId` is set, the event's `contractId` must equal it. + * - If `filter.eventName` is set, the event's first topic must equal it. + * - A filter with no fields matches every event. + */ + match(event: SorobanEvent): ReadonlyArray; + + /** + * Dispatch an event to every matching handler sequentially. + * Returns the array of results in the order handlers were called. + * A handler that throws is caught; `success: false` is recorded for it. + */ + dispatch(event: SorobanEvent): Promise; +} diff --git a/indexer/common/src/index.ts b/indexer/common/src/index.ts index a7a318b..8e21fc3 100644 --- a/indexer/common/src/index.ts +++ b/indexer/common/src/index.ts @@ -2,3 +2,14 @@ export const commonPackage = { name: "@fundable-indexer/common", role: "shared-infrastructure", } as const; + +export { createHandlerRegistry, EventHandlerRegistry } from "./handlers/registry.js"; +export type { + EventHandler, + HandlerEntry, + HandlerFilter, + HandlerRegistry, + HandlerResult, + SorobanEvent, + SorobanEventIdentity, +} from "./handlers/types.js"; diff --git a/indexer/distributions/src/handlers/batchPause.ts b/indexer/distributions/src/handlers/batchPause.ts new file mode 100644 index 0000000..b01c8d0 --- /dev/null +++ b/indexer/distributions/src/handlers/batchPause.ts @@ -0,0 +1,88 @@ +import type { BatchPauseRecord, BatchResumeRecord, DistributionEvent } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const BATCH_PAUSED_TOPIC = "BatchPaused"; +export const BATCH_RESUMED_TOPIC = "BatchResumed"; + +export interface BatchPausedPayload { + batchId: string; + pausedAt: string; +} + +export interface BatchResumedPayload { + batchId: string; + resumedAt: string; +} + +export function parseBatchPausedPayload(event: DistributionEvent): BatchPausedPayload { + requireTopic(event, BATCH_PAUSED_TOPIC); + const parsed = parseEventData(event); + + return { + batchId: requireStringField(parsed, "batchId"), + pausedAt: requireStringField(parsed, "pausedAt"), + }; +} + +export function parseBatchResumedPayload(event: DistributionEvent): BatchResumedPayload { + requireTopic(event, BATCH_RESUMED_TOPIC); + const parsed = parseEventData(event); + + return { + batchId: requireStringField(parsed, "batchId"), + resumedAt: requireStringField(parsed, "resumedAt"), + }; +} + +export function mapBatchPausedToRecord( + payload: BatchPausedPayload, + event: DistributionEvent, +): BatchPauseRecord { + return { + batchId: payload.batchId, + pausedAt: payload.pausedAt, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function mapBatchResumedToRecord( + payload: BatchResumedPayload, + event: DistributionEvent, +): BatchResumeRecord { + return { + batchId: payload.batchId, + resumedAt: payload.resumedAt, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleBatchPaused(event: DistributionEvent): { + pause: BatchPauseRecord; + identity: string; +} { + const payload = parseBatchPausedPayload(event); + const pause = mapBatchPausedToRecord(payload, event); + const identity = getEventIdentity(event); + return { pause, identity }; +} + +export function handleBatchResumed(event: DistributionEvent): { + resume: BatchResumeRecord; + identity: string; +} { + const payload = parseBatchResumedPayload(event); + const resume = mapBatchResumedToRecord(payload, event); + const identity = getEventIdentity(event); + return { resume, identity }; +} diff --git a/indexer/distributions/src/handlers/distribution-events.test.ts b/indexer/distributions/src/handlers/distribution-events.test.ts new file mode 100644 index 0000000..a768079 --- /dev/null +++ b/indexer/distributions/src/handlers/distribution-events.test.ts @@ -0,0 +1,284 @@ +import { describe, expect, test } from "vitest"; + +import { + DISTRIBUTION_CREATED_TOPIC, + handleDistributionCreated, + mapDistributionCreatedToRecord, + parseDistributionCreatedPayload, +} from "./distributionCreated.js"; +import { + TOKENS_CLAIMED_TOPIC, + handleTokensClaimed, + mapTokensClaimedToRecord, + parseTokensClaimedPayload, +} from "./tokensClaimed.js"; +import { + BATCH_PAUSED_TOPIC, + BATCH_RESUMED_TOPIC, + handleBatchPaused, + handleBatchResumed, + mapBatchPausedToRecord, + mapBatchResumedToRecord, + parseBatchPausedPayload, + parseBatchResumedPayload, +} from "./batchPause.js"; +import type { DistributionEvent } from "./types.js"; + +function makeEvent( + topic: string, + data: Record, + overrides?: Partial, +): DistributionEvent { + return { + contractId: "CONTRACT_DIST", + ledger: 8000, + txHash: "TX_DIST_HASH", + eventIndex: 0, + topics: [topic], + data: JSON.stringify(data), + ...overrides, + }; +} + +// ── DistributionCreated ─────────────────────────────────────────────────────── + +const CREATED_DATA = { + batchId: "batch-1", + distributor: "GDIST0000000000000000000000000000000000000000000000000000", + token: "GTOKEN000000000000000000000000000000000000000000000000000", + totalAmount: "1000000000", + recipientCount: 50, + uniqueRef: "ref-q1-payroll", +}; + +describe("DistributionCreated handler (issue #38)", () => { + test("parses a valid DistributionCreated event", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, CREATED_DATA); + expect(parseDistributionCreatedPayload(event)).toEqual(CREATED_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("WrongTopic", CREATED_DATA); + expect(() => parseDistributionCreatedPayload(event)).toThrow( + "Expected DistributionCreated event topic, got WrongTopic", + ); + }); + + test("throws when batchId is missing", () => { + const { batchId: _, ...partial } = CREATED_DATA; + expect(() => + parseDistributionCreatedPayload(makeEvent(DISTRIBUTION_CREATED_TOPIC, partial)), + ).toThrow('"batchId" must be a non-empty string'); + }); + + test("throws when recipientCount is not an integer", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, { + ...CREATED_DATA, + recipientCount: "50", + }); + expect(() => parseDistributionCreatedPayload(event)).toThrow( + '"recipientCount" must be an integer', + ); + }); + + test("throws when totalAmount is empty string", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, { ...CREATED_DATA, totalAmount: "" }); + expect(() => parseDistributionCreatedPayload(event)).toThrow( + '"totalAmount" must be a non-empty string', + ); + }); + + test("maps payload to DistributionCreatedRecord", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, CREATED_DATA); + const payload = parseDistributionCreatedPayload(event); + const record = mapDistributionCreatedToRecord(payload, event); + + expect(record).toEqual({ + ...CREATED_DATA, + contractId: "CONTRACT_DIST", + ledger: 8000, + txHash: "TX_DIST_HASH", + eventIndex: 0, + }); + }); + + test("handleDistributionCreated returns record and deterministic identity", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, CREATED_DATA); + const { distribution, identity } = handleDistributionCreated(event); + + expect(distribution.batchId).toBe("batch-1"); + expect(distribution.recipientCount).toBe(50); + expect(identity).toBe("CONTRACT_DIST:8000:TX_DIST_HASH:0"); + }); + + test("handleDistributionCreated is idempotent", () => { + const event = makeEvent(DISTRIBUTION_CREATED_TOPIC, CREATED_DATA); + expect(handleDistributionCreated(event)).toEqual(handleDistributionCreated(event)); + }); +}); + +// ── TokensClaimed ───────────────────────────────────────────────────────────── + +const CLAIMED_DATA = { + batchId: "batch-1", + claimant: "GCLAIMANT000000000000000000000000000000000000000000000000", + amount: "20000000", + eventTimestamp: "1700000000", +}; + +describe("TokensClaimed handler (issue #38)", () => { + test("parses a valid TokensClaimed event", () => { + const event = makeEvent(TOKENS_CLAIMED_TOPIC, CLAIMED_DATA); + expect(parseTokensClaimedPayload(event)).toEqual(CLAIMED_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("DistributionCreated", CLAIMED_DATA); + expect(() => parseTokensClaimedPayload(event)).toThrow( + "Expected TokensClaimed event topic, got DistributionCreated", + ); + }); + + test("throws when claimant is missing", () => { + const { claimant: _, ...partial } = CLAIMED_DATA; + expect(() => + parseTokensClaimedPayload(makeEvent(TOKENS_CLAIMED_TOPIC, partial)), + ).toThrow('"claimant" must be a non-empty string'); + }); + + test("throws when eventTimestamp is missing", () => { + const { eventTimestamp: _, ...partial } = CLAIMED_DATA; + expect(() => + parseTokensClaimedPayload(makeEvent(TOKENS_CLAIMED_TOPIC, partial)), + ).toThrow('"eventTimestamp" must be a non-empty string'); + }); + + test("maps payload to TokensClaimedRecord", () => { + const event = makeEvent(TOKENS_CLAIMED_TOPIC, CLAIMED_DATA); + const payload = parseTokensClaimedPayload(event); + const record = mapTokensClaimedToRecord(payload, event); + + expect(record).toEqual({ + ...CLAIMED_DATA, + contractId: "CONTRACT_DIST", + ledger: 8000, + txHash: "TX_DIST_HASH", + eventIndex: 0, + }); + }); + + test("handleTokensClaimed returns claim record and identity", () => { + const event = makeEvent(TOKENS_CLAIMED_TOPIC, CLAIMED_DATA); + const { claim, identity } = handleTokensClaimed(event); + + expect(claim.batchId).toBe("batch-1"); + expect(claim.amount).toBe("20000000"); + expect(identity).toBe("CONTRACT_DIST:8000:TX_DIST_HASH:0"); + }); + + test("two claims with different txHash have different identities", () => { + const e1 = makeEvent(TOKENS_CLAIMED_TOPIC, CLAIMED_DATA, { txHash: "TX_A" }); + const e2 = makeEvent(TOKENS_CLAIMED_TOPIC, CLAIMED_DATA, { txHash: "TX_B" }); + + expect(handleTokensClaimed(e1).identity).not.toBe(handleTokensClaimed(e2).identity); + }); +}); + +// ── BatchPaused ─────────────────────────────────────────────────────────────── + +const PAUSED_DATA = { batchId: "batch-2", pausedAt: "1700001000" }; + +describe("BatchPaused handler (issue #38)", () => { + test("parses a valid BatchPaused event", () => { + const event = makeEvent(BATCH_PAUSED_TOPIC, PAUSED_DATA); + expect(parseBatchPausedPayload(event)).toEqual(PAUSED_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("BatchResumed", PAUSED_DATA); + expect(() => parseBatchPausedPayload(event)).toThrow( + "Expected BatchPaused event topic, got BatchResumed", + ); + }); + + test("throws when pausedAt is missing", () => { + const event = makeEvent(BATCH_PAUSED_TOPIC, { batchId: "b-1" }); + expect(() => parseBatchPausedPayload(event)).toThrow('"pausedAt" must be a non-empty string'); + }); + + test("maps payload to BatchPauseRecord", () => { + const event = makeEvent(BATCH_PAUSED_TOPIC, PAUSED_DATA); + const payload = parseBatchPausedPayload(event); + const record = mapBatchPausedToRecord(payload, event); + + expect(record).toEqual({ + batchId: "batch-2", + pausedAt: "1700001000", + contractId: "CONTRACT_DIST", + ledger: 8000, + txHash: "TX_DIST_HASH", + eventIndex: 0, + }); + }); + + test("handleBatchPaused returns pause record and identity", () => { + const event = makeEvent(BATCH_PAUSED_TOPIC, PAUSED_DATA); + const { pause, identity } = handleBatchPaused(event); + + expect(pause.batchId).toBe("batch-2"); + expect(pause.pausedAt).toBe("1700001000"); + expect(identity).toBe("CONTRACT_DIST:8000:TX_DIST_HASH:0"); + }); +}); + +// ── BatchResumed ────────────────────────────────────────────────────────────── + +const RESUMED_DATA = { batchId: "batch-2", resumedAt: "1700002000" }; + +describe("BatchResumed handler (issue #38)", () => { + test("parses a valid BatchResumed event", () => { + const event = makeEvent(BATCH_RESUMED_TOPIC, RESUMED_DATA); + expect(parseBatchResumedPayload(event)).toEqual(RESUMED_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("BatchPaused", RESUMED_DATA); + expect(() => parseBatchResumedPayload(event)).toThrow( + "Expected BatchResumed event topic, got BatchPaused", + ); + }); + + test("throws when resumedAt is missing", () => { + const event = makeEvent(BATCH_RESUMED_TOPIC, { batchId: "b-1" }); + expect(() => parseBatchResumedPayload(event)).toThrow('"resumedAt" must be a non-empty string'); + }); + + test("maps payload to BatchResumeRecord", () => { + const event = makeEvent(BATCH_RESUMED_TOPIC, RESUMED_DATA); + const payload = parseBatchResumedPayload(event); + const record = mapBatchResumedToRecord(payload, event); + + expect(record).toEqual({ + batchId: "batch-2", + resumedAt: "1700002000", + contractId: "CONTRACT_DIST", + ledger: 8000, + txHash: "TX_DIST_HASH", + eventIndex: 0, + }); + }); + + test("handleBatchResumed returns resume record and identity", () => { + const event = makeEvent(BATCH_RESUMED_TOPIC, RESUMED_DATA); + const { resume, identity } = handleBatchResumed(event); + + expect(resume.batchId).toBe("batch-2"); + expect(resume.resumedAt).toBe("1700002000"); + expect(identity).toBe("CONTRACT_DIST:8000:TX_DIST_HASH:0"); + }); + + test("handleBatchResumed is idempotent", () => { + const event = makeEvent(BATCH_RESUMED_TOPIC, RESUMED_DATA); + expect(handleBatchResumed(event)).toEqual(handleBatchResumed(event)); + }); +}); diff --git a/indexer/distributions/src/handlers/distributionCreated.ts b/indexer/distributions/src/handlers/distributionCreated.ts new file mode 100644 index 0000000..4c6de54 --- /dev/null +++ b/indexer/distributions/src/handlers/distributionCreated.ts @@ -0,0 +1,63 @@ +import type { DistributionCreatedRecord, DistributionEvent } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireIntField, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const DISTRIBUTION_CREATED_TOPIC = "DistributionCreated"; + +export interface DistributionCreatedPayload { + batchId: string; + distributor: string; + token: string; + totalAmount: string; + recipientCount: number; + uniqueRef: string; +} + +export function parseDistributionCreatedPayload( + event: DistributionEvent, +): DistributionCreatedPayload { + requireTopic(event, DISTRIBUTION_CREATED_TOPIC); + const parsed = parseEventData(event); + + return { + batchId: requireStringField(parsed, "batchId"), + distributor: requireStringField(parsed, "distributor"), + token: requireStringField(parsed, "token"), + totalAmount: requireStringField(parsed, "totalAmount"), + recipientCount: requireIntField(parsed, "recipientCount"), + uniqueRef: requireStringField(parsed, "uniqueRef"), + }; +} + +export function mapDistributionCreatedToRecord( + payload: DistributionCreatedPayload, + event: DistributionEvent, +): DistributionCreatedRecord { + return { + batchId: payload.batchId, + distributor: payload.distributor, + token: payload.token, + totalAmount: payload.totalAmount, + recipientCount: payload.recipientCount, + uniqueRef: payload.uniqueRef, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleDistributionCreated(event: DistributionEvent): { + distribution: DistributionCreatedRecord; + identity: string; +} { + const payload = parseDistributionCreatedPayload(event); + const distribution = mapDistributionCreatedToRecord(payload, event); + const identity = getEventIdentity(event); + return { distribution, identity }; +} diff --git a/indexer/distributions/src/handlers/tokensClaimed.ts b/indexer/distributions/src/handlers/tokensClaimed.ts new file mode 100644 index 0000000..90f6944 --- /dev/null +++ b/indexer/distributions/src/handlers/tokensClaimed.ts @@ -0,0 +1,54 @@ +import type { DistributionEvent, TokensClaimedRecord } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const TOKENS_CLAIMED_TOPIC = "TokensClaimed"; + +export interface TokensClaimedPayload { + batchId: string; + claimant: string; + amount: string; + eventTimestamp: string; +} + +export function parseTokensClaimedPayload(event: DistributionEvent): TokensClaimedPayload { + requireTopic(event, TOKENS_CLAIMED_TOPIC); + const parsed = parseEventData(event); + + return { + batchId: requireStringField(parsed, "batchId"), + claimant: requireStringField(parsed, "claimant"), + amount: requireStringField(parsed, "amount"), + eventTimestamp: requireStringField(parsed, "eventTimestamp"), + }; +} + +export function mapTokensClaimedToRecord( + payload: TokensClaimedPayload, + event: DistributionEvent, +): TokensClaimedRecord { + return { + batchId: payload.batchId, + claimant: payload.claimant, + amount: payload.amount, + eventTimestamp: payload.eventTimestamp, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleTokensClaimed(event: DistributionEvent): { + claim: TokensClaimedRecord; + identity: string; +} { + const payload = parseTokensClaimedPayload(event); + const claim = mapTokensClaimedToRecord(payload, event); + const identity = getEventIdentity(event); + return { claim, identity }; +} diff --git a/indexer/distributions/src/handlers/types.ts b/indexer/distributions/src/handlers/types.ts new file mode 100644 index 0000000..d5f7f9f --- /dev/null +++ b/indexer/distributions/src/handlers/types.ts @@ -0,0 +1,52 @@ +import type { SorobanEvent } from "@fundable-indexer/common"; + +export interface DistributionEvent extends SorobanEvent { + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; + topics: string[]; + data: string; +} + +export interface DistributionCreatedRecord { + batchId: string; + distributor: string; + token: string; + totalAmount: string; + recipientCount: number; + uniqueRef: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +export interface TokensClaimedRecord { + batchId: string; + claimant: string; + amount: string; + eventTimestamp: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +export interface BatchPauseRecord { + batchId: string; + pausedAt: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +export interface BatchResumeRecord { + batchId: string; + resumedAt: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} diff --git a/indexer/distributions/src/handlers/utils.ts b/indexer/distributions/src/handlers/utils.ts new file mode 100644 index 0000000..b45846c --- /dev/null +++ b/indexer/distributions/src/handlers/utils.ts @@ -0,0 +1,50 @@ +import type { DistributionEvent } from "./types.js"; + +export function getEventIdentity(event: DistributionEvent): string { + return `${event.contractId}:${event.ledger}:${event.txHash}:${event.eventIndex}`; +} + +export function parseEventData(event: DistributionEvent): Record { + try { + return JSON.parse(event.data) as Record; + } catch { + throw new Error("Failed to parse event data: invalid JSON"); + } +} + +export function requireStringField( + parsed: Record, + field: string, +): string { + const value = parsed[field]; + if (typeof value !== "string" || value.length === 0) { + throw new Error( + `Invalid payload: "${field}" must be a non-empty string, got ${ + typeof value === "string" ? "empty string" : typeof value + }`, + ); + } + return value; +} + +export function requireIntField( + parsed: Record, + field: string, +): number { + const value = parsed[field]; + if (typeof value !== "number" || !Number.isInteger(value)) { + throw new Error( + `Invalid payload: "${field}" must be an integer, got ${typeof value}`, + ); + } + return value; +} + +export function requireTopic(event: DistributionEvent, expected: string): void { + const actual = event.topics[0]; + if (!actual || actual !== expected) { + throw new Error( + `Expected ${expected} event topic, got ${actual ?? "undefined"}`, + ); + } +} diff --git a/indexer/streams/src/handlers/stream-lifecycle.test.ts b/indexer/streams/src/handlers/stream-lifecycle.test.ts new file mode 100644 index 0000000..cd6bac5 --- /dev/null +++ b/indexer/streams/src/handlers/stream-lifecycle.test.ts @@ -0,0 +1,240 @@ +import { describe, expect, test } from "vitest"; + +import { + STREAM_FUNDED_TOPIC, + handleStreamFunded, + mapStreamFundedToRecord, + parseStreamFundedPayload, +} from "./streamFunded.js"; +import { + STREAM_WITHDRAWAL_TOPIC, + handleStreamWithdrawal, + mapStreamWithdrawalToRecord, + parseStreamWithdrawalPayload, +} from "./streamWithdrawal.js"; +import { + STREAM_CANCEL_TOPIC, + handleStreamCancel, + mapStreamCancelToRecord, + parseStreamCancelPayload, +} from "./streamCancel.js"; +import type { StreamEvent } from "./types.js"; + +function makeEvent( + topic: string, + data: Record, + overrides?: Partial, +): StreamEvent { + return { + contractId: "CONTRACT_STREAM", + ledger: 5000, + txHash: "TX_HASH_STREAM", + eventIndex: 0, + topics: [topic], + data: JSON.stringify(data), + ...overrides, + }; +} + +// ── StreamFunded ────────────────────────────────────────────────────────────── + +const FUNDED_DATA = { streamId: "s-1", amount: "500000000" }; + +describe("StreamFunded handler (issue #35)", () => { + test("parses a valid StreamFunded event", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, FUNDED_DATA); + expect(parseStreamFundedPayload(event)).toEqual(FUNDED_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("WrongTopic", FUNDED_DATA); + expect(() => parseStreamFundedPayload(event)).toThrow( + "Expected StreamFunded event topic, got WrongTopic", + ); + }); + + test("throws on invalid JSON data", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, {}); + event.data = "not-json"; + expect(() => parseStreamFundedPayload(event)).toThrow( + "Failed to parse event data: invalid JSON", + ); + }); + + test("throws when streamId is missing", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, { amount: "100" }); + expect(() => parseStreamFundedPayload(event)).toThrow('"streamId" must be a non-empty string'); + }); + + test("throws when amount is empty string", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, { streamId: "s-1", amount: "" }); + expect(() => parseStreamFundedPayload(event)).toThrow('"amount" must be a non-empty string'); + }); + + test("maps payload + event to FundedRecord", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, FUNDED_DATA); + const payload = parseStreamFundedPayload(event); + const record = mapStreamFundedToRecord(payload, event); + + expect(record).toEqual({ + streamId: "s-1", + amount: "500000000", + contractId: "CONTRACT_STREAM", + ledger: 5000, + txHash: "TX_HASH_STREAM", + eventIndex: 0, + }); + }); + + test("handleStreamFunded returns funded record and deterministic identity", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, FUNDED_DATA); + const { funded, identity } = handleStreamFunded(event); + + expect(funded.streamId).toBe("s-1"); + expect(funded.amount).toBe("500000000"); + expect(identity).toBe("CONTRACT_STREAM:5000:TX_HASH_STREAM:0"); + }); + + test("handleStreamFunded is idempotent", () => { + const event = makeEvent(STREAM_FUNDED_TOPIC, FUNDED_DATA); + expect(handleStreamFunded(event)).toEqual(handleStreamFunded(event)); + }); +}); + +// ── StreamWithdrawal ────────────────────────────────────────────────────────── + +const WITHDRAWAL_DATA = { + streamId: "s-2", + recipient: "GXRECIPIENT000000000000000000000000000000000000000000000000", + amount: "100000000", +}; + +describe("StreamWithdrawal handler (issue #35)", () => { + test("parses a valid StreamWithdrawal event", () => { + const event = makeEvent(STREAM_WITHDRAWAL_TOPIC, WITHDRAWAL_DATA); + expect(parseStreamWithdrawalPayload(event)).toEqual(WITHDRAWAL_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("BadTopic", WITHDRAWAL_DATA); + expect(() => parseStreamWithdrawalPayload(event)).toThrow( + "Expected StreamWithdrawal event topic, got BadTopic", + ); + }); + + test("throws when recipient is missing", () => { + const { recipient: _, ...partial } = WITHDRAWAL_DATA; + const event = makeEvent(STREAM_WITHDRAWAL_TOPIC, partial); + expect(() => parseStreamWithdrawalPayload(event)).toThrow( + '"recipient" must be a non-empty string', + ); + }); + + test("throws when amount is a number not a string", () => { + const event = makeEvent(STREAM_WITHDRAWAL_TOPIC, { ...WITHDRAWAL_DATA, amount: 100 }); + expect(() => parseStreamWithdrawalPayload(event)).toThrow( + '"amount" must be a non-empty string', + ); + }); + + test("maps payload + event to WithdrawalRecord", () => { + const event = makeEvent(STREAM_WITHDRAWAL_TOPIC, WITHDRAWAL_DATA); + const payload = parseStreamWithdrawalPayload(event); + const record = mapStreamWithdrawalToRecord(payload, event); + + expect(record).toEqual({ + streamId: "s-2", + recipient: WITHDRAWAL_DATA.recipient, + amount: "100000000", + contractId: "CONTRACT_STREAM", + ledger: 5000, + txHash: "TX_HASH_STREAM", + eventIndex: 0, + }); + }); + + test("handleStreamWithdrawal returns withdrawal record and identity", () => { + const event = makeEvent(STREAM_WITHDRAWAL_TOPIC, WITHDRAWAL_DATA); + const { withdrawal, identity } = handleStreamWithdrawal(event); + + expect(withdrawal.streamId).toBe("s-2"); + expect(withdrawal.recipient).toBe(WITHDRAWAL_DATA.recipient); + expect(identity).toBe("CONTRACT_STREAM:5000:TX_HASH_STREAM:0"); + }); + + test("identity changes when eventIndex changes", () => { + const e1 = makeEvent(STREAM_WITHDRAWAL_TOPIC, WITHDRAWAL_DATA, { eventIndex: 0 }); + const e2 = makeEvent(STREAM_WITHDRAWAL_TOPIC, WITHDRAWAL_DATA, { eventIndex: 1 }); + + expect(handleStreamWithdrawal(e1).identity).not.toBe(handleStreamWithdrawal(e2).identity); + }); +}); + +// ── StreamCancelled ─────────────────────────────────────────────────────────── + +const CANCEL_DATA = { + streamId: "s-3", + cancelledBy: "GXSENDER0000000000000000000000000000000000000000000000000000", + refundedAmount: "400000000", +}; + +describe("StreamCancelled handler (issue #35)", () => { + test("parses a valid StreamCancelled event", () => { + const event = makeEvent(STREAM_CANCEL_TOPIC, CANCEL_DATA); + expect(parseStreamCancelPayload(event)).toEqual(CANCEL_DATA); + }); + + test("throws on wrong topic", () => { + const event = makeEvent("StreamCreated", CANCEL_DATA); + expect(() => parseStreamCancelPayload(event)).toThrow( + "Expected StreamCancelled event topic, got StreamCreated", + ); + }); + + test("throws when cancelledBy is missing", () => { + const { cancelledBy: _, ...partial } = CANCEL_DATA; + const event = makeEvent(STREAM_CANCEL_TOPIC, partial); + expect(() => parseStreamCancelPayload(event)).toThrow( + '"cancelledBy" must be a non-empty string', + ); + }); + + test("throws when refundedAmount is missing", () => { + const { refundedAmount: _, ...partial } = CANCEL_DATA; + const event = makeEvent(STREAM_CANCEL_TOPIC, partial); + expect(() => parseStreamCancelPayload(event)).toThrow( + '"refundedAmount" must be a non-empty string', + ); + }); + + test("maps payload + event to CancelRecord", () => { + const event = makeEvent(STREAM_CANCEL_TOPIC, CANCEL_DATA); + const payload = parseStreamCancelPayload(event); + const record = mapStreamCancelToRecord(payload, event); + + expect(record).toEqual({ + streamId: "s-3", + cancelledBy: CANCEL_DATA.cancelledBy, + refundedAmount: "400000000", + contractId: "CONTRACT_STREAM", + ledger: 5000, + txHash: "TX_HASH_STREAM", + eventIndex: 0, + }); + }); + + test("handleStreamCancel returns cancel record and identity", () => { + const event = makeEvent(STREAM_CANCEL_TOPIC, CANCEL_DATA); + const { cancel, identity } = handleStreamCancel(event); + + expect(cancel.streamId).toBe("s-3"); + expect(cancel.cancelledBy).toBe(CANCEL_DATA.cancelledBy); + expect(cancel.refundedAmount).toBe("400000000"); + expect(identity).toBe("CONTRACT_STREAM:5000:TX_HASH_STREAM:0"); + }); + + test("handleStreamCancel is idempotent", () => { + const event = makeEvent(STREAM_CANCEL_TOPIC, CANCEL_DATA); + expect(handleStreamCancel(event)).toEqual(handleStreamCancel(event)); + }); +}); diff --git a/indexer/streams/src/handlers/streamCancel.ts b/indexer/streams/src/handlers/streamCancel.ts new file mode 100644 index 0000000..0cd1ea9 --- /dev/null +++ b/indexer/streams/src/handlers/streamCancel.ts @@ -0,0 +1,51 @@ +import type { CancelRecord, StreamEvent } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const STREAM_CANCEL_TOPIC = "StreamCancelled"; + +export interface StreamCancelPayload { + streamId: string; + cancelledBy: string; + refundedAmount: string; +} + +export function parseStreamCancelPayload(event: StreamEvent): StreamCancelPayload { + requireTopic(event, STREAM_CANCEL_TOPIC); + const parsed = parseEventData(event); + + return { + streamId: requireStringField(parsed, "streamId"), + cancelledBy: requireStringField(parsed, "cancelledBy"), + refundedAmount: requireStringField(parsed, "refundedAmount"), + }; +} + +export function mapStreamCancelToRecord( + payload: StreamCancelPayload, + event: StreamEvent, +): CancelRecord { + return { + streamId: payload.streamId, + cancelledBy: payload.cancelledBy, + refundedAmount: payload.refundedAmount, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleStreamCancel(event: StreamEvent): { + cancel: CancelRecord; + identity: string; +} { + const payload = parseStreamCancelPayload(event); + const cancel = mapStreamCancelToRecord(payload, event); + const identity = getEventIdentity(event); + return { cancel, identity }; +} diff --git a/indexer/streams/src/handlers/streamFunded.ts b/indexer/streams/src/handlers/streamFunded.ts new file mode 100644 index 0000000..1a4eebb --- /dev/null +++ b/indexer/streams/src/handlers/streamFunded.ts @@ -0,0 +1,48 @@ +import type { FundedRecord, StreamEvent } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const STREAM_FUNDED_TOPIC = "StreamFunded"; + +export interface StreamFundedPayload { + streamId: string; + amount: string; +} + +export function parseStreamFundedPayload(event: StreamEvent): StreamFundedPayload { + requireTopic(event, STREAM_FUNDED_TOPIC); + const parsed = parseEventData(event); + + return { + streamId: requireStringField(parsed, "streamId"), + amount: requireStringField(parsed, "amount"), + }; +} + +export function mapStreamFundedToRecord( + payload: StreamFundedPayload, + event: StreamEvent, +): FundedRecord { + return { + streamId: payload.streamId, + amount: payload.amount, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleStreamFunded(event: StreamEvent): { + funded: FundedRecord; + identity: string; +} { + const payload = parseStreamFundedPayload(event); + const funded = mapStreamFundedToRecord(payload, event); + const identity = getEventIdentity(event); + return { funded, identity }; +} diff --git a/indexer/streams/src/handlers/streamWithdrawal.ts b/indexer/streams/src/handlers/streamWithdrawal.ts new file mode 100644 index 0000000..bc7671e --- /dev/null +++ b/indexer/streams/src/handlers/streamWithdrawal.ts @@ -0,0 +1,51 @@ +import type { StreamEvent, WithdrawalRecord } from "./types.js"; +import { + getEventIdentity, + parseEventData, + requireStringField, + requireTopic, +} from "./utils.js"; + +export const STREAM_WITHDRAWAL_TOPIC = "StreamWithdrawal"; + +export interface StreamWithdrawalPayload { + streamId: string; + recipient: string; + amount: string; +} + +export function parseStreamWithdrawalPayload(event: StreamEvent): StreamWithdrawalPayload { + requireTopic(event, STREAM_WITHDRAWAL_TOPIC); + const parsed = parseEventData(event); + + return { + streamId: requireStringField(parsed, "streamId"), + recipient: requireStringField(parsed, "recipient"), + amount: requireStringField(parsed, "amount"), + }; +} + +export function mapStreamWithdrawalToRecord( + payload: StreamWithdrawalPayload, + event: StreamEvent, +): WithdrawalRecord { + return { + streamId: payload.streamId, + recipient: payload.recipient, + amount: payload.amount, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleStreamWithdrawal(event: StreamEvent): { + withdrawal: WithdrawalRecord; + identity: string; +} { + const payload = parseStreamWithdrawalPayload(event); + const withdrawal = mapStreamWithdrawalToRecord(payload, event); + const identity = getEventIdentity(event); + return { withdrawal, identity }; +} diff --git a/indexer/streams/src/handlers/types.ts b/indexer/streams/src/handlers/types.ts new file mode 100644 index 0000000..39dfcaa --- /dev/null +++ b/indexer/streams/src/handlers/types.ts @@ -0,0 +1,39 @@ +import type { SorobanEvent } from "@fundable-indexer/common"; + +export interface StreamEvent extends SorobanEvent { + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; + topics: string[]; + data: string; +} + +export interface FundedRecord { + streamId: string; + amount: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +export interface WithdrawalRecord { + streamId: string; + recipient: string; + amount: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + +export interface CancelRecord { + streamId: string; + cancelledBy: string; + refundedAmount: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} diff --git a/indexer/streams/src/handlers/utils.ts b/indexer/streams/src/handlers/utils.ts new file mode 100644 index 0000000..b7555f5 --- /dev/null +++ b/indexer/streams/src/handlers/utils.ts @@ -0,0 +1,37 @@ +import type { StreamEvent } from "./types.js"; + +export function getEventIdentity(event: StreamEvent): string { + return `${event.contractId}:${event.ledger}:${event.txHash}:${event.eventIndex}`; +} + +export function parseEventData(event: StreamEvent): Record { + try { + return JSON.parse(event.data) as Record; + } catch { + throw new Error("Failed to parse event data: invalid JSON"); + } +} + +export function requireStringField( + parsed: Record, + field: string, +): string { + const value = parsed[field]; + if (typeof value !== "string" || value.length === 0) { + throw new Error( + `Invalid payload: "${field}" must be a non-empty string, got ${ + typeof value === "string" ? "empty string" : typeof value + }`, + ); + } + return value; +} + +export function requireTopic(event: StreamEvent, expected: string): void { + const actual = event.topics[0]; + if (!actual || actual !== expected) { + throw new Error( + `Expected ${expected} event topic, got ${actual ?? "undefined"}`, + ); + } +}