Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 286 additions & 0 deletions indexer/bun.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions indexer/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
"scripts": {
"build": "tsc -p tsconfig.json",
"codegen": "bun --print \"'common: no codegen configured yet'\"",
"db:migrate": "bun run src/db/migrate.ts",
"dev": "bun --watch src/index.ts",
"lint": "biome check .",
"test": "vitest run src",
"type-check": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {
"postgres": "3.4.5"
},
"devDependencies": {
"@types/node": "22.15.21"
}
}
206 changes: 206 additions & 0 deletions indexer/common/src/cursor/ledger-cursor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import { describe, expect, it } from "vitest";

import { type LedgerCursor, type SqlClient, findCursor, upsertCursor } from "./ledger-cursor.js";

// ---------------------------------------------------------------------------
// In-memory SQL mock
// ---------------------------------------------------------------------------

/**
* Creates a lightweight in-memory store that mimics the ledger_cursor table
* and returns a mock SqlClient backed by it.
*/
function makeStore(): {
sql: SqlClient;
rows: Map<string, LedgerCursor>;
} {
let nextId = 1n;
const rows = new Map<string, LedgerCursor>();

const storeKey = (source: string, domain: string) => `${source}|${domain}`;

const sql: SqlClient = (template: TemplateStringsArray, ...values: unknown[]) => {
// Reconstruct a rough SQL string so we can branch on the operation type.
const query = template.reduce(
(acc, part, i) => acc + part + (values[i] !== undefined ? String(values[i]) : ""),
"",
);

const normalised = query.replace(/\s+/g, " ").trim().toUpperCase();

// ---- SELECT (findCursor) ----
if (normalised.startsWith("SELECT")) {
// Extract the two positional values: source and domain.
const [source, domain] = values as [string, string];
const row = rows.get(storeKey(source, domain));
return Promise.resolve(row ? [row] : []) as Promise<LedgerCursor[]>;
}

// ---- INSERT … ON CONFLICT … RETURNING (upsertCursor) ----
if (normalised.startsWith("INSERT")) {
const [source, domain, lastLedgerSeq] = values as [string, string, bigint];
const key = storeKey(source, domain);
const existing = rows.get(key);
const now = new Date();

if (!existing) {
const newRow: LedgerCursor = {
id: nextId++,
source,
domain,
last_ledger_seq: lastLedgerSeq,
created_at: now,
updated_at: now,
};
rows.set(key, newRow);
return Promise.resolve([newRow]) as Promise<LedgerCursor[]>;
}

// Simulate the WHERE clause: only advance if the new seq is greater.
const existingSeq = existing.last_ledger_seq;
if (existingSeq === null || existingSeq < lastLedgerSeq) {
const updated: LedgerCursor = {
...existing,
last_ledger_seq: lastLedgerSeq,
updated_at: now,
};
rows.set(key, updated);
return Promise.resolve([updated]) as Promise<LedgerCursor[]>;
}

// Update suppressed by WHERE — return empty to trigger the fallback path.
return Promise.resolve([]) as Promise<LedgerCursor[]>;
}

return Promise.resolve([]) as Promise<LedgerCursor[]>;
};

return { sql, rows };
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe("findCursor", () => {
it("returns null when no cursor exists for the given source/domain", async () => {
const { sql } = makeStore();
const result = await findCursor(sql, "testnet", "streams");
expect(result).toBeNull();
});

it("returns the cursor row after it has been created", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 100n);

const cursor = await findCursor(sql, "testnet", "streams");
expect(cursor).not.toBeNull();
expect(cursor?.source).toBe("testnet");
expect(cursor?.domain).toBe("streams");
expect(cursor?.last_ledger_seq).toBe(100n);
});

it("returns null for an unrelated domain on the same source", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 50n);

const result = await findCursor(sql, "testnet", "distributions");
expect(result).toBeNull();
});

it("throws when source is empty", async () => {
const { sql } = makeStore();
await expect(findCursor(sql, "", "streams")).rejects.toThrow("source must not be empty");
});

it("throws when domain is empty", async () => {
const { sql } = makeStore();
await expect(findCursor(sql, "testnet", "")).rejects.toThrow("domain must not be empty");
});
});

describe("upsertCursor", () => {
it("inserts a new row when none exists", async () => {
const { sql, rows } = makeStore();
const cursor = await upsertCursor(sql, "testnet", "streams", 42n);

expect(cursor.source).toBe("testnet");
expect(cursor.domain).toBe("streams");
expect(cursor.last_ledger_seq).toBe(42n);
expect(rows.size).toBe(1);
});

it("advances the cursor when the new ledger seq is greater", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 100n);
const cursor = await upsertCursor(sql, "testnet", "streams", 200n);

expect(cursor.last_ledger_seq).toBe(200n);
});

it("does not roll back the cursor when a stale (lower) seq is given", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 200n);
const cursor = await upsertCursor(sql, "testnet", "streams", 100n);

// The stored value must remain 200 (the higher seq).
expect(cursor.last_ledger_seq).toBe(200n);
});

it("does not roll back the cursor when the same seq is given again (idempotent)", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 150n);
const cursor = await upsertCursor(sql, "testnet", "streams", 150n);

expect(cursor.last_ledger_seq).toBe(150n);
});

it("keeps separate cursors per domain on the same source", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 10n);
await upsertCursor(sql, "testnet", "distributions", 20n);

