Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ CAIRO_MOCK=true
# These values are used by code under indexer/.
INDEXER_PORT=4000
INDEXER_LOG_LEVEL=info
INDEXER_DATABASE_URL=postgres://postgres:postgres@localhost:5432/fundable_indexer

# Indexer database configuration (can use same or separate database)
INDEXER_DATABASE_HOST=localhost
INDEXER_DATABASE_PORT=5432
INDEXER_DATABASE_USERNAME=postgres
INDEXER_DATABASE_PASSWORD=postgres
INDEXER_DATABASE_NAME=fundable_indexer
INDEXER_DATABASE_SSL=false

SOROBAN_RPC_URL=https://soroban-testnet.stellar.org
STELLAR_NETWORK_PASSPHRASE=Test SDF Network ; September 2015
POLL_INTERVAL_MS=5000
Expand Down
25 changes: 25 additions & 0 deletions indexer/INDEXER_GUIDELINES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ Required event identity fields:
Database writes that represent indexed events should use unique constraints or
equivalent conflict handling based on this identity.

### Indexed Event Schema

The `indexed_event` table provides deterministic identity storage with the
following schema:

```sql
CREATE TABLE indexed_event (
id TEXT PRIMARY KEY,
contract_id TEXT NOT NULL,
ledger_number BIGINT NOT NULL,
transaction_hash TEXT NOT NULL,
event_index INTEGER NOT NULL,
event_data JSONB NOT NULL,
event_topics JSONB NOT NULL,
processed_by TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

-- Deterministic identity constraint
UNIQUE(contract_id, ledger_number, transaction_hash, event_index)
);
```

Use the `IndexedEventRepository.insertSafely()` method for idempotent event
storage that handles duplicate detection automatically.

## Cursor Safety

