Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions comebackhere-backend/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import express from "express"
import invoicesRouter from "./routes/invoices.js"
import disputesRouter from "./routes/disputes.js"
import treasuryRouter from "./routes/treasury.js"
import complianceRouter from "./routes/compliance.js"
import invoiceSettingsRouter from "./routes/invoice-settings.js"

export function createApp() {
const app = express()
app.use(express.json())
app.use("/invoices", invoicesRouter)
app.use("/disputes", disputesRouter)
app.use("/api/treasury", treasuryRouter)
app.use("/api/compliance", complianceRouter)
app.use("/api/invoice", invoiceSettingsRouter)
app.use("/compliance", complianceRouter)
return app
}
189 changes: 189 additions & 0 deletions comebackhere-backend/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
* Invoice event indexer — #69
*
* Polls Soroban for invoice contract events (invoice_created, invoice_paid,
* invoice_expired, invoice_cancelled, escrow_released) using cursor-based
* pagination so missed events and re-org recovery are handled automatically.
*
* Usage (standalone):
* SOROBAN_RPC_URL=... INVOICE_CONTRACT_ID=... node dist/indexer.js
*
* Usage (embedded): call startIndexer() from index.ts or a worker.
*/

import { SorobanRpc, xdr } from "stellar-sdk"

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

export type InvoiceEventType =
| "invoice_created"
| "invoice_paid"
| "invoice_expired"
| "invoice_cancelled"
| "escrow_released"

export interface InvoiceStateTransition {
event_type: InvoiceEventType
invoice_id: string
ledger: number
ledger_closed_at: string
transaction_hash: string
contract_id: string
raw_topics: string[]
raw_value: string
}

const TRACKED_EVENTS = new Set<string>([
"invoice_created",
"invoice_paid",
"invoice_expired",
"invoice_cancelled",
"escrow_released",
])

// ---------------------------------------------------------------------------
// Cursor persistence (in-memory with optional env override for restarts)
// ---------------------------------------------------------------------------

let cursor: string = process.env.INDEXER_START_CURSOR ?? "0"

function saveCursor(next: string): void {
cursor = next
// In production swap this for a DB or Redis write so restarts resume cleanly.
// e.g.: await redis.set("invoice_indexer_cursor", next)
}

// ---------------------------------------------------------------------------
// Event parsing
// ---------------------------------------------------------------------------

function parseEventType(topics: xdr.ScVal[]): InvoiceEventType | null {
// Soroban contract events encode the event name as the first topic symbol.
const name = topics[0]?.sym()?.toString()
if (!name || !TRACKED_EVENTS.has(name)) return null
return name as InvoiceEventType
}

function parseInvoiceId(topics: xdr.ScVal[]): string {
// Convention: second topic is the invoice_id (u32 or u64).
const id = topics[1]?.u32() ?? topics[1]?.u64()
return id?.toString() ?? "unknown"
}

function scValToString(val: xdr.ScVal): string {
try {
return val.toXDR("base64")
} catch {
return ""
}
}

// ---------------------------------------------------------------------------
// Persistence stub
// ---------------------------------------------------------------------------

/**
* Persist a state transition record. Replace with real DB writes in production.
* e.g.: await db.invoiceEvents.insert(transition)
*/
export function persistTransition(transition: InvoiceStateTransition): void {
console.log(
`[indexer] ${transition.event_type} invoice_id=${transition.invoice_id}` +
` ledger=${transition.ledger} tx=${transition.transaction_hash}`
)
}

// ---------------------------------------------------------------------------
// Core poll loop
// ---------------------------------------------------------------------------

export async function pollOnce(
server: SorobanRpc.Server,
contractId: string
): Promise<void> {
const response = await (server as any).getEvents({
startLedger: cursor === "0" ? undefined : undefined,
cursor: cursor === "0" ? undefined : cursor,
filters: [
{
type: "contract",
contractIds: [contractId],
},
],
limit: 100,
})

const events: any[] = response?.events ?? []

for (const event of events) {
const topics: xdr.ScVal[] = (event.topic ?? []).map((t: string) =>
xdr.ScVal.fromXDR(t, "base64")
)
const eventType = parseEventType(topics)
if (!eventType) continue

const rawValue = event.value?.xdr ?? ""
const transition: InvoiceStateTransition = {
event_type: eventType,
invoice_id: parseInvoiceId(topics),
ledger: event.ledger,
ledger_closed_at: event.ledgerClosedAt ?? new Date().toISOString(),
transaction_hash: event.txHash ?? "",
contract_id: contractId,
raw_topics: (event.topic ?? []) as string[],
raw_value: rawValue,
}

persistTransition(transition)
}

// Advance cursor to the last seen event's paging token for re-org safety.
if (events.length > 0) {
saveCursor(events[events.length - 1].pagingToken)
}
}

// ---------------------------------------------------------------------------
// Start function — exported for embedding; also runs as CLI entry point
// ---------------------------------------------------------------------------

export async function startIndexer(options?: {
rpcUrl?: string
contractId?: string
pollIntervalMs?: number
onError?: (err: unknown) => void
}): Promise<void> {
const rpcUrl = options?.rpcUrl ?? process.env.SOROBAN_RPC_URL
const contractId = options?.contractId ?? process.env.INVOICE_CONTRACT_ID
const pollIntervalMs = options?.pollIntervalMs ?? 5_000

if (!rpcUrl || !contractId) {
throw new Error("startIndexer: SOROBAN_RPC_URL and INVOICE_CONTRACT_ID are required")
}

const server = new SorobanRpc.Server(rpcUrl)

console.log(`[indexer] starting — contract=${contractId} cursor=${cursor} interval=${pollIntervalMs}ms`)

const loop = async () => {
try {
await pollOnce(server, contractId)
} catch (err) {
const handler = options?.onError ?? ((e) => console.error("[indexer] poll error", e))
handler(err)
}
setTimeout(loop, pollIntervalMs)
}

loop()
}

// Run as standalone entry point
if (import.meta.url === new URL(process.argv[1], import.meta.url).href) {
startIndexer().catch((err) => {
console.error("[indexer] fatal", err)
process.exit(1)
})
}
Loading