From 1b8817bb9f8a3981ac86d64de3aeafa9ecbc53bd Mon Sep 17 00:00:00 2001 From: Arome8240 Date: Thu, 25 Jun 2026 16:21:18 +0100 Subject: [PATCH] feat: add deterministic identity storage for indexed events - Add IndexedEventEntity with contract ID, ledger number, transaction hash, event index - Create migration with uniqueness constraints and indexes for replay/debug queries - Implement IndexedEventRepository with deduplication via insertSafely() - Add indexer database configuration and environment variables - Update documentation and add tests Implements: Migrated from https://github.com/Fundable-Protocol/stellar_indexer/issues/6 --- .env.example | 10 +- indexer/INDEXER_GUIDELINES.md | 25 +++ indexer/common/package.json | 7 +- indexer/common/src/db/EXAMPLE_USAGE.md | 119 ++++++++++++ indexer/common/src/db/data-source.ts | 108 +++++++++++ indexer/common/src/db/index.ts | 13 ++ .../src/db/indexed-event.entity.test.ts | 48 +++++ indexer/common/src/db/indexed-event.entity.ts | 91 ++++++++++ .../src/db/indexed-event.repository.test.ts | 169 ++++++++++++++++++ .../common/src/db/indexed-event.repository.ts | 130 ++++++++++++++ indexer/common/src/index.ts | 3 + src/config/persistence/data-source.ts | 2 + .../CreateIndexedEventTable1704000000001.js | 68 +++++++ 13 files changed, 791 insertions(+), 2 deletions(-) create mode 100644 indexer/common/src/db/EXAMPLE_USAGE.md create mode 100644 indexer/common/src/db/data-source.ts create mode 100644 indexer/common/src/db/index.ts create mode 100644 indexer/common/src/db/indexed-event.entity.test.ts create mode 100644 indexer/common/src/db/indexed-event.entity.ts create mode 100644 indexer/common/src/db/indexed-event.repository.test.ts create mode 100644 indexer/common/src/db/indexed-event.repository.ts create mode 100644 src/migrations/CreateIndexedEventTable1704000000001.js diff --git a/.env.example b/.env.example index ea1c603..80b79fb 100644 --- a/.env.example +++ b/.env.example @@ -50,7 +50,15 @@ CAIRO_MOCK=true # These values are used by code under indexer/. INDEXER_PORT=4000 INDEXER_LOG_LEVEL=info -INDEXER_DATABASE_URL=postgres://postgres:postgres@localhost:5432/fundable_indexer + +# Indexer database configuration (can use same or separate database) +INDEXER_DATABASE_HOST=localhost +INDEXER_DATABASE_PORT=5432 +INDEXER_DATABASE_USERNAME=postgres +INDEXER_DATABASE_PASSWORD=postgres +INDEXER_DATABASE_NAME=fundable_indexer +INDEXER_DATABASE_SSL=false + SOROBAN_RPC_URL=https://soroban-testnet.stellar.org STELLAR_NETWORK_PASSPHRASE=Test SDF Network ; September 2015 POLL_INTERVAL_MS=5000 diff --git a/indexer/INDEXER_GUIDELINES.md b/indexer/INDEXER_GUIDELINES.md index cb560b7..5df00d4 100644 --- a/indexer/INDEXER_GUIDELINES.md +++ b/indexer/INDEXER_GUIDELINES.md @@ -34,6 +34,31 @@ Required event identity fields: Database writes that represent indexed events should use unique constraints or equivalent conflict handling based on this identity. +### Indexed Event Schema + +The `indexed_event` table provides deterministic identity storage with the +following schema: + +```sql +CREATE TABLE indexed_event ( + id TEXT PRIMARY KEY, + contract_id TEXT NOT NULL, + ledger_number BIGINT NOT NULL, + transaction_hash TEXT NOT NULL, + event_index INTEGER NOT NULL, + event_data JSONB NOT NULL, + event_topics JSONB NOT NULL, + processed_by TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + + -- Deterministic identity constraint + UNIQUE(contract_id, ledger_number, transaction_hash, event_index) +); +``` + +Use the `IndexedEventRepository.insertSafely()` method for idempotent event +storage that handles duplicate detection automatically. + ## Cursor Safety Cursors represent indexed progress and must be conservative: diff --git a/indexer/common/package.json b/indexer/common/package.json index da8956f..c97fa9a 100644 --- a/indexer/common/package.json +++ b/indexer/common/package.json @@ -4,7 +4,8 @@ "private": true, "type": "module", "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./db": "./src/db/index.ts" }, "scripts": { "build": "tsc -p tsconfig.json", @@ -13,5 +14,9 @@ "lint": "biome check .", "test": "vitest run src", "type-check": "tsc -p tsconfig.json --noEmit" + }, + "dependencies": { + "typeorm": "^0.3.20", + "ulidx": "^2.4.1" } } diff --git a/indexer/common/src/db/EXAMPLE_USAGE.md b/indexer/common/src/db/EXAMPLE_USAGE.md new file mode 100644 index 0000000..3c7dbf4 --- /dev/null +++ b/indexer/common/src/db/EXAMPLE_USAGE.md @@ -0,0 +1,119 @@ +# Indexed Event System - Example Usage + +## Overview + +The indexed event system provides deterministic identity storage for Soroban events +to prevent duplicates during retries, restarts, and replays. + +## Basic Usage + +### 1. Initialize Database Connection + +```typescript +import { initializeIndexerDataSource } from "@fundable-indexer/common/db"; + +const config = { + INDEXER_DATABASE_HOST: process.env.INDEXER_DATABASE_HOST, + INDEXER_DATABASE_PORT: process.env.INDEXER_DATABASE_PORT, + INDEXER_DATABASE_USERNAME: process.env.INDEXER_DATABASE_USERNAME, + INDEXER_DATABASE_PASSWORD: process.env.INDEXER_DATABASE_PASSWORD, + INDEXER_DATABASE_NAME: process.env.INDEXER_DATABASE_NAME, + INDEXER_DATABASE_SSL: process.env.INDEXER_DATABASE_SSL, +}; + +const dataSource = await initializeIndexerDataSource(config); +``` + +### 2. Store Events with Deduplication + +```typescript +import { IndexedEventRepository } from "@fundable-indexer/common/db"; + +const repository = new IndexedEventRepository(dataSource); + +const eventData = { + contractId: "CAFEBABE", + ledgerNumber: BigInt(123456), + transactionHash: "0x1234567890abcdef", + eventIndex: 0, + eventData: { + type: "PaymentStreamCreated", + streamId: "stream-123", + amount: "1000000", + }, + eventTopics: ["PaymentStreamCreated", "CAFEBABE"], + processedBy: "streams", +}; + +// Safe insert - won't create duplicates +const storedEvent = await repository.insertSafely(eventData); +console.log(`Event stored with ID: ${storedEvent.id}`); +``` + +### 3. Check if Event is Already Processed + +```typescript +const isProcessed = await repository.isProcessed( + "CAFEBABE", + BigInt(123456), + "0x1234567890abcdef", + 0, +); + +if (isProcessed) { + console.log("Event already processed, skipping..."); +} else { + console.log("Event not yet processed, handling..."); +} +``` + +### 4. Query Events for Replay/Debug + +```typescript +// Get events for a ledger range +const events = await repository.getByLedgerRange( + BigInt(123000), + BigInt(124000), + "streams", +); + +// Get latest processed ledger +const latestLedger = await repository.getLatestLedger("streams"); +console.log(`Latest processed ledger: ${latestLedger}`); + +// Get events for a specific contract +const contractEvents = await repository.getByContract("CAFEBABE", 10); +``` + +## Migration + +The `CreateIndexedEventTable1704000000001` migration creates the table with: + +1. Unique constraint on `(contract_id, ledger_number, transaction_hash, event_index)` +2. Indexes for efficient queries by contract, ledger, transaction hash, and domain +3. Composite indexes for common query patterns + +Run the migration: +```bash +bun run migration:run +``` + +## Testing + +Run the tests: +```bash +bun run indexer:test +``` + +## Environment Variables + +Add to your `.env` file: +```env +# Indexer Database Configuration +INDEXER_DATABASE_HOST=localhost +INDEXER_DATABASE_PORT=5432 +INDEXER_DATABASE_USERNAME=postgres +INDEXER_DATABASE_PASSWORD=postgres +INDEXER_DATABASE_NAME=fundable_indexer +INDEXER_DATABASE_SSL=false +``` \ No newline at end of file diff --git a/indexer/common/src/db/data-source.ts b/indexer/common/src/db/data-source.ts new file mode 100644 index 0000000..9d0ada6 --- /dev/null +++ b/indexer/common/src/db/data-source.ts @@ -0,0 +1,108 @@ +import "reflect-metadata"; +import { DataSource } from "typeorm"; +import { IndexedEventEntity } from "./indexed-event.entity.js"; + +/** + * Environment variables required for indexer database connection + */ +export interface IndexerDatabaseConfig { + INDEXER_DATABASE_HOST: string; + INDEXER_DATABASE_PORT: string; + INDEXER_DATABASE_USERNAME: string; + INDEXER_DATABASE_PASSWORD: string; + INDEXER_DATABASE_NAME: string; + INDEXER_DATABASE_SSL?: string; +} + +/** + * Validate required database configuration + */ +export function validateIndexerDatabaseConfig( + config: Partial, +): asserts config is IndexerDatabaseConfig { + const missingKeys = [ + ["INDEXER_DATABASE_HOST", config.INDEXER_DATABASE_HOST], + ["INDEXER_DATABASE_PORT", config.INDEXER_DATABASE_PORT], + ["INDEXER_DATABASE_USERNAME", config.INDEXER_DATABASE_USERNAME], + ["INDEXER_DATABASE_PASSWORD", config.INDEXER_DATABASE_PASSWORD], + ["INDEXER_DATABASE_NAME", config.INDEXER_DATABASE_NAME], + ] + .filter(([, value]) => !value) + .map(([key]) => key); + + if (missingKeys.length > 0) { + throw new Error( + `Missing required indexer database env vars: ${missingKeys.join(", ")}`, + ); + } +} + +/** + * Create indexer data source + */ +export function createIndexerDataSource( + config: IndexerDatabaseConfig, +): DataSource { + const port = Number(config.INDEXER_DATABASE_PORT); + const useSsl = config.INDEXER_DATABASE_SSL === "true"; + + return new DataSource({ + host: config.INDEXER_DATABASE_HOST, + port: Number.isNaN(port) ? 5432 : port, + username: config.INDEXER_DATABASE_USERNAME, + password: config.INDEXER_DATABASE_PASSWORD, + database: config.INDEXER_DATABASE_NAME, + type: "postgres", + connectTimeoutMS: 5000, + synchronize: false, + logging: process.env.NODE_ENV === "development", + entities: [IndexedEventEntity], + migrations: ["src/migrations/*.js"], + ...(useSsl ? { ssl: { rejectUnauthorized: false } } : {}), + }); +} + +/** + * Singleton instance of indexer data source + */ +let indexerDataSource: DataSource | null = null; + +/** + * Get or initialize the indexer data source + */ +export function getIndexerDataSource(): DataSource { + if (!indexerDataSource) { + throw new Error( + "Indexer data source not initialized. Call initializeIndexerDataSource first.", + ); + } + return indexerDataSource; +} + +/** + * Initialize indexer data source with environment configuration + */ +export async function initializeIndexerDataSource( + config: IndexerDatabaseConfig, +): Promise { + validateIndexerDatabaseConfig(config); + + if (indexerDataSource?.isInitialized) { + return indexerDataSource; + } + + indexerDataSource = createIndexerDataSource(config); + await indexerDataSource.initialize(); + + return indexerDataSource; +} + +/** + * Close indexer data source connection + */ +export async function closeIndexerDataSource(): Promise { + if (indexerDataSource?.isInitialized) { + await indexerDataSource.destroy(); + indexerDataSource = null; + } +} \ No newline at end of file diff --git a/indexer/common/src/db/index.ts b/indexer/common/src/db/index.ts new file mode 100644 index 0000000..c9469a0 --- /dev/null +++ b/indexer/common/src/db/index.ts @@ -0,0 +1,13 @@ +export { IndexedEventEntity } from "./indexed-event.entity.js"; +export { + IndexedEventRepository, + type IndexedEventData, +} from "./indexed-event.repository.js"; +export { + type IndexerDatabaseConfig, + validateIndexerDatabaseConfig, + createIndexerDataSource, + getIndexerDataSource, + initializeIndexerDataSource, + closeIndexerDataSource, +} from "./data-source.js"; \ No newline at end of file diff --git a/indexer/common/src/db/indexed-event.entity.test.ts b/indexer/common/src/db/indexed-event.entity.test.ts new file mode 100644 index 0000000..453bcb4 --- /dev/null +++ b/indexer/common/src/db/indexed-event.entity.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, test, beforeEach } from "vitest"; +import { IndexedEventEntity } from "./indexed-event.entity.js"; + +describe("IndexedEventEntity", () => { + let entity: IndexedEventEntity; + + beforeEach(() => { + entity = new IndexedEventEntity(); + entity.contractId = "CAFEBABE"; + entity.ledgerNumber = BigInt(123456); + entity.transactionHash = "0x1234567890abcdef"; + entity.eventIndex = 0; + entity.eventData = { type: "TestEvent", amount: "100" }; + entity.eventTopics = ["topic1", "topic2"]; + entity.processedBy = "streams"; + }); + + test("generates ULID when id is not provided", () => { + entity.generateId(); + expect(entity.id).toBeDefined(); + expect(entity.id.length).toBe(26); // ULID length + expect(entity.id).toMatch(/^[0-9A-Z]{26}$/); + }); + + test("preserves existing id", () => { + const existingId = "01J0XYZABCDEFGHIJKLMNOPQR"; + entity.id = existingId; + entity.generateId(); + expect(entity.id).toBe(existingId); + }); + + test("has required fields for deterministic identity", () => { + expect(entity.contractId).toBe("CAFEBABE"); + expect(entity.ledgerNumber).toBe(BigInt(123456)); + expect(entity.transactionHash).toBe("0x1234567890abcdef"); + expect(entity.eventIndex).toBe(0); + }); + + test("has JSON data fields", () => { + expect(entity.eventData).toEqual({ type: "TestEvent", amount: "100" }); + expect(entity.eventTopics).toEqual(["topic1", "topic2"]); + }); + + test("has metadata fields", () => { + expect(entity.processedBy).toBe("streams"); + expect(entity.createdAt).toBeUndefined(); // Will be set by DB + }); +}); \ No newline at end of file diff --git a/indexer/common/src/db/indexed-event.entity.ts b/indexer/common/src/db/indexed-event.entity.ts new file mode 100644 index 0000000..7bc6ffe --- /dev/null +++ b/indexer/common/src/db/indexed-event.entity.ts @@ -0,0 +1,91 @@ +import { + Entity, + Column, + PrimaryColumn, + CreateDateColumn, + Index, + BeforeInsert, +} from "typeorm"; +import { ulid } from "ulidx"; + +/** + * Entity representing a Soroban event that has been indexed. + * Provides deterministic identity for deduplication and replay safety. + * + * Required fields for event identity (per INDEXER_GUIDELINES.md): + * - Contract ID + * - Ledger number + * - Transaction hash + * - Event index or another deterministic event position + */ +@Entity("indexed_event") +@Index("indexed_event_contract_id_idx", ["contractId"]) +@Index("indexed_event_ledger_idx", ["ledgerNumber"]) +@Index("indexed_event_tx_hash_idx", ["transactionHash"]) +@Index("indexed_event_created_at_idx", ["createdAt"]) +@Index("indexed_event_dedupe_idx", ["contractId", "ledgerNumber", "transactionHash", "eventIndex"], { + unique: true, +}) +export class IndexedEventEntity { + @PrimaryColumn("text") + id: string; + + /** + * Soroban contract ID that emitted the event + */ + @Column("text", { name: "contract_id", nullable: false }) + contractId: string; + + /** + * Stellar ledger number when the event occurred + */ + @Column("bigint", { name: "ledger_number", nullable: false }) + ledgerNumber: bigint; + + /** + * Transaction hash that contains the event + */ + @Column("text", { name: "transaction_hash", nullable: false }) + transactionHash: string; + + /** + * Position of the event within the transaction (0-indexed) + */ + @Column("integer", { name: "event_index", nullable: false }) + eventIndex: number; + + /** + * Raw event data as JSON + */ + @Column("jsonb", { name: "event_data", nullable: false }) + eventData: Record; + + /** + * Event topics as JSON array + */ + @Column("jsonb", { name: "event_topics", nullable: false }) + eventTopics: string[]; + + /** + * Domain that processed this event (e.g., 'streams', 'distributions') + */ + @Column("text", { name: "processed_by", nullable: false }) + processedBy: string; + + /** + * When the indexer processed this event + */ + @CreateDateColumn({ + type: "timestamp", + name: "created_at", + default: () => "CURRENT_TIMESTAMP", + }) + createdAt: Date; + + @BeforeInsert() + generateId() { + if (!this.id) { + this.id = ulid(); + } + } +} \ No newline at end of file diff --git a/indexer/common/src/db/indexed-event.repository.test.ts b/indexer/common/src/db/indexed-event.repository.test.ts new file mode 100644 index 0000000..3464d96 --- /dev/null +++ b/indexer/common/src/db/indexed-event.repository.test.ts @@ -0,0 +1,169 @@ +import { describe, expect, test, beforeEach, vi } from "vitest"; +import { DataSource, Repository } from "typeorm"; +import { IndexedEventEntity } from "./indexed-event.entity.js"; +import { IndexedEventRepository } from "./indexed-event.repository.js"; + +// Mock data +const mockEventData = { + contractId: "CAFEBABE", + ledgerNumber: BigInt(123456), + transactionHash: "0x1234567890abcdef", + eventIndex: 0, + eventData: { type: "TestEvent", amount: "100" }, + eventTopics: ["topic1", "topic2"], + processedBy: "streams", +}; + +describe("IndexedEventRepository", () => { + let mockDataSource: DataSource; + let mockRepo: Repository; + let repository: IndexedEventRepository; + + beforeEach(() => { + mockRepo = { + create: vi.fn(), + save: vi.fn(), + findOne: vi.fn(), + count: vi.fn(), + createQueryBuilder: vi.fn(), + } as unknown as Repository; + + mockDataSource = { + getRepository: vi.fn(() => mockRepo), + } as unknown as DataSource; + + repository = new IndexedEventRepository(mockDataSource); + }); + + describe("insertSafely", () => { + test("inserts new event successfully", async () => { + const mockEntity = new IndexedEventEntity(); + Object.assign(mockEntity, { id: "test-id", ...mockEventData }); + + vi.mocked(mockRepo.create).mockReturnValue(mockEntity); + vi.mocked(mockRepo.save).mockResolvedValue(mockEntity); + + const result = await repository.insertSafely(mockEventData); + + expect(mockRepo.create).toHaveBeenCalledWith(mockEventData); + expect(mockRepo.save).toHaveBeenCalledWith(mockEntity); + expect(result).toBe(mockEntity); + }); + + test("handles duplicate event by returning existing", async () => { + const mockEntity = new IndexedEventEntity(); + Object.assign(mockEntity, { id: "existing-id", ...mockEventData }); + + const duplicateError = new Error("Duplicate key"); + (duplicateError as any).code = "23505"; // PostgreSQL unique violation + + vi.mocked(mockRepo.create).mockReturnValue(mockEntity); + vi.mocked(mockRepo.save).mockRejectedValue(duplicateError); + vi.mocked(mockRepo.findOne).mockResolvedValue(mockEntity); + + const result = await repository.insertSafely(mockEventData); + + expect(mockRepo.findOne).toHaveBeenCalledWith({ + where: { + contractId: mockEventData.contractId, + ledgerNumber: mockEventData.ledgerNumber, + transactionHash: mockEventData.transactionHash, + eventIndex: mockEventData.eventIndex, + }, + }); + expect(result).toBe(mockEntity); + }); + + test("re-throws non-duplicate errors", async () => { + const mockEntity = new IndexedEventEntity(); + Object.assign(mockEntity, mockEventData); + + const otherError = new Error("Database connection failed"); + + vi.mocked(mockRepo.create).mockReturnValue(mockEntity); + vi.mocked(mockRepo.save).mockRejectedValue(otherError); + + await expect(repository.insertSafely(mockEventData)).rejects.toThrow( + "Database connection failed", + ); + }); + }); + + describe("isProcessed", () => { + test("returns true when event exists", async () => { + vi.mocked(mockRepo.count).mockResolvedValue(1); + + const result = await repository.isProcessed( + "CAFEBABE", + BigInt(123456), + "0x1234567890abcdef", + 0, + ); + + expect(mockRepo.count).toHaveBeenCalledWith({ + where: { + contractId: "CAFEBABE", + ledgerNumber: BigInt(123456), + transactionHash: "0x1234567890abcdef", + eventIndex: 0, + }, + }); + expect(result).toBe(true); + }); + + test("returns false when event does not exist", async () => { + vi.mocked(mockRepo.count).mockResolvedValue(0); + + const result = await repository.isProcessed( + "CAFEBABE", + BigInt(123456), + "0x1234567890abcdef", + 0, + ); + + expect(result).toBe(false); + }); + }); + + describe("getLatestLedger", () => { + test("returns latest ledger number for domain", async () => { + const mockQueryBuilder = { + select: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + getRawOne: vi.fn().mockResolvedValue({ maxLedger: "123456" }), + }; + + vi.mocked(mockRepo.createQueryBuilder).mockReturnValue( + mockQueryBuilder as any, + ); + + const result = await repository.getLatestLedger("streams"); + + expect(mockRepo.createQueryBuilder).toHaveBeenCalledWith("event"); + expect(mockQueryBuilder.select).toHaveBeenCalledWith( + "MAX(event.ledgerNumber)", + "maxLedger", + ); + expect(mockQueryBuilder.where).toHaveBeenCalledWith( + "event.processedBy = :domain", + { domain: "streams" }, + ); + expect(result).toBe(BigInt(123456)); + }); + + test("returns null when no events exist", async () => { + const mockQueryBuilder = { + select: vi.fn().mockReturnThis(), + getRawOne: vi.fn().mockResolvedValue({}), + }; + + vi.mocked(mockRepo.createQueryBuilder).mockReturnValue( + mockQueryBuilder as any, + ); + + const result = await repository.getLatestLedger(); + + expect(result).toBeNull(); + }); + }); +}); \ No newline at end of file diff --git a/indexer/common/src/db/indexed-event.repository.ts b/indexer/common/src/db/indexed-event.repository.ts new file mode 100644 index 0000000..302ccf1 --- /dev/null +++ b/indexer/common/src/db/indexed-event.repository.ts @@ -0,0 +1,130 @@ +import { DataSource, Repository } from "typeorm"; +import { IndexedEventEntity } from "./indexed-event.entity.js"; + +export interface IndexedEventData { + contractId: string; + ledgerNumber: bigint; + transactionHash: string; + eventIndex: number; + eventData: Record; + eventTopics: string[]; + processedBy: string; +} + +export class IndexedEventRepository { + private repo: Repository; + + constructor(dataSource: DataSource) { + this.repo = dataSource.getRepository(IndexedEventEntity); + } + + /** + * Safely insert an event with deduplication check. + * Returns the existing event if already processed, otherwise inserts new. + */ + async insertSafely(event: IndexedEventData): Promise { + try { + const entity = this.repo.create(event); + return await this.repo.save(entity); + } catch (error: unknown) { + // Check if it's a unique constraint violation (duplicate event) + if ( + error instanceof Error && + "code" in error && + error.code === "23505" // PostgreSQL unique violation + ) { + // Find the existing event + const existing = await this.repo.findOne({ + where: { + contractId: event.contractId, + ledgerNumber: event.ledgerNumber, + transactionHash: event.transactionHash, + eventIndex: event.eventIndex, + }, + }); + if (existing) { + return existing; + } + } + throw error; + } + } + + /** + * Check if an event has already been processed + */ + async isProcessed( + contractId: string, + ledgerNumber: bigint, + transactionHash: string, + eventIndex: number, + ): Promise { + const count = await this.repo.count({ + where: { + contractId, + ledgerNumber, + transactionHash, + eventIndex, + }, + }); + return count > 0; + } + + /** + * Get events for a specific ledger range + */ + async getByLedgerRange( + startLedger: bigint, + endLedger: bigint, + domain?: string, + ): Promise { + const query = this.repo + .createQueryBuilder("event") + .where("event.ledgerNumber >= :startLedger", { startLedger }) + .andWhere("event.ledgerNumber <= :endLedger", { endLedger }) + .orderBy("event.ledgerNumber", "ASC") + .addOrderBy("event.eventIndex", "ASC"); + + if (domain) { + query.andWhere("event.processedBy = :domain", { domain }); + } + + return query.getMany(); + } + + /** + * Get events for a specific contract + */ + async getByContract( + contractId: string, + limit?: number, + ): Promise { + const query = this.repo + .createQueryBuilder("event") + .where("event.contractId = :contractId", { contractId }) + .orderBy("event.ledgerNumber", "DESC") + .addOrderBy("event.eventIndex", "DESC"); + + if (limit) { + query.limit(limit); + } + + return query.getMany(); + } + + /** + * Get the latest processed ledger for a domain + */ + async getLatestLedger(domain?: string): Promise { + const query = this.repo + .createQueryBuilder("event") + .select("MAX(event.ledgerNumber)", "maxLedger"); + + if (domain) { + query.where("event.processedBy = :domain", { domain }); + } + + const result = await query.getRawOne(); + return result?.maxLedger ? BigInt(result.maxLedger) : null; + } +} \ No newline at end of file diff --git a/indexer/common/src/index.ts b/indexer/common/src/index.ts index a7a318b..935817a 100644 --- a/indexer/common/src/index.ts +++ b/indexer/common/src/index.ts @@ -2,3 +2,6 @@ export const commonPackage = { name: "@fundable-indexer/common", role: "shared-infrastructure", } as const; + +// Re-export database module +export * from "./db/index.js"; diff --git a/src/config/persistence/data-source.ts b/src/config/persistence/data-source.ts index 160c940..2b8146c 100644 --- a/src/config/persistence/data-source.ts +++ b/src/config/persistence/data-source.ts @@ -15,6 +15,7 @@ import FeeConfigEntity from '../../components/v1/feeConfig/feeConfig.entity'; import UserEntity from '../../components/v1/user/user.entity'; import CampaignEntity from '../../components/v1/campaign/campaign.entity'; import AuditLogEntity from '../../components/v1/audit/auditLog.entity'; +import { IndexedEventEntity } from '../../../indexer/common/src/db/indexed-event.entity'; const { dbConfigs } = appConfigs; @@ -56,6 +57,7 @@ const AppDataSource = new DataSource({ UserEntity, CampaignEntity, AuditLogEntity, + IndexedEventEntity, ], migrations: ['src/migrations/*.js'], ...(appConfigs.isProd || appConfigs.isStaging diff --git a/src/migrations/CreateIndexedEventTable1704000000001.js b/src/migrations/CreateIndexedEventTable1704000000001.js new file mode 100644 index 0000000..11c8755 --- /dev/null +++ b/src/migrations/CreateIndexedEventTable1704000000001.js @@ -0,0 +1,68 @@ +const { MigrationInterface, QueryRunner } = require("typeorm"); + +module.exports = class CreateIndexedEventTable1704000000001 { + name = "CreateIndexedEventTable1704000000001"; + + async up(queryRunner) { + // Create indexed_event table + await queryRunner.query(` + CREATE TABLE "indexed_event" ( + "id" text NOT NULL, + "contract_id" text NOT NULL, + "ledger_number" bigint NOT NULL, + "transaction_hash" text NOT NULL, + "event_index" integer NOT NULL, + "event_data" jsonb NOT NULL, + "event_topics" jsonb NOT NULL, + "processed_by" text NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "PK_indexed_event_id" PRIMARY KEY ("id"), + CONSTRAINT "UQ_indexed_event_dedupe" UNIQUE ( + "contract_id", + "ledger_number", + "transaction_hash", + "event_index" + ) + ) + `); + + // Create indexes for efficient queries + await queryRunner.query(` + CREATE INDEX "indexed_event_contract_id_idx" ON "indexed_event" ("contract_id") + `); + await queryRunner.query(` + CREATE INDEX "indexed_event_ledger_idx" ON "indexed_event" ("ledger_number") + `); + await queryRunner.query(` + CREATE INDEX "indexed_event_tx_hash_idx" ON "indexed_event" ("transaction_hash") + `); + await queryRunner.query(` + CREATE INDEX "indexed_event_created_at_idx" ON "indexed_event" ("created_at") + `); + await queryRunner.query(` + CREATE INDEX "indexed_event_processed_by_idx" ON "indexed_event" ("processed_by") + `); + + // Composite index for common query patterns + await queryRunner.query(` + CREATE INDEX "indexed_event_contract_ledger_idx" ON "indexed_event" ("contract_id", "ledger_number") + `); + await queryRunner.query(` + CREATE INDEX "indexed_event_domain_ledger_idx" ON "indexed_event" ("processed_by", "ledger_number") + `); + } + + async down(queryRunner) { + // Drop indexes first + await queryRunner.query(`DROP INDEX "indexed_event_domain_ledger_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_contract_ledger_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_processed_by_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_created_at_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_tx_hash_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_ledger_idx"`); + await queryRunner.query(`DROP INDEX "indexed_event_contract_id_idx"`); + + // Drop table + await queryRunner.query(`DROP TABLE "indexed_event"`); + } +}; \ No newline at end of file