diff --git a/indexer/streams/src/handlers/index.ts b/indexer/streams/src/handlers/index.ts index 060e416..edd56d9 100644 --- a/indexer/streams/src/handlers/index.ts +++ b/indexer/streams/src/handlers/index.ts @@ -1,4 +1,9 @@ export { streamFundedHandler } from "./stream-funded.handler.js"; export { streamWithdrawalHandler } from "./stream-withdrawal.handler.js"; export { streamCancelHandler } from "./stream-cancel.handler.js"; +export { + handleStreamCreated, + parseStreamCreatedPayload, + STREAM_CREATED_TOPIC, +} from "./streamCreated.js"; export * from "./types.js"; diff --git a/indexer/streams/src/handlers/streamCreated.test.ts b/indexer/streams/src/handlers/streamCreated.test.ts new file mode 100644 index 0000000..e36be52 --- /dev/null +++ b/indexer/streams/src/handlers/streamCreated.test.ts @@ -0,0 +1,179 @@ +import { describe, expect, test } from "vitest"; + +import { + STREAM_CREATED_TOPIC, + getEventIdentity, + handleStreamCreated, + mapStreamCreatedToRecord, + parseStreamCreatedPayload, +} from "./streamCreated.js"; +import type { StreamCreatedEvent } from "./types.js"; + +function createMockEvent(overrides?: Partial): StreamCreatedEvent { + return { + contractId: "0x123", + ledger: 12345, + txHash: "0xabc", + eventIndex: 0, + topics: [STREAM_CREATED_TOPIC], + data: JSON.stringify({ + streamId: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", + }), + ...overrides, + }; +} + +const mockPayload = { + streamId: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", +}; + +describe("parseStreamCreatedPayload", () => { + test("parses a valid StreamCreated event", () => { + const event = createMockEvent(); + const payload = parseStreamCreatedPayload(event); + + expect(payload).toEqual(mockPayload); + }); + + test("throws when event topic does not match", () => { + const event = createMockEvent({ topics: ["WrongTopic"] }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Expected StreamCreated event topic, got WrongTopic", + ); + }); + + test("throws when topics array is empty", () => { + const event = createMockEvent({ topics: [] }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Expected StreamCreated event topic, got undefined", + ); + }); + + test("throws on invalid JSON data", () => { + const event = createMockEvent({ data: "not-json" }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + "Failed to parse event data: invalid JSON", + ); + }); + + test("throws on missing streamId field", () => { + const { streamId: _, ...partial } = mockPayload; + const event = createMockEvent({ data: JSON.stringify(partial) }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "streamId" must be a non-empty string', + ); + }); + + test("throws on non-string amount field", () => { + const event = createMockEvent({ + data: JSON.stringify({ ...mockPayload, amount: 12345 }), + }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "amount" must be a non-empty string', + ); + }); + + test("throws on empty string recipient", () => { + const event = createMockEvent({ + data: JSON.stringify({ ...mockPayload, recipient: "" }), + }); + + expect(() => parseStreamCreatedPayload(event)).toThrow( + 'Invalid payload: "recipient" must be a non-empty string', + ); + }); +}); + +describe("getEventIdentity", () => { + test("produces a deterministic identity string", () => { + const event = createMockEvent(); + const identity = getEventIdentity(event); + + expect(identity).toBe("0x123:12345:0xabc:0"); + }); + + test("changes when any identity field changes", () => { + const base = createMockEvent(); + const differentContract = createMockEvent({ contractId: "0x456" }); + const differentLedger = createMockEvent({ ledger: 99999 }); + const differentTxHash = createMockEvent({ txHash: "0xdef" }); + const differentIndex = createMockEvent({ eventIndex: 1 }); + + const baseId = getEventIdentity(base); + expect(getEventIdentity(differentContract)).not.toBe(baseId); + expect(getEventIdentity(differentLedger)).not.toBe(baseId); + expect(getEventIdentity(differentTxHash)).not.toBe(baseId); + expect(getEventIdentity(differentIndex)).not.toBe(baseId); + }); +}); + +describe("mapStreamCreatedToRecord", () => { + test("maps payload and event to a Stream record", () => { + const event = createMockEvent(); + const record = mapStreamCreatedToRecord(mockPayload, event); + + expect(record).toEqual({ + id: "stream-1", + sender: "0xsender", + recipient: "0xrecipient", + amount: "1000000000", + startTime: "1000000", + endTime: "2000000", + contractId: "0x123", + ledger: 12345, + txHash: "0xabc", + eventIndex: 0, + }); + }); +}); + +describe("handleStreamCreated", () => { + test("returns stream record and identity", () => { + const event = createMockEvent(); + const result = handleStreamCreated(event); + + expect(result.stream.id).toBe("stream-1"); + expect(result.stream.contractId).toBe("0x123"); + expect(result.stream.ledger).toBe(12345); + expect(result.identity).toBe("0x123:12345:0xabc:0"); + }); + + test("is idempotent (same input produces same output)", () => { + const event = createMockEvent(); + const result1 = handleStreamCreated(event); + const result2 = handleStreamCreated(event); + + expect(result1).toEqual(result2); + }); + + test("handles different stream IDs correctly", () => { + const event1 = createMockEvent({ + data: JSON.stringify({ ...mockPayload, streamId: "stream-1" }), + }); + const event2 = createMockEvent({ + data: JSON.stringify({ ...mockPayload, streamId: "stream-2" }), + }); + + const result1 = handleStreamCreated(event1); + const result2 = handleStreamCreated(event2); + + expect(result1.stream.id).toBe("stream-1"); + expect(result2.stream.id).toBe("stream-2"); + expect(result1.stream.id).not.toBe(result2.stream.id); + }); +}); diff --git a/indexer/streams/src/handlers/streamCreated.ts b/indexer/streams/src/handlers/streamCreated.ts new file mode 100644 index 0000000..33df1a0 --- /dev/null +++ b/indexer/streams/src/handlers/streamCreated.ts @@ -0,0 +1,88 @@ +import type { StreamCreatedEvent, StreamRecord } from "./types.js"; + +export const STREAM_CREATED_TOPIC = "StreamCreated"; + +export interface StreamCreatedPayload { + streamId: string; + sender: string; + recipient: string; + amount: string; + startTime: string; + endTime: string; +} + +export function parseStreamCreatedPayload(event: StreamCreatedEvent): StreamCreatedPayload { + const eventName = event.topics[0]; + if (!eventName || eventName !== STREAM_CREATED_TOPIC) { + throw new Error( + `Expected ${STREAM_CREATED_TOPIC} event topic, got ${eventName ?? "undefined"}`, + ); + } + + let parsed: Record; + try { + parsed = JSON.parse(event.data) as Record; + } catch { + throw new Error("Failed to parse event data: invalid JSON"); + } + + const requiredFields = [ + "streamId", + "sender", + "recipient", + "amount", + "startTime", + "endTime", + ] as const; + + for (const field of requiredFields) { + const value = parsed[field]; + if (typeof value !== "string" || value.length === 0) { + throw new Error( + `Invalid payload: "${field}" must be a non-empty string, got ${typeof value === "string" ? "empty string" : typeof value}`, + ); + } + } + + return { + streamId: parsed.streamId as string, + sender: parsed.sender as string, + recipient: parsed.recipient as string, + amount: parsed.amount as string, + startTime: parsed.startTime as string, + endTime: parsed.endTime as string, + }; +} + +export function getEventIdentity(event: StreamCreatedEvent): string { + return `${event.contractId}:${event.ledger}:${event.txHash}:${event.eventIndex}`; +} + +export function mapStreamCreatedToRecord( + payload: StreamCreatedPayload, + event: StreamCreatedEvent, +): StreamRecord { + return { + id: payload.streamId, + sender: payload.sender, + recipient: payload.recipient, + amount: payload.amount, + startTime: payload.startTime, + endTime: payload.endTime, + contractId: event.contractId, + ledger: event.ledger, + txHash: event.txHash, + eventIndex: event.eventIndex, + }; +} + +export function handleStreamCreated(event: StreamCreatedEvent): { + stream: StreamRecord; + identity: string; +} { + const payload = parseStreamCreatedPayload(event); + const stream = mapStreamCreatedToRecord(payload, event); + const identity = getEventIdentity(event); + + return { stream, identity }; +} diff --git a/indexer/streams/src/handlers/types.ts b/indexer/streams/src/handlers/types.ts index 92bb017..ad59caf 100644 --- a/indexer/streams/src/handlers/types.ts +++ b/indexer/streams/src/handlers/types.ts @@ -1,3 +1,25 @@ +export interface StreamCreatedEvent { + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; + topics: string[]; + data: string; +} + +export interface StreamRecord { + id: string; + sender: string; + recipient: string; + amount: string; + startTime: string; + endTime: string; + contractId: string; + ledger: number; + txHash: string; + eventIndex: number; +} + export interface StreamFundedPayload { streamId: string | undefined; sender: string | undefined; diff --git a/indexer/streams/src/index.ts b/indexer/streams/src/index.ts index 94682ad..731f74f 100644 --- a/indexer/streams/src/index.ts +++ b/indexer/streams/src/index.ts @@ -10,3 +10,8 @@ export { Stream } from "./db/entity/Stream.js"; export { WithdrawalAction } from "./db/entity/WithdrawalAction.js"; export { CancelAction } from "./db/entity/CancelAction.js"; export * from "./handlers/index.js"; +export { + handleStreamCreated, + parseStreamCreatedPayload, + STREAM_CREATED_TOPIC, +} from "./handlers/streamCreated.js";