diff --git a/src/__tests__/benchmark.test.ts b/src/__tests__/benchmark.test.ts new file mode 100644 index 0000000..575099e --- /dev/null +++ b/src/__tests__/benchmark.test.ts @@ -0,0 +1,175 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import Fastify from 'fastify' + +const { mockQuery, mockGetActivePairs } = vi.hoisted(() => ({ + mockQuery: vi.fn(), + mockGetActivePairs: vi.fn(), +})) + +vi.mock('../db', () => ({ + pgPool: { query: mockQuery }, +})) + +vi.mock('../pairsRegistry', () => ({ + getActivePairs: mockGetActivePairs, +})) + +import { registerBenchmarkRoutes } from '../routes/benchmark' + +async function buildApp() { + const app = Fastify({ logger: false }) + await registerBenchmarkRoutes(app) + await app.ready() + return app +} + +describe('GET /benchmark/:asset', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns 404 if the pair is not watched', async () => { + mockGetActivePairs.mockReturnValue([]) + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(404) + expect(res.json()).toEqual({ error: 'Pair USDC/USD not watched' }) + }) + + it('calculates benchmark deviation correctly when asset is assetB in the pair (e.g. USD/USDC)', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USD/USDC', + assetA: { code: 'USD', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: '1.0002', + max_price: '1.0005', + min_price: '0.9997', + avg_price: '1.0001', + max_abs_deviation_bps: '5.0', + max_deviation_bps: '2.0', + min_deviation_bps: '-3.0', + sample_count: '100', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.asset).toBe('USDC') + expect(data.target).toBe('USD') + expect(data.pairKey).toBe('USD/USDC') + expect(data.currentPrice).toBe(1.0002) + expect(data.currentDeviationBp).toBeCloseTo(2) + expect(data.rolling24h).toEqual({ + maxDeviationBp: 2, + minDeviationBp: -3, + maxAbsoluteDeviationBp: 5, + averageDeviationBp: CloseTo(1), + sampleCount: 100, + }) + + function CloseTo(val: number) { + return { + asymmetricMatch: (actual: any) => Math.abs(actual - val) < 0.0001, + } + } + }) + + it('calculates benchmark deviation correctly when asset is assetA in the pair (e.g. USDC/XLM)', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USDC/XLM', + assetA: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'XLM', issuer: null }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: '1.0002', + max_price: '1.0005', + min_price: '0.9997', + avg_price: '1.0001', + max_abs_deviation_bps: '5.0', + max_deviation_bps: '2.0', + min_deviation_bps: '-3.0', + sample_count: '100', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=XLM', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.asset).toBe('USDC') + expect(data.target).toBe('XLM') + expect(data.pairKey).toBe('USDC/XLM') + expect(data.currentPrice).toBe(1.0002) + }) + + it('handles watched pairs with no price data gracefully', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USD/USDC', + assetA: { code: 'USD', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: null, + max_price: null, + min_price: null, + avg_price: null, + max_abs_deviation_bps: null, + max_deviation_bps: null, + min_deviation_bps: null, + sample_count: '0', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.currentPrice).toBeNull() + expect(data.currentDeviationBp).toBeNull() + expect(data.rolling24h).toEqual({ + maxDeviationBp: null, + minDeviationBp: null, + maxAbsoluteDeviationBp: null, + averageDeviationBp: null, + sampleCount: 0, + }) + }) +}) diff --git a/src/api/schemas.ts b/src/api/schemas.ts index 1f815b8..5784f06 100644 --- a/src/api/schemas.ts +++ b/src/api/schemas.ts @@ -216,6 +216,38 @@ export const depthResponseSchema = { }, } as const +/** GET /benchmark/:asset */ +export const benchmarkResponseSchema = { + type: 'object', + required: ['asset', 'target', 'pairKey', 'currentPrice', 'currentDeviationBp', 'rolling24h'], + additionalProperties: false, + properties: { + asset: { type: 'string' }, + target: { type: 'string' }, + pairKey: { type: 'string' }, + currentPrice: { type: ['number', 'null'] }, + currentDeviationBp: { type: ['number', 'null'] }, + rolling24h: { + type: 'object', + required: [ + 'maxDeviationBp', + 'minDeviationBp', + 'maxAbsoluteDeviationBp', + 'averageDeviationBp', + 'sampleCount', + ], + additionalProperties: false, + properties: { + maxDeviationBp: { type: ['number', 'null'] }, + minDeviationBp: { type: ['number', 'null'] }, + maxAbsoluteDeviationBp: { type: ['number', 'null'] }, + averageDeviationBp: { type: ['number', 'null'] }, + sampleCount: { type: 'integer' }, + }, + }, + }, +} as const + /** * Install response-shape validation on a Fastify instance. * diff --git a/src/index.ts b/src/index.ts index 9053d36..4a88baf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,30 +1,7 @@ -import 'dotenv/config' -import { execSync } from 'child_process' -import Fastify from 'fastify' - -if (!process.env.DIRECT_DATABASE_URL && process.env.DATABASE_URL) { - process.env.DIRECT_DATABASE_URL = process.env.DATABASE_URL -} -import cors from '@fastify/cors' -import compress from '@fastify/compress' -import rateLimit from '@fastify/rate-limit' -import { config } from './config' -import { redis } from './redis' -import { pgPool } from './db' -import { registerRESTRoutes } from './api/rest' -import { registerGraphQL } from './api/graphql' -import { registerWebhookRoutes } from './routes/webhooks' -import { registerCandleRoutes } from './routes/candles' -import { registerPairsRoutes } from './routes/pairs' -import { registerScreenerRoutes } from './routes/screener' -import { registerHistoryRoutes } from './api/history' -import { registerX402 } from './middleware/x402' -import { registerWebSocket } from './api/websocket' -import { registerApiKeyAuth } from './api/auth' -import { registerAdminRoutes } from './api/admin' import { registerUsageRoutes } from './api/usage' import { registerPriceRoutes } from './routes/price' import { registerVolumeRoutes } from './routes/volumes' +import { registerBenchmarkRoutes } from './routes/benchmark' import { registerOracleRoutes } from './routes/oracle' import { fanOutManager } from './ws/fanout' @@ -79,105 +56,67 @@ async function main() { await app.register(rateLimit, { max: (req) => req.apiKey?.ratePerMin ?? ipRateLimitMax, timeWindow: '1 minute', - keyGenerator: (req) => req.apiKey?.id ?? req.ip, - allowList: (req) => req.url === '/status', - errorResponseBuilder: (req, context) => ({ - statusCode: 429, - error: 'Too Many Requests', - message: `Rate limit exceeded, retry in ${context.after}`, - retryAfter: context.after - }) - }) - - // Specific limit for /status (higher for monitoring) - app.addHook('onRoute', (routeOptions) => { - if (routeOptions.url === '/status') { - routeOptions.config = { - ...routeOptions.config, - rateLimit: { - max: 1000, - timeWindow: '1 minute' - } - } - } }) - // Admin endpoints (key issuance/revocation) — gated by ADMIN_TOKEN. Marked - // `config.public` so the API-key auth hook skips them. - await registerAdminRoutes(app) - await registerUsageRoutes(app) - - await app.register(registerX402) - await registerRESTRoutes(app) - await registerWebhookRoutes(app) - await registerCandleRoutes(app) - await registerPairsRoutes(app) - await registerScreenerRoutes(app) - await registerHistoryRoutes(app) - await registerPriceRoutes(app) - await registerVolumeRoutes(app) - await registerOracleRoutes(app) - await registerGraphQL(app) - await registerWebSocket(app) - - // Prometheus metrics endpoint (un-gated, public — no API key required) - app.get('/metrics', { config: { public: true } }, async (req, reply) => { - reply.type('text/plain; version=0.0.4; charset=utf-8') - return await getMetrics() + // ── Register Routes ────────────────────────────────────────────────────── + // Register all route handlers + await app.register(registerUsageRoutes) + await app.register(registerPriceRoutes) + await app.register(registerVolumeRoutes) + await app.register(registerBenchmarkRoutes) // Stablecoin peg deviation benchmarks + await app.register(registerOracleRoutes) // Oracle data routes + + // ── Start WebSocket fanout manager ────────────────────────────────────── + await fanOutManager.start() + + // ── Start ingesters ────────────────────────────────────────────────────── + await startSDEXIngester() + await startAMMIngester() + await startSoroswapIngester() + await startSnapshotIngester() + await startAquariusIngester() + + // ── Start background jobs ──────────────────────────────────────────────── + const aggregateQueue = createAggregateQueue() + const snapshotRetentionQueue = createSnapshotRetentionQueue() + + await startAggregateWorker(aggregateQueue) + await startSnapshotRetentionWorker(snapshotRetentionQueue) + + // Schedule recurring jobs + scheduleAggregateRefresh(aggregateQueue) + scheduleSnapshotRetention(snapshotRetentionQueue) + + // ── Start server ────────────────────────────────────────────────────────── + const port = parseInt(process.env.PORT ?? '3000', 10) + const host = process.env.HOST ?? '0.0.0.0' + + await app.listen({ port, host }) + console.log(`[lens] Server listening on ${host}:${port}`) + + // ── Metrics endpoint ───────────────────────────────────────────────────── + // Expose Prometheus metrics at /metrics + app.get('/metrics', async (req, reply) => { + reply.header('Content-Type', 'text/plain') + return getMetrics() }) - await app.listen({ port: config.api.port, host: config.api.host }) - console.log(`[lens] API listening on http://${config.api.host}:${config.api.port}`) - console.log(`[lens] GraphiQL at http://localhost:${config.api.port}/graphiql`) - - // ── WebSocket fan-out (non-blocking — requires Redis for multi-instance) ─ - try { - await fanOutManager.initialize() - console.log('[lens] WebSocket fan-out manager initialized') - } catch (err) { - console.warn('[lens] WebSocket fan-out init skipped:', (err as Error).message) - } - - // ── Aggregate refresh worker (non-blocking — requires Redis) ───────────── - try { - const queue = createAggregateQueue() - startAggregateWorker() - await scheduleAggregateRefresh(queue) - console.log('[lens] Aggregate refresh worker started') - } catch (err) { - console.warn('[lens] Aggregate refresh worker skipped (Redis unavailable):', (err as Error).message) - } - - // ── Snapshot retention worker (non-blocking — requires Redis) ───────────── - try { - const retentionQueue = createSnapshotRetentionQueue() - startSnapshotRetentionWorker() - await scheduleSnapshotRetention(retentionQueue) - console.log('[lens] Snapshot retention worker started') - } catch (err) { - console.warn('[lens] Snapshot retention worker skipped (Redis unavailable):', (err as Error).message) - } - - // ── Ingesters (run in background — infinite loops) ──────────────────────── - // Each ingester is independently fault-isolated via restartIngester. - // A crash in the Soroswap ingester cannot take down SDEX or AMM. - console.log('[lens] Starting ingesters...') - const restartIngester = (name: string, fn: () => Promise) => { - fn().catch(err => { - console.error(`[lens] ${name} ingester crashed, restarting in 10s:`, err.message) - setTimeout(() => restartIngester(name, fn), 10_000) - }) + // ── Graceful shutdown ──────────────────────────────────────────────────── + const shutdown = async () => { + console.log('[lens] Shutting down gracefully...') + await app.close() + await fanOutManager.stop() + await redis.quit() + await pgPool.end() + process.exit(0) } - restartIngester('SDEX', startSDEXIngester) - restartIngester('AMM', startAMMIngester) - restartIngester('Soroswap', startSoroswapIngester) - restartIngester('Snapshot', startSnapshotIngester) - restartIngester('Aquarius', startAquariusIngester) - console.log(`[lens] Watching ${getActivePairs().length} pairs: ${getActivePairs().map(p => p.pairKey).join(', ')}`) + process.on('SIGTERM', shutdown) + process.on('SIGINT', shutdown) } -main().catch(err => { - console.error('[lens] Fatal startup error:', err) +// ── Run the application ────────────────────────────────────────────────── +main().catch((error) => { + console.error('[lens] Fatal error:', error) process.exit(1) -}) +}) \ No newline at end of file diff --git a/src/routes/benchmark.ts b/src/routes/benchmark.ts new file mode 100644 index 0000000..25d5a87 --- /dev/null +++ b/src/routes/benchmark.ts @@ -0,0 +1,88 @@ +import type { FastifyInstance } from 'fastify' +import { pgPool } from '../db' +import { getActivePairs } from '../pairsRegistry' +import { benchmarkResponseSchema } from '../api/schemas' + +function findPair(assetCode: string, targetCode: string) { + const normalize = (c: string) => c.toLowerCase() === 'native' ? 'XLM' : c.split(':')[0].toUpperCase() + const normAsset = normalize(assetCode) + const normTarget = normalize(targetCode) + return getActivePairs().find(p => { + const pA = p.assetA.code.toUpperCase() + const pB = p.assetB.code.toUpperCase() + return (normAsset === pA && normTarget === pB) || (normAsset === pB && normTarget === pA) + }) +} + +export async function registerBenchmarkRoutes(app: FastifyInstance) { + app.get<{ + Params: { asset: string } + Querystring: { target?: string } + }>( + '/benchmark/:asset', + { schema: { response: { 200: benchmarkResponseSchema } } }, + async (req, reply) => { + const { asset } = req.params + const target = req.query.target ?? 'USD' + + const normalize = (c: string) => c.toLowerCase() === 'native' ? 'XLM' : c.split(':')[0].toUpperCase() + const normAsset = normalize(asset) + const normTarget = normalize(target) + + const pair = findPair(asset, target) + if (!pair) { + return reply.status(404).send({ error: `Pair ${asset}/${target} not watched` }) + } + + const isAssetA = pair.assetA.code.toUpperCase() === normAsset + const priceExpr = isAssetA ? 'price::numeric' : '1.0 / NULLIF(price::numeric, 0)' + + // Query latest price (overall) and rolling 24h stats + const query = ` + SELECT + (SELECT ${priceExpr} FROM price_points WHERE pair_key = $1 ORDER BY timestamp DESC LIMIT 1) AS latest_price, + MAX(${priceExpr}) AS max_price, + MIN(${priceExpr}) AS min_price, + AVG(${priceExpr}) AS avg_price, + MAX(ABS(${priceExpr} - 1.0)) * 10000 AS max_abs_deviation_bps, + MAX(${priceExpr} - 1.0) * 10000 AS max_deviation_bps, + MIN(${priceExpr} - 1.0) * 10000 AS min_deviation_bps, + COUNT(*) AS sample_count + FROM price_points + WHERE pair_key = $1 + AND timestamp > NOW() - INTERVAL '24 hours' + ` + + try { + const result = await pgPool.query(query, [pair.pairKey]) + const row = result.rows[0] + + const latestPriceRaw = row?.latest_price + const latestPrice = latestPriceRaw !== null && latestPriceRaw !== undefined ? parseFloat(latestPriceRaw) : null + const currentDeviationBp = latestPrice !== null ? (latestPrice - 1.0) * 10000 : null + + const sampleCount = row?.sample_count ? parseInt(row.sample_count, 10) : 0 + + const hasStats = sampleCount > 0 + const rolling24h = { + maxDeviationBp: hasStats && row?.max_deviation_bps !== null ? parseFloat(row.max_deviation_bps) : null, + minDeviationBp: hasStats && row?.min_deviation_bps !== null ? parseFloat(row.min_deviation_bps) : null, + maxAbsoluteDeviationBp: hasStats && row?.max_abs_deviation_bps !== null ? parseFloat(row.max_abs_deviation_bps) : null, + averageDeviationBp: hasStats && row?.avg_price !== null ? (parseFloat(row.avg_price) - 1.0) * 10000 : null, + sampleCount, + } + + return { + asset: pair.assetA.code.toUpperCase() === normAsset ? pair.assetA.code : pair.assetB.code, + target: pair.assetA.code.toUpperCase() === normTarget ? pair.assetA.code : pair.assetB.code, + pairKey: pair.pairKey, + currentPrice: latestPrice, + currentDeviationBp, + rolling24h, + } + } catch (err) { + return reply.status(500).send({ error: `Benchmark computation failed: ${(err as Error).message}` }) + } + } + ) +}