const streamsCursor = await findCursor(sql, "testnet", "streams");
const distCursor = await findCursor(sql, "testnet", "distributions");

expect(streamsCursor?.last_ledger_seq).toBe(10n);
expect(distCursor?.last_ledger_seq).toBe(20n);
});

it("keeps separate cursors per source on the same domain", async () => {
const { sql } = makeStore();
await upsertCursor(sql, "testnet", "streams", 10n);
await upsertCursor(sql, "mainnet", "streams", 99n);

const testnetCursor = await findCursor(sql, "testnet", "streams");
const mainnetCursor = await findCursor(sql, "mainnet", "streams");

expect(testnetCursor?.last_ledger_seq).toBe(10n);
expect(mainnetCursor?.last_ledger_seq).toBe(99n);
});

it("throws when source is empty", async () => {
const { sql } = makeStore();
await expect(upsertCursor(sql, "", "streams", 1n)).rejects.toThrow("source must not be empty");
});

it("throws when domain is empty", async () => {
const { sql } = makeStore();
await expect(upsertCursor(sql, "testnet", "", 1n)).rejects.toThrow("domain must not be empty");
});

it("throws when lastLedgerSeq is negative", async () => {
const { sql } = makeStore();
await expect(upsertCursor(sql, "testnet", "streams", -1n)).rejects.toThrow(
"lastLedgerSeq must be >= 0",
);
});

it("stores timestamps on the returned cursor", async () => {
const { sql } = makeStore();
const cursor = await upsertCursor(sql, "testnet", "streams", 1n);

expect(cursor.created_at).toBeInstanceOf(Date);
expect(cursor.updated_at).toBeInstanceOf(Date);
});
});
111 changes: 111 additions & 0 deletions indexer/common/src/cursor/ledger-cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Ledger cursor — durable progress tracking for the Soroban event poller.
*
* A cursor row records the last successfully processed ledger sequence number
* for a given (source, domain) pair. The poller reads the cursor on startup
* to resume from where it left off, and advances the cursor after every
* successfully processed ledger batch.
*
* See: indexer/common/src/db/migrations/0001_create_ledger_cursor.sql
*/

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

/**
* Shape of a row returned from the ledger_cursor table.
*
* - `source` identifies the Soroban RPC endpoint / network being polled.
* - `domain` is the indexer package that owns this cursor ("streams",
* "distributions", etc.).
* - `last_ledger_seq` is null until the first batch commits successfully.
*/
export interface LedgerCursor {
readonly id: bigint;
readonly source: string;
readonly domain: string;
readonly last_ledger_seq: bigint | null;
readonly created_at: Date;
readonly updated_at: Date;
}

/**
* Minimal subset of a `postgres` Sql instance used by the cursor functions.
* Typed against the parts we actually call so callers can inject a typed or
* mocked sql object in tests without importing `postgres` directly.
*/
// biome-ignore lint/suspicious/noExplicitAny: intentional escape-hatch for the tagged-template signature
export type SqlClient = (template: TemplateStringsArray, ...values: any[]) => Promise<unknown[]>;

// ---------------------------------------------------------------------------
// Repository functions
// ---------------------------------------------------------------------------

/**
* Returns the cursor for (source, domain), or null if none exists yet.
*/
export async function findCursor(
sql: SqlClient,
source: string,
domain: string,
): Promise<LedgerCursor | null> {
if (source === "") throw new Error("source must not be empty");
if (domain === "") throw new Error("domain must not be empty");

const rows = (await sql`
SELECT id, source, domain, last_ledger_seq, created_at, updated_at
FROM ledger_cursor
WHERE source = ${source}
AND domain = ${domain}
LIMIT 1
`) as LedgerCursor[];

return rows[0] ?? null;
}

/**
* Upserts the cursor for (source, domain), setting last_ledger_seq to the
* given value. Returns the updated row.
*
* On INSERT (first call for this pair) the row is created with the supplied
* ledger sequence.
*
* On conflict the existing row's last_ledger_seq and updated_at are updated
* only if the new sequence is strictly greater than the stored one. This
* prevents a stale write from rolling back progress.
*/
export async function upsertCursor(
sql: SqlClient,
source: string,
domain: string,
lastLedgerSeq: bigint,
): Promise<LedgerCursor> {
if (source === "") throw new Error("source must not be empty");
if (domain === "") throw new Error("domain must not be empty");
if (lastLedgerSeq < 0n) throw new Error("lastLedgerSeq must be >= 0");

const rows = (await sql`
INSERT INTO ledger_cursor (source, domain, last_ledger_seq, updated_at)
VALUES (${source}, ${domain}, ${lastLedgerSeq}, NOW())
ON CONFLICT (source, domain) DO UPDATE
SET last_ledger_seq = EXCLUDED.last_ledger_seq,
updated_at = EXCLUDED.updated_at
WHERE ledger_cursor.last_ledger_seq IS NULL
OR ledger_cursor.last_ledger_seq < EXCLUDED.last_ledger_seq
RETURNING id, source, domain, last_ledger_seq, created_at, updated_at
`) as LedgerCursor[];

const row = rows[0];
if (!row) {
// The WHERE clause suppressed the update (attempted regression); return
// the existing row unchanged.
const existing = await findCursor(sql, source, domain);
if (!existing) {
throw new Error(`Cursor (${source}, ${domain}) vanished unexpectedly after upsert`);
}
return existing;
}

return row;
}
Loading