Cursors represent indexed progress and must be conservative:
Expand Down
7 changes: 6 additions & 1 deletion indexer/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"private": true,
"type": "module",
"exports": {
".": "./src/index.ts"
".": "./src/index.ts",
"./db": "./src/db/index.ts"
},
"scripts": {
"build": "tsc -p tsconfig.json",
Expand All @@ -13,5 +14,9 @@
"lint": "biome check .",
"test": "vitest run src",
"type-check": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {
"typeorm": "^0.3.20",
"ulidx": "^2.4.1"
Comment on lines +17 to +20

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Checking indexer/common/package.json dependencies:"
cat indexer/common/package.json | jq '.dependencies // {}'

echo
echo "Searching for direct import of reflect-metadata in indexer/common:"
rg -n --type=ts '^\s*import\s+["'\'']reflect-metadata["'\'']' indexer/common/src

echo
echo "Expected: dependencies includes \"reflect-metadata\" if imported by package entrypoints."

Repository: Fundable-Protocol/Backend

Length of output: 484


Add reflect-metadata to package.json dependencies.

The file indexer/common/src/db/data-source.ts explicitly imports reflect-metadata. Since this file is exposed via the new ./db entrypoint in indexer/common/package.json, consumers using this module will encounter a runtime failure (ReferenceError) if reflect-metadata is not installed in their own node_modules.

// indexer/common/package.json
"dependencies": {
  "reflect-metadata": "0.2.2",
  "typeorm": "^0.3.20",
  "ulidx": "^2.4.1"
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/package.json` around lines 17 - 20, Add reflect-metadata to
the dependencies in indexer/common/package.json so the new ./db entrypoint can
resolve the explicit import used by data-source.ts. Update the dependencies list
alongside typeorm and ulidx, keeping the version pinned as requested, so
consumers of the common package do not hit a runtime ReferenceError when
importing the database module.

}
}
119 changes: 119 additions & 0 deletions indexer/common/src/db/EXAMPLE_USAGE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Indexed Event System - Example Usage

## Overview

The indexed event system provides deterministic identity storage for Soroban events
to prevent duplicates during retries, restarts, and replays.

## Basic Usage

### 1. Initialize Database Connection

```typescript
import { initializeIndexerDataSource } from "@fundable-indexer/common/db";

const config = {
INDEXER_DATABASE_HOST: process.env.INDEXER_DATABASE_HOST,
INDEXER_DATABASE_PORT: process.env.INDEXER_DATABASE_PORT,
INDEXER_DATABASE_USERNAME: process.env.INDEXER_DATABASE_USERNAME,
INDEXER_DATABASE_PASSWORD: process.env.INDEXER_DATABASE_PASSWORD,
INDEXER_DATABASE_NAME: process.env.INDEXER_DATABASE_NAME,
INDEXER_DATABASE_SSL: process.env.INDEXER_DATABASE_SSL,
};

const dataSource = await initializeIndexerDataSource(config);
```

### 2. Store Events with Deduplication

```typescript
import { IndexedEventRepository } from "@fundable-indexer/common/db";

const repository = new IndexedEventRepository(dataSource);

const eventData = {
contractId: "CAFEBABE",
ledgerNumber: BigInt(123456),
transactionHash: "0x1234567890abcdef",
eventIndex: 0,
eventData: {
type: "PaymentStreamCreated",
streamId: "stream-123",
amount: "1000000",
},
eventTopics: ["PaymentStreamCreated", "CAFEBABE"],
processedBy: "streams",
};

// Safe insert - won't create duplicates
const storedEvent = await repository.insertSafely(eventData);
console.log(`Event stored with ID: ${storedEvent.id}`);
```

### 3. Check if Event is Already Processed

```typescript
const isProcessed = await repository.isProcessed(
"CAFEBABE",
BigInt(123456),
"0x1234567890abcdef",
0,
);

if (isProcessed) {
console.log("Event already processed, skipping...");
} else {
console.log("Event not yet processed, handling...");
}
```

### 4. Query Events for Replay/Debug

```typescript
// Get events for a ledger range
const events = await repository.getByLedgerRange(
BigInt(123000),
BigInt(124000),
"streams",
);

// Get latest processed ledger
const latestLedger = await repository.getLatestLedger("streams");
console.log(`Latest processed ledger: ${latestLedger}`);

// Get events for a specific contract
const contractEvents = await repository.getByContract("CAFEBABE", 10);
```

## Migration

The `CreateIndexedEventTable1704000000001` migration creates the table with:

1. Unique constraint on `(contract_id, ledger_number, transaction_hash, event_index)`
2. Indexes for efficient queries by contract, ledger, transaction hash, and domain
3. Composite indexes for common query patterns

Run the migration:
```bash
bun run migration:run
```

## Testing

Run the tests:
```bash
bun run indexer:test
```

## Environment Variables

Add to your `.env` file:
```env
# Indexer Database Configuration
INDEXER_DATABASE_HOST=localhost
INDEXER_DATABASE_PORT=5432
INDEXER_DATABASE_USERNAME=postgres
INDEXER_DATABASE_PASSWORD=postgres
INDEXER_DATABASE_NAME=fundable_indexer
INDEXER_DATABASE_SSL=false
```
108 changes: 108 additions & 0 deletions indexer/common/src/db/data-source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import "reflect-metadata";
import { DataSource } from "typeorm";
import { IndexedEventEntity } from "./indexed-event.entity.js";

/**
* Environment variables required for indexer database connection
*/
export interface IndexerDatabaseConfig {
INDEXER_DATABASE_HOST: string;
INDEXER_DATABASE_PORT: string;
INDEXER_DATABASE_USERNAME: string;
INDEXER_DATABASE_PASSWORD: string;
INDEXER_DATABASE_NAME: string;
INDEXER_DATABASE_SSL?: string;
}

/**
* Validate required database configuration
*/
export function validateIndexerDatabaseConfig(
config: Partial<IndexerDatabaseConfig>,
): asserts config is IndexerDatabaseConfig {
const missingKeys = [
["INDEXER_DATABASE_HOST", config.INDEXER_DATABASE_HOST],
["INDEXER_DATABASE_PORT", config.INDEXER_DATABASE_PORT],
["INDEXER_DATABASE_USERNAME", config.INDEXER_DATABASE_USERNAME],
["INDEXER_DATABASE_PASSWORD", config.INDEXER_DATABASE_PASSWORD],
["INDEXER_DATABASE_NAME", config.INDEXER_DATABASE_NAME],
]
.filter(([, value]) => !value)
.map(([key]) => key);

if (missingKeys.length > 0) {
throw new Error(
`Missing required indexer database env vars: ${missingKeys.join(", ")}`,
);
}
}

/**
* Create indexer data source
*/
export function createIndexerDataSource(
config: IndexerDatabaseConfig,
): DataSource {
const port = Number(config.INDEXER_DATABASE_PORT);
const useSsl = config.INDEXER_DATABASE_SSL === "true";

return new DataSource({
host: config.INDEXER_DATABASE_HOST,
port: Number.isNaN(port) ? 5432 : port,
username: config.INDEXER_DATABASE_USERNAME,
password: config.INDEXER_DATABASE_PASSWORD,
database: config.INDEXER_DATABASE_NAME,
type: "postgres",
connectTimeoutMS: 5000,
synchronize: false,
logging: process.env.NODE_ENV === "development",
entities: [IndexedEventEntity],
migrations: ["src/migrations/*.js"],
...(useSsl ? { ssl: { rejectUnauthorized: false } } : {}),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 Security & Privacy | 🟠 Major | ⚡ Quick win

Do not disable TLS certificate verification by default.

Using rejectUnauthorized: false disables server cert validation and weakens DB transport security in production-like environments.

Suggested fix
-    ...(useSsl ? { ssl: { rejectUnauthorized: false } } : {}),
+    ...(useSsl ? { ssl: true } : {}),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
...(useSsl ? { ssl: { rejectUnauthorized: false } } : {}),
...(useSsl ? { ssl: true } : {}),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/db/data-source.ts` at line 61, The TLS configuration in
the data source setup currently disables certificate verification by default via
the ssl settings, which weakens transport security. Update the db connection
logic in data-source.ts so the DataSource/connection options no longer hardcode
rejectUnauthorized: false; instead, keep certificate validation enabled by
default and only relax it behind an explicit opt-in path using the useSsl
configuration flow.

});
}

