From a5326377160029af4bce78fc392ff6172a875b1e Mon Sep 17 00:00:00 2001 From: ProtonsAndElectrons Date: Thu, 25 Jun 2026 02:41:54 +0200 Subject: [PATCH] add soroban event indexer --- .env.example | 8 ++ README.md | 32 ++++++ src/config.js | 8 ++ src/index.js | 6 ++ src/indexer/eventParser.js | 118 ++++++++++++++++++++++ src/indexer/eventPoller.js | 124 +++++++++++++++++++++++ src/indexer/eventStore.js | 200 +++++++++++++++++++++++++++++++++++++ src/indexer/runtime.js | 3 + src/routes/indexer.js | 86 ++++++++++++++++ test/eventPoller.test.js | 98 ++++++++++++++++++ test/indexerParser.test.js | 93 +++++++++++++++++ test/indexerRoutes.test.js | 102 +++++++++++++++++++ test/indexerStore.test.js | 115 +++++++++++++++++++++ 13 files changed, 993 insertions(+) create mode 100644 src/indexer/eventParser.js create mode 100644 src/indexer/eventPoller.js create mode 100644 src/indexer/eventStore.js create mode 100644 src/indexer/runtime.js create mode 100644 src/routes/indexer.js create mode 100644 test/eventPoller.test.js create mode 100644 test/indexerParser.test.js create mode 100644 test/indexerRoutes.test.js create mode 100644 test/indexerStore.test.js diff --git a/.env.example b/.env.example index 783c72f..b2966e3 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,14 @@ REDIS_PASSWORD= # Stellar Horizon STELLAR_HORIZON_URL=https://horizon.stellar.org +# Soroban event indexer +SOROBAN_RPC_URL=https://soroban-rpc.mainnet.stellar.gateway.fm +SMARTDROP_CONTRACT_ID= +INDEXER_ENABLED=true +INDEXER_POLL_INTERVAL_MS=5000 +INDEXER_POLL_LIMIT=100 +INDEXER_START_LEDGER=0 + # Stellar USDC Issuer USDC_ISSUER=GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA diff --git a/README.md b/README.md index 986083f..f502db6 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,23 @@ Multi-source price oracle that fetches and caches USD prices for Stellar assets. - Price anomaly logging (>10% changes) - Fallback chain: DEX → CoinGecko → CoinMarketCap → cached +### Soroban Event Indexer + +Polls Soroban RPC for SmartDrop contract events and stores decoded event state in Redis so the API can answer claim-status queries without live RPC calls on every request. + +**Indexed events:** +- `airdrop_created` +- `recipient_added` +- `token_claimed` +- `airdrop_expired` + +**Features:** +- Configurable contract ID, RPC URL, poll interval, poll limit, and start ledger +- Last indexed ledger checkpoint persisted in Redis +- Raw XDR and decoded event data retained for each indexed event +- Aggregated airdrop status, recipient lists, recipient claim history, and indexer status endpoints +- RPC errors are logged and the poller continues on the next interval + ## Setup ### Prerequisites @@ -85,6 +102,12 @@ cp .env.example .env | `REDIS_PORT` | Redis server port | 6379 | No | | `REDIS_PASSWORD` | Redis password | undefined | No | | `STELLAR_HORIZON_URL` | Horizon API URL | https://horizon.stellar.org | No | +| `SOROBAN_RPC_URL` | Soroban RPC URL for contract event polling | https://soroban-rpc.mainnet.stellar.gateway.fm | No | +| `SMARTDROP_CONTRACT_ID` | SmartDrop contract ID to index | undefined | Yes, for indexer | +| `INDEXER_ENABLED` | Enable Soroban event polling | true | No | +| `INDEXER_POLL_INTERVAL_MS` | Soroban event polling interval in milliseconds | 5000 | No | +| `INDEXER_POLL_LIMIT` | Maximum events requested per poll | 100 | No | +| `INDEXER_START_LEDGER` | First ledger to scan when no checkpoint exists | 0 | No | | `USDC_ISSUER` | USDC issuer address | GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA | No | | `COINGECKO_API_KEY` | CoinGecko API key | undefined | No | | `COINMARKETCAP_API_KEY` | CoinMarketCap API key | undefined | No | @@ -148,6 +171,15 @@ GET /health } ``` +### Indexed Airdrop Data + +``` +GET /api/v1/airdrops/:id/status +GET /api/v1/airdrops/:id/recipients +GET /api/v1/recipients/:address/claims +GET /api/v1/indexer/status +``` + ## Usage Examples ### Fetch XLM Price diff --git a/src/config.js b/src/config.js index b99a1ac..632ac06 100644 --- a/src/config.js +++ b/src/config.js @@ -9,8 +9,16 @@ module.exports = { }, stellar: { horizonUrl: process.env.STELLAR_HORIZON_URL || 'https://horizon.stellar.org', + sorobanRpcUrl: process.env.SOROBAN_RPC_URL || 'https://soroban-rpc.mainnet.stellar.gateway.fm', usdcIssuer: process.env.USDC_ISSUER || 'GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA', }, + indexer: { + enabled: process.env.INDEXER_ENABLED !== 'false', + contractId: process.env.SMARTDROP_CONTRACT_ID || '', + pollIntervalMs: parseInt(process.env.INDEXER_POLL_INTERVAL_MS, 10) || 5000, + pollLimit: parseInt(process.env.INDEXER_POLL_LIMIT, 10) || 100, + startLedger: parseInt(process.env.INDEXER_START_LEDGER, 10) || 0, + }, coingecko: { apiKey: process.env.COINGECKO_API_KEY || '', baseUrl: 'https://api.coingecko.com/api/v3', diff --git a/src/index.js b/src/index.js index 07299e0..96b8ea2 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,8 @@ const priceRefreshJob = require('./jobs/priceRefresh'); const buildCorsMiddleware = require('./middleware/cors'); const pricesRouter = require('./routes/prices'); const alertsRouter = require('./routes/alerts'); +const indexerRouter = require('./routes/indexer'); +const indexerPoller = require('./indexer/runtime'); const app = express(); @@ -26,6 +28,7 @@ app.get('/health', (req, res) => { app.use('/api/v1', pricesRouter); app.use('/api/v1', alertsRouter); +app.use('/api/v1', indexerRouter); app.use((err, req, res, _next) => { const status = err.status || 500; @@ -36,11 +39,13 @@ app.use((err, req, res, _next) => { const server = app.listen(config.port, () => { logger.info(`SmartDrop backend running on port ${config.port}`); priceRefreshJob.start(); + indexerPoller.start(); }); process.on('SIGTERM', async () => { logger.info('SIGTERM received, shutting down'); priceRefreshJob.stop(); + indexerPoller.stop(); server.close(); await cache.disconnect(); process.exit(0); @@ -49,6 +54,7 @@ process.on('SIGTERM', async () => { process.on('SIGINT', async () => { logger.info('SIGINT received, shutting down'); priceRefreshJob.stop(); + indexerPoller.stop(); server.close(); await cache.disconnect(); process.exit(0); diff --git a/src/indexer/eventParser.js b/src/indexer/eventParser.js new file mode 100644 index 0000000..f338a3d --- /dev/null +++ b/src/indexer/eventParser.js @@ -0,0 +1,118 @@ +const crypto = require('crypto'); +const { scValToNative } = require('stellar-sdk'); + +const EVENT_FIELDS = { + airdrop_created: ['airdrop_id', 'creator', 'token', 'total_amount', 'expiry_ledger'], + recipient_added: ['airdrop_id', 'recipient', 'amount'], + token_claimed: ['airdrop_id', 'recipient', 'amount', 'ledger'], + airdrop_expired: ['airdrop_id', 'unclaimed_amount'], +}; + +const EVENT_NAMES = Object.keys(EVENT_FIELDS); + +function toJsonSafe(value) { + if (typeof value === 'bigint') return value.toString(); + if (Buffer.isBuffer(value)) return value.toString('base64'); + if (Array.isArray(value)) return value.map(toJsonSafe); + if (value && typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([key, val]) => [key, toJsonSafe(val)])); + } + return value; +} + +function xdrBase64(scVal) { + if (!scVal || typeof scVal.toXDR !== 'function') return null; + return scVal.toXDR('base64'); +} + +function decodeScVal(scVal) { + if (scVal === undefined || scVal === null) return null; + return toJsonSafe(scValToNative(scVal)); +} + +function normalizeEventName(value) { + if (typeof value !== 'string') return null; + return EVENT_NAMES.includes(value) ? value : null; +} + +function dataFromValue(eventName, value, topicHintCount = 0) { + if (value && !Array.isArray(value) && typeof value === 'object') { + return value; + } + + const fields = EVENT_FIELDS[eventName]; + if (Array.isArray(value)) { + const valueFields = topicHintCount > 0 && value.length < fields.length + ? fields.slice(fields.length - value.length) + : fields; + + return Object.fromEntries(valueFields.map((field, index) => [field, value[index] ?? null])); + } + + return { value }; +} + +function mergeTopicHints(eventName, data, topics) { + const eventNameIndex = topics.findIndex((topic) => topic === eventName); + const topicHints = eventNameIndex >= 0 ? topics.slice(eventNameIndex + 1) : []; + const merged = { ...data }; + + if (merged.airdrop_id == null && topicHints[0] != null) merged.airdrop_id = topicHints[0]; + if (merged.recipient == null && topicHints[1] != null) merged.recipient = topicHints[1]; + + return merged; +} + +function eventId(event) { + if (event.id) return String(event.id); + const fallback = `${event.ledger}:${event.pagingToken}:${JSON.stringify(event.topic || [])}`; + return crypto.createHash('sha256').update(fallback).digest('hex'); +} + +function contractIdToString(contractId) { + if (!contractId) return null; + if (typeof contractId === 'string') return contractId; + if (typeof contractId.toString === 'function') return contractId.toString(); + return String(contractId); +} + +function parseContractEvent(event) { + const nativeTopics = (event.topic || []).map(decodeScVal); + const eventName = nativeTopics.map(normalizeEventName).find(Boolean); + + if (!eventName) return null; + + const decodedValue = decodeScVal(event.value); + const eventNameIndex = nativeTopics.findIndex((topic) => topic === eventName); + const topicHintCount = eventNameIndex >= 0 ? nativeTopics.length - eventNameIndex - 1 : 0; + const data = mergeTopicHints(eventName, dataFromValue(eventName, decodedValue, topicHintCount), nativeTopics); + + return { + id: eventId(event), + event_name: eventName, + type: event.type, + ledger: event.ledger, + ledger_closed_at: event.ledgerClosedAt || null, + paging_token: event.pagingToken || null, + contract_id: contractIdToString(event.contractId), + in_successful_contract_call: event.inSuccessfulContractCall !== false, + data, + decoded: { + topics: nativeTopics, + value: decodedValue, + }, + raw_xdr: { + topics: (event.topic || []).map(xdrBase64), + value: xdrBase64(event.value), + }, + indexed_at: new Date().toISOString(), + }; +} + +module.exports = { + EVENT_FIELDS, + EVENT_NAMES, + decodeScVal, + parseContractEvent, + toJsonSafe, +}; diff --git a/src/indexer/eventPoller.js b/src/indexer/eventPoller.js new file mode 100644 index 0000000..568b40f --- /dev/null +++ b/src/indexer/eventPoller.js @@ -0,0 +1,124 @@ +const { SorobanRpc } = require('stellar-sdk'); +const config = require('../config'); +const logger = require('../logger'); +const eventStore = require('./eventStore'); +const { parseContractEvent } = require('./eventParser'); + +class EventPoller { + constructor(options = {}) { + this.contractId = options.contractId ?? config.indexer.contractId; + this.pollIntervalMs = options.pollIntervalMs ?? config.indexer.pollIntervalMs; + this.pollLimit = options.pollLimit ?? config.indexer.pollLimit; + this.startLedger = options.startLedger ?? config.indexer.startLedger; + this.enabled = options.enabled ?? config.indexer.enabled; + this.store = options.store || eventStore; + this.logger = options.logger || logger; + this.server = options.server || new SorobanRpc.Server(options.rpcUrl || config.stellar.sorobanRpcUrl); + this.timer = null; + this.lastRun = null; + this.lastError = null; + this.latestLedger = null; + } + + isConfigured() { + return this.enabled && Boolean(this.contractId); + } + + getStatus() { + return { + enabled: this.enabled, + configured: this.isConfigured(), + running: this.timer !== null, + contract_id: this.contractId || null, + poll_interval_ms: this.pollIntervalMs, + poll_limit: this.pollLimit, + last_run: this.lastRun, + last_error: this.lastError, + latest_ledger: this.latestLedger, + }; + } + + async pollOnce() { + if (!this.isConfigured()) { + return { skipped: true, reason: 'SMARTDROP_CONTRACT_ID not configured' }; + } + + const previousLedger = await this.store.getLastLedger(null); + const startLedger = previousLedger == null + ? this.startLedger || 0 + : Math.max(Number(previousLedger) + 1, this.startLedger || 0); + + const response = await this.server.getEvents({ + startLedger, + filters: [ + { + type: 'contract', + contractIds: [this.contractId], + }, + ], + limit: this.pollLimit, + }); + + const parsedEvents = (response.events || []) + .map(parseContractEvent) + .filter(Boolean); + + for (const event of parsedEvents) { + await this.store.saveEvent(event); + } + + const latestIndexedLedger = Math.max( + response.latestLedger || previousLedger, + ...parsedEvents.map((event) => event.ledger) + ); + await this.store.setLastLedger(latestIndexedLedger); + + this.latestLedger = response.latestLedger || null; + this.lastRun = new Date().toISOString(); + this.lastError = null; + + return { + skipped: false, + start_ledger: startLedger, + latest_ledger: response.latestLedger, + indexed_events: parsedEvents.length, + }; + } + + start() { + if (this.timer || !this.enabled) return; + if (!this.contractId) { + this.logger.warn('SmartDrop indexer disabled: SMARTDROP_CONTRACT_ID is not configured'); + return; + } + + const run = async () => { + try { + const result = await this.pollOnce(); + this.logger.info('SmartDrop contract events indexed', result); + } catch (err) { + this.lastRun = new Date().toISOString(); + this.lastError = err.message; + this.logger.warn('SmartDrop event indexing failed', { error: err.message }); + } + }; + + run(); + this.timer = setInterval(run, this.pollIntervalMs); + if (typeof this.timer.unref === 'function') this.timer.unref(); + this.logger.info('SmartDrop event indexer started', { + contractId: this.contractId, + pollIntervalMs: this.pollIntervalMs, + }); + } + + stop() { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + this.logger.info('SmartDrop event indexer stopped'); + } + } +} + +module.exports = { EventPoller }; diff --git a/src/indexer/eventStore.js b/src/indexer/eventStore.js new file mode 100644 index 0000000..6b60511 --- /dev/null +++ b/src/indexer/eventStore.js @@ -0,0 +1,200 @@ +const cache = require('../services/cache'); + +const EVENT_IDS_KEY = 'indexer:contract_events:ids'; +const LAST_LEDGER_KEY = 'indexer:last_ledger'; +const AIRDROP_IDS_KEY = 'indexer:airdrops:ids'; + +function eventKey(id) { + return `indexer:contract_event:${id}`; +} + +function airdropKey(id) { + return `indexer:airdrop:${id}`; +} + +function recipientsKey(id) { + return `indexer:airdrop:${id}:recipients`; +} + +function claimsKey(address) { + return `indexer:recipient:${address}:claims`; +} + +async function getJsonList(key) { + return (await cache.get(key)) || []; +} + +async function setJsonList(key, list) { + await cache.set(key, list); +} + +function getAirdropId(event) { + return event && event.data ? event.data.airdrop_id : null; +} + +function getRecipient(event) { + return event && event.data ? event.data.recipient : null; +} + +async function getLastLedger(defaultLedger = 0) { + const saved = await cache.get(LAST_LEDGER_KEY); + if (saved === null || saved === undefined || saved === '') return defaultLedger; + const parsed = Number(saved); + return Number.isFinite(parsed) ? parsed : defaultLedger; +} + +async function setLastLedger(ledger) { + await cache.set(LAST_LEDGER_KEY, Number(ledger)); +} + +async function upsertAirdrop(event) { + const airdropId = getAirdropId(event); + if (!airdropId) return; + + const existing = (await cache.get(airdropKey(airdropId))) || { airdrop_id: airdropId }; + const next = { + ...existing, + updated_ledger: event.ledger, + updated_at: event.ledger_closed_at, + }; + + if (event.event_name === 'airdrop_created') { + Object.assign(next, { + status: 'created', + creator: event.data.creator ?? existing.creator ?? null, + token: event.data.token ?? existing.token ?? null, + total_amount: event.data.total_amount ?? existing.total_amount ?? null, + expiry_ledger: event.data.expiry_ledger ?? existing.expiry_ledger ?? null, + created_ledger: event.ledger, + created_at: event.ledger_closed_at, + }); + } + + if (event.event_name === 'token_claimed') { + next.status = existing.status === 'expired' ? 'expired' : 'active'; + } + + if (event.event_name === 'airdrop_expired') { + Object.assign(next, { + status: 'expired', + unclaimed_amount: event.data.unclaimed_amount ?? null, + expired_ledger: event.ledger, + expired_at: event.ledger_closed_at, + }); + } + + await cache.set(airdropKey(airdropId), next); + await cache.getClient().sadd(AIRDROP_IDS_KEY, airdropId); +} + +async function upsertRecipient(event) { + const airdropId = getAirdropId(event); + const recipient = getRecipient(event); + if (!airdropId || !recipient) return; + + const key = recipientsKey(airdropId); + const recipients = await getJsonList(key); + const existingIndex = recipients.findIndex((entry) => entry.recipient === recipient); + const existing = existingIndex >= 0 ? recipients[existingIndex] : { recipient }; + const next = { + ...existing, + airdrop_id: airdropId, + amount: event.data.amount ?? existing.amount ?? null, + updated_ledger: event.ledger, + updated_at: event.ledger_closed_at, + }; + + if (event.event_name === 'recipient_added') { + next.status = existing.status || 'pending'; + next.added_ledger = event.ledger; + } + + if (event.event_name === 'token_claimed') { + next.status = 'claimed'; + next.claimed_ledger = event.data.ledger ?? event.ledger; + next.claimed_at = event.ledger_closed_at; + } + + if (existingIndex >= 0) recipients[existingIndex] = next; + else recipients.push(next); + + await setJsonList(key, recipients); +} + +async function appendClaim(event) { + const recipient = getRecipient(event); + const airdropId = getAirdropId(event); + if (event.event_name !== 'token_claimed' || !recipient || !airdropId) return; + + const key = claimsKey(recipient); + const claims = await getJsonList(key); + if (!claims.some((claim) => claim.event_id === event.id)) { + claims.push({ + event_id: event.id, + airdrop_id: airdropId, + recipient, + amount: event.data.amount ?? null, + ledger: event.data.ledger ?? event.ledger, + claimed_at: event.ledger_closed_at, + }); + await setJsonList(key, claims); + } +} + +async function saveEvent(event) { + await cache.set(eventKey(event.id), event); + await cache.getClient().sadd(EVENT_IDS_KEY, event.id); + await upsertAirdrop(event); + await upsertRecipient(event); + await appendClaim(event); +} + +async function getAirdropStatus(airdropId) { + const status = await cache.get(airdropKey(airdropId)); + if (!status) return null; + + const recipients = await getAirdropRecipients(airdropId); + const claimed_count = recipients.filter((recipient) => recipient.status === 'claimed').length; + + return { + ...status, + recipients_count: recipients.length, + claimed_count, + pending_count: recipients.length - claimed_count, + }; +} + +async function getAirdropRecipients(airdropId) { + return getJsonList(recipientsKey(airdropId)); +} + +async function getRecipientClaims(address) { + return getJsonList(claimsKey(address)); +} + +async function getEventCount() { + const ids = await cache.getClient().smembers(EVENT_IDS_KEY); + return ids.length; +} + +async function getStats() { + const [lastLedger, eventsCount] = await Promise.all([ + getLastLedger(0), + getEventCount(), + ]); + + return { + last_ledger: lastLedger, + events_count: eventsCount, + }; +} + +module.exports = { + getAirdropRecipients, + getAirdropStatus, + getLastLedger, + getRecipientClaims, + getStats, + saveEvent, + setLastLedger, +}; diff --git a/src/indexer/runtime.js b/src/indexer/runtime.js new file mode 100644 index 0000000..fc4e3d7 --- /dev/null +++ b/src/indexer/runtime.js @@ -0,0 +1,3 @@ +const { EventPoller } = require('./eventPoller'); + +module.exports = new EventPoller(); diff --git a/src/routes/indexer.js b/src/routes/indexer.js new file mode 100644 index 0000000..2c16300 --- /dev/null +++ b/src/routes/indexer.js @@ -0,0 +1,86 @@ +const express = require('express'); +const eventStore = require('../indexer/eventStore'); +const indexerPoller = require('../indexer/runtime'); +const logger = require('../logger'); + +const router = express.Router(); + +function isValidId(value) { + return typeof value === 'string' && /^[A-Za-z0-9:_-]{1,128}$/.test(value); +} + +function isValidAddress(value) { + return typeof value === 'string' && /^[A-Z0-9]{10,80}$/.test(value); +} + +router.get('/airdrops/:id/status', async (req, res) => { + try { + if (!isValidId(req.params.id)) { + return res.status(400).json({ error: 'Invalid airdrop id' }); + } + + const status = await eventStore.getAirdropStatus(req.params.id); + if (!status) { + return res.status(404).json({ error: 'Airdrop not indexed' }); + } + + return res.json(status); + } catch (err) { + logger.error('Airdrop status lookup failed', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/airdrops/:id/recipients', async (req, res) => { + try { + if (!isValidId(req.params.id)) { + return res.status(400).json({ error: 'Invalid airdrop id' }); + } + + const recipients = await eventStore.getAirdropRecipients(req.params.id); + return res.json({ airdrop_id: req.params.id, recipients }); + } catch (err) { + logger.error('Airdrop recipients lookup failed', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/recipients/:address/claims', async (req, res) => { + try { + if (!isValidAddress(req.params.address)) { + return res.status(400).json({ error: 'Invalid recipient address' }); + } + + const claims = await eventStore.getRecipientClaims(req.params.address); + return res.json({ recipient: req.params.address, claims }); + } catch (err) { + logger.error('Recipient claims lookup failed', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/indexer/status', async (_req, res) => { + try { + const stats = await eventStore.getStats(); + const poller = indexerPoller.getStatus(); + const hasLatestLedger = poller.latest_ledger !== null && poller.latest_ledger !== undefined; + const latestLedger = Number(poller.latest_ledger); + const lastLedger = Number(stats.last_ledger); + const ledgerLag = hasLatestLedger && Number.isFinite(latestLedger) && Number.isFinite(lastLedger) + ? Math.max(0, latestLedger - lastLedger) + : null; + + return res.json({ + ...poller, + last_ledger: stats.last_ledger, + events_count: stats.events_count, + lag: ledgerLag, + ledger_lag: ledgerLag, + }); + } catch (err) { + logger.error('Indexer status lookup failed', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +module.exports = router; diff --git a/test/eventPoller.test.js b/test/eventPoller.test.js new file mode 100644 index 0000000..2904dab --- /dev/null +++ b/test/eventPoller.test.js @@ -0,0 +1,98 @@ +'use strict'; + +const { nativeToScVal } = require('stellar-sdk'); +const { EventPoller } = require('../src/indexer/eventPoller'); + +function contractEvent() { + return { + id: 'evt-1', + type: 'contract', + ledger: 20, + ledgerClosedAt: '2026-06-25T00:00:00Z', + pagingToken: '20-1', + inSuccessfulContractCall: true, + topic: [nativeToScVal('airdrop_created', { type: 'symbol' })], + value: nativeToScVal({ + airdrop_id: 'drop-1', + creator: 'GCREATOR', + token: 'USDC', + total_amount: 1000n, + expiry_ledger: 500n, + }), + }; +} + +describe('EventPoller', () => { + test('polls Soroban RPC, stores parsed events, and advances last ledger', async () => { + const server = { + getEvents: jest.fn(async () => ({ + latestLedger: 25, + events: [contractEvent()], + })), + }; + const store = { + getLastLedger: jest.fn(async () => null), + saveEvent: jest.fn(async () => {}), + setLastLedger: jest.fn(async () => {}), + }; + const logger = { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }; + + const poller = new EventPoller({ + enabled: true, + contractId: 'CCONTRACT', + startLedger: 10, + pollLimit: 5, + server, + store, + logger, + }); + + const result = await poller.pollOnce(); + + expect(server.getEvents).toHaveBeenCalledWith({ + startLedger: 10, + filters: [{ type: 'contract', contractIds: ['CCONTRACT'] }], + limit: 5, + }); + expect(store.saveEvent).toHaveBeenCalledWith(expect.objectContaining({ + event_name: 'airdrop_created', + data: expect.objectContaining({ airdrop_id: 'drop-1', total_amount: '1000' }), + })); + expect(store.setLastLedger).toHaveBeenCalledWith(25); + expect(result).toMatchObject({ indexed_events: 1, latest_ledger: 25 }); + expect(poller.getStatus()).toMatchObject({ latest_ledger: 25, last_error: null }); + }); + + test('continues from the ledger after the saved checkpoint', async () => { + const server = { + getEvents: jest.fn(async () => ({ latestLedger: 25, events: [] })), + }; + const store = { + getLastLedger: jest.fn(async () => 19), + saveEvent: jest.fn(async () => {}), + setLastLedger: jest.fn(async () => {}), + }; + + const poller = new EventPoller({ + enabled: true, + contractId: 'CCONTRACT', + startLedger: 10, + server, + store, + }); + + await poller.pollOnce(); + + expect(server.getEvents.mock.calls[0][0].startLedger).toBe(20); + }); + + test('skips polling when no contract id is configured', async () => { + const poller = new EventPoller({ + enabled: true, + contractId: '', + server: { getEvents: jest.fn() }, + }); + + await expect(poller.pollOnce()).resolves.toMatchObject({ skipped: true }); + }); +}); diff --git a/test/indexerParser.test.js b/test/indexerParser.test.js new file mode 100644 index 0000000..cd4598e --- /dev/null +++ b/test/indexerParser.test.js @@ -0,0 +1,93 @@ +'use strict'; + +const { nativeToScVal, xdr } = require('stellar-sdk'); +const { EVENT_NAMES, parseContractEvent } = require('../src/indexer/eventParser'); + +function sym(value) { + return nativeToScVal(value, { type: 'symbol' }); +} + +function scVal(value) { + if (Array.isArray(value)) return xdr.ScVal.scvVec(value.map(scVal)); + return nativeToScVal(value); +} + +function topic(value) { + return typeof value === 'string' && EVENT_NAMES.includes(value) ? sym(value) : scVal(value); +} + +function event(topics, value, overrides = {}) { + return { + id: overrides.id || 'evt-1', + type: 'contract', + ledger: overrides.ledger || 123, + ledgerClosedAt: '2026-06-25T00:00:00Z', + pagingToken: '123-1', + inSuccessfulContractCall: true, + topic: topics.map(topic), + value: scVal(value), + ...overrides, + }; +} + +describe('Soroban contract event parser', () => { + test('decodes airdrop_created events with full array payload', () => { + const parsed = parseContractEvent(event(['airdrop_created'], [ + 'drop-1', + 'GCREATOR11111111111111111111111111111111111111111111111', + 'USDC', + 1000n, + 456n, + ])); + + expect(parsed.event_name).toBe('airdrop_created'); + expect(parsed.data).toMatchObject({ + airdrop_id: 'drop-1', + creator: 'GCREATOR11111111111111111111111111111111111111111111111', + token: 'USDC', + total_amount: '1000', + expiry_ledger: '456', + }); + expect(parsed.raw_xdr.value).toEqual(expect.any(String)); + }); + + test('uses topic hints when IDs are emitted as topics', () => { + const parsed = parseContractEvent(event(['recipient_added', 'drop-1', 'GRECIPIENT1111111111111111111111111111111111111111111'], [250n])); + + expect(parsed.event_name).toBe('recipient_added'); + expect(parsed.data).toMatchObject({ + airdrop_id: 'drop-1', + recipient: 'GRECIPIENT1111111111111111111111111111111111111111111', + amount: '250', + }); + }); + + test('decodes token_claimed events with object payloads', () => { + const parsed = parseContractEvent(event(['token_claimed'], { + airdrop_id: 'drop-1', + recipient: 'GRECIPIENT1111111111111111111111111111111111111111111', + amount: 125n, + ledger: 789n, + })); + + expect(parsed.data).toMatchObject({ + airdrop_id: 'drop-1', + amount: '125', + ledger: '789', + }); + }); + + test('decodes airdrop_expired events', () => { + const parsed = parseContractEvent(event(['airdrop_expired', 'drop-1'], [875n])); + + expect(parsed.event_name).toBe('airdrop_expired'); + expect(parsed.data).toMatchObject({ + airdrop_id: 'drop-1', + unclaimed_amount: '875', + }); + }); + + test('ignores unsupported contract events', () => { + expect(parseContractEvent(event(['unrelated_event'], ['drop-1']))).toBeNull(); + }); +}); diff --git a/test/indexerRoutes.test.js b/test/indexerRoutes.test.js new file mode 100644 index 0000000..673daaa --- /dev/null +++ b/test/indexerRoutes.test.js @@ -0,0 +1,102 @@ +'use strict'; + +const express = require('express'); +const request = require('supertest'); + +const mockGetAirdropStatus = jest.fn(); +const mockGetAirdropRecipients = jest.fn(); +const mockGetRecipientClaims = jest.fn(); +const mockGetStats = jest.fn(); + +jest.mock('../src/indexer/eventStore', () => ({ + getAirdropStatus: mockGetAirdropStatus, + getAirdropRecipients: mockGetAirdropRecipients, + getRecipientClaims: mockGetRecipientClaims, + getStats: mockGetStats, +})); + +jest.mock('../src/indexer/runtime', () => ({ + getStatus: jest.fn(() => ({ + enabled: true, + configured: true, + running: true, + contract_id: 'CCONTRACT', + poll_interval_ms: 5000, + poll_limit: 100, + last_run: '2026-06-25T00:00:00.000Z', + last_error: null, + })), +})); + +jest.mock('../src/logger', () => ({ + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +})); + +const indexerRouter = require('../src/routes/indexer'); + +function buildApp() { + const app = express(); + app.use('/api/v1', indexerRouter); + return app; +} + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('indexer routes', () => { + test('returns indexed airdrop status', async () => { + mockGetAirdropStatus.mockResolvedValue({ + airdrop_id: 'drop-1', + status: 'created', + recipients_count: 2, + }); + + const res = await request(buildApp()).get('/api/v1/airdrops/drop-1/status'); + + expect(res.status).toBe(200); + expect(res.body).toMatchObject({ airdrop_id: 'drop-1', status: 'created' }); + }); + + test('returns 404 for unknown airdrop status', async () => { + mockGetAirdropStatus.mockResolvedValue(null); + + const res = await request(buildApp()).get('/api/v1/airdrops/missing/status'); + + expect(res.status).toBe(404); + }); + + test('returns indexed recipients', async () => { + mockGetAirdropRecipients.mockResolvedValue([{ recipient: 'GRECIPIENT', status: 'claimed' }]); + + const res = await request(buildApp()).get('/api/v1/airdrops/drop-1/recipients'); + + expect(res.status).toBe(200); + expect(res.body.recipients).toHaveLength(1); + }); + + test('returns recipient claims', async () => { + mockGetRecipientClaims.mockResolvedValue([{ airdrop_id: 'drop-1', amount: '25' }]); + + const res = await request(buildApp()).get('/api/v1/recipients/GRECIPIENT12345/claims'); + + expect(res.status).toBe(200); + expect(res.body.claims).toEqual([{ airdrop_id: 'drop-1', amount: '25' }]); + }); + + test('returns indexer status with ledger and event counts', async () => { + mockGetStats.mockResolvedValue({ last_ledger: 42, events_count: 7 }); + + const res = await request(buildApp()).get('/api/v1/indexer/status'); + + expect(res.status).toBe(200); + expect(res.body).toMatchObject({ + configured: true, + last_ledger: 42, + events_count: 7, + }); + }); +}); diff --git a/test/indexerStore.test.js b/test/indexerStore.test.js new file mode 100644 index 0000000..fde01bc --- /dev/null +++ b/test/indexerStore.test.js @@ -0,0 +1,115 @@ +'use strict'; + +const mockStore = new Map(); +const mockSets = new Map(); + +const mockRedis = { + smembers: jest.fn(async (key) => [...(mockSets.get(key) || [])]), + sadd: jest.fn(async (key, val) => { + if (!mockSets.has(key)) mockSets.set(key, new Set()); + mockSets.get(key).add(val); + }), +}; + +jest.mock('../src/services/cache', () => ({ + getClient: () => mockRedis, + get: jest.fn(async (key) => { + const value = mockStore.get(key); + return value !== undefined ? JSON.parse(JSON.stringify(value)) : null; + }), + set: jest.fn(async (key, value) => { + mockStore.set(key, JSON.parse(JSON.stringify(value))); + }), + del: jest.fn(async (key) => { + mockStore.delete(key); + }), +})); + +const eventStore = require('../src/indexer/eventStore'); + +function baseEvent(overrides) { + return { + id: overrides.id, + event_name: overrides.event_name, + ledger: overrides.ledger || 100, + ledger_closed_at: '2026-06-25T00:00:00Z', + data: overrides.data, + }; +} + +beforeEach(() => { + mockStore.clear(); + mockSets.clear(); + mockRedis.smembers.mockClear(); + mockRedis.sadd.mockClear(); +}); + +describe('indexer event store', () => { + test('persists airdrop lifecycle, recipients, claims, and stats', async () => { + await eventStore.saveEvent(baseEvent({ + id: 'evt-created', + event_name: 'airdrop_created', + ledger: 10, + data: { + airdrop_id: 'drop-1', + creator: 'GCREATOR', + token: 'USDC', + total_amount: '1000', + expiry_ledger: '500', + }, + })); + await eventStore.saveEvent(baseEvent({ + id: 'evt-recipient', + event_name: 'recipient_added', + ledger: 11, + data: { + airdrop_id: 'drop-1', + recipient: 'GRECIPIENT', + amount: '250', + }, + })); + await eventStore.saveEvent(baseEvent({ + id: 'evt-claim', + event_name: 'token_claimed', + ledger: 12, + data: { + airdrop_id: 'drop-1', + recipient: 'GRECIPIENT', + amount: '250', + ledger: '12', + }, + })); + await eventStore.saveEvent(baseEvent({ + id: 'evt-expired', + event_name: 'airdrop_expired', + ledger: 13, + data: { + airdrop_id: 'drop-1', + unclaimed_amount: '750', + }, + })); + await eventStore.setLastLedger(13); + + const status = await eventStore.getAirdropStatus('drop-1'); + expect(status).toMatchObject({ + airdrop_id: 'drop-1', + status: 'expired', + total_amount: '1000', + recipients_count: 1, + claimed_count: 1, + pending_count: 0, + unclaimed_amount: '750', + }); + + await expect(eventStore.getAirdropRecipients('drop-1')).resolves.toEqual([ + expect.objectContaining({ recipient: 'GRECIPIENT', status: 'claimed', amount: '250' }), + ]); + await expect(eventStore.getRecipientClaims('GRECIPIENT')).resolves.toEqual([ + expect.objectContaining({ event_id: 'evt-claim', airdrop_id: 'drop-1', amount: '250' }), + ]); + await expect(eventStore.getStats()).resolves.toMatchObject({ + last_ledger: 13, + events_count: 4, + }); + }); +});