From 6e5b3c82199b6c66cf342040ede920b1a295eaeb Mon Sep 17 00:00:00 2001 From: d3vobed Date: Fri, 26 Jun 2026 02:14:49 +0100 Subject: [PATCH 1/2] feat: implement StreamCreated handler with tests - Add StreamCreated event handler for parsing Soroban events - Map event payload to Stream record with event identity tracking - Add deterministic event identity for idempotency checks - Add mocked payload tests covering: - Payload parsing and validation - Event identity derivation - Record mapping - Handler idempotency - Multiple stream ID handling --- .../src/handlers/streamCreated.test.ts | 142 ++++++++++++++++++ indexer/streams/src/handlers/streamCreated.ts | 65 ++++++++ indexer/streams/src/handlers/types.ts | 21 +++ indexer/streams/src/index.ts | 7 + 4 files changed, 235 insertions(+) create mode 100644 indexer/streams/src/handlers/streamCreated.test.ts create mode 100644 indexer/streams/src/handlers/streamCreated.ts create mode 100644 indexer/streams/src/handlers/types.ts diff --git a/indexer/streams/src/handlers/streamCreated.test.ts b/indexer/streams/src/handlers/streamCreated.test.ts new file mode 100644 index 0000000..3e1dedc --- /dev/null +++ b/indexer/streams/src/handlers/streamCreated.test.ts @@ -0,0 +1,142 @@ +import { describe, expect, test } from "vitest"; + +import { + STREAM_CREATED_TOPIC, + getEventIdentity, + handleStreamCreated, + mapStreamCreatedToRecord, + parseStreamCreatedPayload, +} from "./streamCreated.js"; +import type { StreamCreatedEvent } from "./types.js"; + +function createMockEvent(overrides?: Partial): StreamCreatedEvent { + return { + contractId: "0x123", + ledger: 12345, + txHash: "0xabc", + eventIndex: 0, + topics: [STREAM_CREATED_TOPIC], + data: JSON.stringify({ + streamId: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", + }), + ...overrides, + }; +} + +const mockPayload = { + streamId: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", +}; + +describe("parseStreamCreatedPayload", () => { + test("parses a valid StreamCreated event", () => { + const event = createMockEvent(); + const payload = parseStreamCreatedPayload(event); + + expect(payload).toEqual(mockPayload); + }); + + test("throws when event topic does not match", () => { + const event = createMockEvent({ topics: ["WrongTopic"] }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Expected StreamCreated event topic, got WrongTopic", + ); + }); + + test("throws when topics array is empty", () => { + const event = createMockEvent({ topics: [] }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Expected StreamCreated event topic, got undefined", + ); + }); +}); + +describe("getEventIdentity", () => { + test("produces a deterministic identity string", () => { + const event = createMockEvent(); + const identity = getEventIdentity(event); + + expect(identity).toBe("0x123:12345:0xabc:0"); + }); + + test("changes when any identity field changes", () => { + const base = createMockEvent(); + const differentContract = createMockEvent({ contractId: "0x456" }); + const differentLedger = createMockEvent({ ledger: 99999 }); + const differentTxHash = createMockEvent({ txHash: "0xdef" }); + const differentIndex = createMockEvent({ eventIndex: 1 }); + + const baseId = getEventIdentity(base); + expect(getEventIdentity(differentContract)).not.toBe(baseId); + expect(getEventIdentity(differentLedger)).not.toBe(baseId); + expect(getEventIdentity(differentTxHash)).not.toBe(baseId); + expect(getEventIdentity(differentIndex)).not.toBe(baseId); + }); +}); + +describe("mapStreamCreatedToRecord", () => { + test("maps payload and event to a Stream record", () => { + const event = createMockEvent(); + const record = mapStreamCreatedToRecord(mockPayload, event); + + expect(record).toEqual({ + id: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", + contractId: "0x123", + ledger: 12345, + txHash: "0xabc", + eventIndex: 0, + }); + }); +}); + +describe("handleStreamCreated", () => { + test("returns stream record and identity", () => { + const event = createMockEvent(); + const result = handleStreamCreated(event); + + expect(result.stream.id).toBe("stream-1"); + expect(result.stream.contractId).toBe("0x123"); + expect(result.stream.ledger).toBe(12345); + expect(result.identity).toBe("0x123:12345:0xabc:0"); + }); + + test("is idempotent (same input produces same output)", () => { + const event = createMockEvent(); + const result1 = handleStreamCreated(event); + const result2 = handleStreamCreated(event); + + expect(result1).toEqual(result2); + }); + + test("handles different stream IDs correctly", () => { + const event1 = createMockEvent({ + data: JSON.stringify({ ...mockPayload, streamId: "stream-1" }), + }); + const event2 = createMockEvent({ + data: JSON.stringify({ ...mockPayload, streamId: "stream-2" }), + }); + + const result1 = handleStreamCreated(event1); + const result2 = handleStreamCreated(event2); + + expect(result1.stream.id).toBe("stream-1"); + expect(result2.stream.id).toBe("stream-2"); + expect(result1.stream.id).not.toBe(result2.stream.id); + }); +}); diff --git a/indexer/streams/src/handlers/streamCreated.ts b/indexer/streams/src/handlers/streamCreated.ts new file mode 100644 index 0000000..48af9aa --- /dev/null +++ b/indexer/streams/src/handlers/streamCreated.ts @@ -0,0 +1,65 @@ +import type { StreamCreatedEvent, StreamRecord } from "./types.js"; + +export const STREAM_CREATED_TOPIC = "StreamCreated"; + +export interface StreamCreatedPayload { + streamId: string; + sender: string; + recipient: string; + amount: string; + startTime: string; + endTime: string; +} + +export function parseStreamCreatedPayload(event: StreamCreatedEvent): StreamCreatedPayload { + const eventName = event.topics[0]; + if (!eventName || eventName !== STREAM_CREATED_TOPIC) { + throw new Error( + `Expected ${STREAM_CREATED_TOPIC} event topic, got ${eventName ?? "undefined"}`, + ); + } + + const parsed = JSON.parse(event.data) as StreamCreatedPayload; + + return { + streamId: parsed.streamId, + sender: parsed.sender, + recipient: parsed.recipient, + amount: parsed.amount, + startTime: parsed.startTime, + endTime: parsed.endTime, + }; +} + +export function getEventIdentity(event: StreamCreatedEvent): string { + return `${event.contractId}:${event.ledger}:${event.txHash}:${event.eventIndex}`; +} + +export function mapStreamCreatedToRecord( + payload: StreamCreatedPayload, + event: StreamCreatedEvent, +): StreamRecord { + return { + id: payload.streamId, + sender: payload.sender, + recipient: payload.recipient, + amount: payload.amount, + startTime: payload.startTime, + endTime: payload.endTime, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleStreamCreated(event: StreamCreatedEvent): { + stream: StreamRecord; + identity: string; +} { + const payload = parseStreamCreatedPayload(event); + const stream = mapStreamCreatedToRecord(payload, event); + const identity = getEventIdentity(event); + + return { stream, identity }; +} diff --git a/indexer/streams/src/handlers/types.ts b/indexer/streams/src/handlers/types.ts new file mode 100644 index 0000000..8eddafb --- /dev/null +++ b/indexer/streams/src/handlers/types.ts @@ -0,0 +1,21 @@ +export interface StreamCreatedEvent { + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; + topics: string[]; + data: string; +} + +export interface StreamRecord { + id: string; + sender: string; + recipient: string; + amount: string; + startTime: string; + endTime: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} diff --git a/indexer/streams/src/index.ts b/indexer/streams/src/index.ts index a52945c..961cb7f 100644 --- a/indexer/streams/src/index.ts +++ b/indexer/streams/src/index.ts @@ -5,3 +5,10 @@ export const streamsPackage = { role: "payment-stream-indexer", common: commonPackage.name, } as const; + +export { + handleStreamCreated, + parseStreamCreatedPayload, + STREAM_CREATED_TOPIC, +} from "./handlers/streamCreated.js"; +export type { StreamCreatedEvent, StreamRecord } from "./handlers/types.js"; From 23ce54e45b5afeb8178a4019424a9a0d593030ba Mon Sep 17 00:00:00 2001 From: d3vobed Date: Sat, 27 Jun 2026 01:54:29 +0100 Subject: [PATCH 2/2] fix: add runtime payload validation in parseStreamCreatedPayload Add runtime validation for all required fields (streamId, sender, recipient, amount, startTime, endTime) per PR review feedback. Previously relied solely on TypeScript type assertion which does not enforce shape at runtime. Now validates field presence, type, and non-empty constraint with clear error messages. --- .../src/handlers/streamCreated.test.ts | 37 +++++++++++++++++++ indexer/streams/src/handlers/streamCreated.ts | 37 +++++++++++++++---- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/indexer/streams/src/handlers/streamCreated.test.ts b/indexer/streams/src/handlers/streamCreated.test.ts index 3e1dedc..e36be52 100644 --- a/indexer/streams/src/handlers/streamCreated.test.ts +++ b/indexer/streams/src/handlers/streamCreated.test.ts @@ -60,6 +60,43 @@ describe("parseStreamCreatedPayload", () => { "Expected StreamCreated event topic, got undefined", ); }); + + test("throws on invalid JSON data", () => { + const event = createMockEvent({ data: "not-json" }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Failed to parse event data: invalid JSON", + ); + }); + + test("throws on missing streamId field", () => { + const { streamId: _, ...partial } = mockPayload; + const event = createMockEvent({ data: JSON.stringify(partial) }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "streamId" must be a non-empty string', + ); + }); + + test("throws on non-string amount field", () => { + const event = createMockEvent({ + data: JSON.stringify({ ...mockPayload, amount: 12345 }), + }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "amount" must be a non-empty string', + ); + }); + + test("throws on empty string recipient", () => { + const event = createMockEvent({ + data: JSON.stringify({ ...mockPayload, recipient: "" }), + }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "recipient" must be a non-empty string', + ); + }); }); describe("getEventIdentity", () => { diff --git a/indexer/streams/src/handlers/streamCreated.ts b/indexer/streams/src/handlers/streamCreated.ts index 48af9aa..33df1a0 100644 --- a/indexer/streams/src/handlers/streamCreated.ts +++ b/indexer/streams/src/handlers/streamCreated.ts @@ -19,15 +19,38 @@ export function parseStreamCreatedPayload(event: StreamCreatedEvent): StreamCrea ); } - const parsed = JSON.parse(event.data) as StreamCreatedPayload; + let parsed: Record; + try { + parsed = JSON.parse(event.data) as Record; + } catch { + throw new Error("Failed to parse event data: invalid JSON"); + } + + const requiredFields = [ + "streamId", + "sender", + "recipient", + "amount", + "startTime", + "endTime", + ] as const; + + for (const field of requiredFields) { + const value = parsed[field]; + if (typeof value !== "string" || value.length === 0) { + throw new Error( + `Invalid payload: "${field}" must be a non-empty string, got ${typeof value === "string" ? "empty string" : typeof value}`, + ); + } + } return { - streamId: parsed.streamId, - sender: parsed.sender, - recipient: parsed.recipient, - amount: parsed.amount, - startTime: parsed.startTime, - endTime: parsed.endTime, + streamId: parsed.streamId as string, + sender: parsed.sender as string, + recipient: parsed.recipient as string, + amount: parsed.amount as string, + startTime: parsed.startTime as string, + endTime: parsed.endTime as string, }; }