/**
* Singleton instance of indexer data source
*/
let indexerDataSource: DataSource | null = null;

/**
* Get or initialize the indexer data source
*/
export function getIndexerDataSource(): DataSource {
if (!indexerDataSource) {
throw new Error(
"Indexer data source not initialized. Call initializeIndexerDataSource first.",
);
}
return indexerDataSource;
}

/**
* Initialize indexer data source with environment configuration
*/
export async function initializeIndexerDataSource(
config: IndexerDatabaseConfig,
): Promise<DataSource> {
validateIndexerDatabaseConfig(config);

if (indexerDataSource?.isInitialized) {
return indexerDataSource;
}

indexerDataSource = createIndexerDataSource(config);
await indexerDataSource.initialize();

return indexerDataSource;
Comment on lines +85 to +97

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Guard singleton initialization against concurrent calls.

Concurrent initializeIndexerDataSource() calls can race and create multiple DataSource instances before initialization completes.

Suggested fix
 let indexerDataSource: DataSource | null = null;
+let indexerDataSourceInitPromise: Promise<DataSource> | null = null;
@@
 export async function initializeIndexerDataSource(
   config: IndexerDatabaseConfig,
 ): Promise<DataSource> {
   validateIndexerDatabaseConfig(config);
-  
+
   if (indexerDataSource?.isInitialized) {
     return indexerDataSource;
   }
+  if (indexerDataSourceInitPromise) {
+    return indexerDataSourceInitPromise;
+  }
 
-  indexerDataSource = createIndexerDataSource(config);
-  await indexerDataSource.initialize();
-  
-  return indexerDataSource;
+  indexerDataSource = createIndexerDataSource(config);
+  indexerDataSourceInitPromise = indexerDataSource.initialize().then(() => indexerDataSource!);
+  try {
+    await indexerDataSourceInitPromise;
+    return indexerDataSource;
+  } finally {
+    indexerDataSourceInitPromise = null;
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export async function initializeIndexerDataSource(
config: IndexerDatabaseConfig,
): Promise<DataSource> {
validateIndexerDatabaseConfig(config);
if (indexerDataSource?.isInitialized) {
return indexerDataSource;
}
indexerDataSource = createIndexerDataSource(config);
await indexerDataSource.initialize();
return indexerDataSource;
let indexerDataSource: DataSource | null = null;
let indexerDataSourceInitPromise: Promise<DataSource> | null = null;
export async function initializeIndexerDataSource(
config: IndexerDatabaseConfig,
): Promise<DataSource> {
validateIndexerDatabaseConfig(config);
if (indexerDataSource?.isInitialized) {
return indexerDataSource;
}
if (indexerDataSourceInitPromise) {
return indexerDataSourceInitPromise;
}
indexerDataSource = createIndexerDataSource(config);
indexerDataSourceInitPromise = indexerDataSource.initialize().then(() => indexerDataSource!);
try {
await indexerDataSourceInitPromise;
return indexerDataSource;
} finally {
indexerDataSourceInitPromise = null;
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@indexer/common/src/db/data-source.ts` around lines 85 - 97, Guard
initializeIndexerDataSource against concurrent calls by reusing a single
in-flight initialization promise instead of creating a new DataSource for each
caller. Update the initializeIndexerDataSource function and its
indexerDataSource singleton handling so that if initialization is already in
progress, later calls await the same promise and return the same initialized
instance. Make sure the createIndexerDataSource and initialize() path only runs
once until completion.

}

/**
* Close indexer data source connection
*/
export async function closeIndexerDataSource(): Promise<void> {
if (indexerDataSource?.isInitialized) {
await indexerDataSource.destroy();
indexerDataSource = null;
}
}
13 changes: 13 additions & 0 deletions indexer/common/src/db/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export { IndexedEventEntity } from "./indexed-event.entity.js";
export {
IndexedEventRepository,
type IndexedEventData,
} from "./indexed-event.repository.js";
export {
type IndexerDatabaseConfig,
validateIndexerDatabaseConfig,
createIndexerDataSource,
getIndexerDataSource,
initializeIndexerDataSource,
closeIndexerDataSource,
} from "./data-source.js";
48 changes: 48 additions & 0 deletions indexer/common/src/db/indexed-event.entity.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { describe, expect, test, beforeEach } from "vitest";
import { IndexedEventEntity } from "./indexed-event.entity.js";

describe("IndexedEventEntity", () => {
let entity: IndexedEventEntity;

beforeEach(() => {
entity = new IndexedEventEntity();
entity.contractId = "CAFEBABE";
entity.ledgerNumber = BigInt(123456);
entity.transactionHash = "0x1234567890abcdef";
entity.eventIndex = 0;
entity.eventData = { type: "TestEvent", amount: "100" };
entity.eventTopics = ["topic1", "topic2"];
entity.processedBy = "streams";
});

test("generates ULID when id is not provided", () => {
entity.generateId();
expect(entity.id).toBeDefined();
expect(entity.id.length).toBe(26); // ULID length
expect(entity.id).toMatch(/^[0-9A-Z]{26}$/);
});

test("preserves existing id", () => {
const existingId = "01J0XYZABCDEFGHIJKLMNOPQR";
entity.id = existingId;
entity.generateId();
expect(entity.id).toBe(existingId);
});

test("has required fields for deterministic identity", () => {
expect(entity.contractId).toBe("CAFEBABE");
expect(entity.ledgerNumber).toBe(BigInt(123456));
expect(entity.transactionHash).toBe("0x1234567890abcdef");
expect(entity.eventIndex).toBe(0);
});

test("has JSON data fields", () => {
expect(entity.eventData).toEqual({ type: "TestEvent", amount: "100" });
expect(entity.eventTopics).toEqual(["topic1", "topic2"]);
});

test("has metadata fields", () => {
expect(entity.processedBy).toBe("streams");
expect(entity.createdAt).toBeUndefined(); // Will be set by DB
});
});
Loading