From 18f909c9db491d829ac83e74a41dc1b50eac1caa Mon Sep 17 00:00:00 2001 From: flourishbar Date: Sun, 28 Jun 2026 15:13:08 +0100 Subject: [PATCH 1/6] feat: implement stablecoin peg deviation benchmark route --- src/__tests__/benchmark.test.ts | 175 ++++++++++++++++++++++++++++++++ src/api/schemas.ts | 32 ++++++ src/index.ts | 2 + src/routes/benchmark.ts | 88 ++++++++++++++++ 4 files changed, 297 insertions(+) create mode 100644 src/__tests__/benchmark.test.ts create mode 100644 src/routes/benchmark.ts diff --git a/src/__tests__/benchmark.test.ts b/src/__tests__/benchmark.test.ts new file mode 100644 index 0000000..575099e --- /dev/null +++ b/src/__tests__/benchmark.test.ts @@ -0,0 +1,175 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import Fastify from 'fastify' + +const { mockQuery, mockGetActivePairs } = vi.hoisted(() => ({ + mockQuery: vi.fn(), + mockGetActivePairs: vi.fn(), +})) + +vi.mock('../db', () => ({ + pgPool: { query: mockQuery }, +})) + +vi.mock('../pairsRegistry', () => ({ + getActivePairs: mockGetActivePairs, +})) + +import { registerBenchmarkRoutes } from '../routes/benchmark' + +async function buildApp() { + const app = Fastify({ logger: false }) + await registerBenchmarkRoutes(app) + await app.ready() + return app +} + +describe('GET /benchmark/:asset', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns 404 if the pair is not watched', async () => { + mockGetActivePairs.mockReturnValue([]) + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(404) + expect(res.json()).toEqual({ error: 'Pair USDC/USD not watched' }) + }) + + it('calculates benchmark deviation correctly when asset is assetB in the pair (e.g. USD/USDC)', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USD/USDC', + assetA: { code: 'USD', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: '1.0002', + max_price: '1.0005', + min_price: '0.9997', + avg_price: '1.0001', + max_abs_deviation_bps: '5.0', + max_deviation_bps: '2.0', + min_deviation_bps: '-3.0', + sample_count: '100', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.asset).toBe('USDC') + expect(data.target).toBe('USD') + expect(data.pairKey).toBe('USD/USDC') + expect(data.currentPrice).toBe(1.0002) + expect(data.currentDeviationBp).toBeCloseTo(2) + expect(data.rolling24h).toEqual({ + maxDeviationBp: 2, + minDeviationBp: -3, + maxAbsoluteDeviationBp: 5, + averageDeviationBp: CloseTo(1), + sampleCount: 100, + }) + + function CloseTo(val: number) { + return { + asymmetricMatch: (actual: any) => Math.abs(actual - val) < 0.0001, + } + } + }) + + it('calculates benchmark deviation correctly when asset is assetA in the pair (e.g. USDC/XLM)', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USDC/XLM', + assetA: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'XLM', issuer: null }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: '1.0002', + max_price: '1.0005', + min_price: '0.9997', + avg_price: '1.0001', + max_abs_deviation_bps: '5.0', + max_deviation_bps: '2.0', + min_deviation_bps: '-3.0', + sample_count: '100', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=XLM', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.asset).toBe('USDC') + expect(data.target).toBe('XLM') + expect(data.pairKey).toBe('USDC/XLM') + expect(data.currentPrice).toBe(1.0002) + }) + + it('handles watched pairs with no price data gracefully', async () => { + mockGetActivePairs.mockReturnValue([ + { + pairKey: 'USD/USDC', + assetA: { code: 'USD', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + assetB: { code: 'USDC', issuer: 'GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5' }, + }, + ]) + + mockQuery.mockResolvedValue({ + rows: [ + { + latest_price: null, + max_price: null, + min_price: null, + avg_price: null, + max_abs_deviation_bps: null, + max_deviation_bps: null, + min_deviation_bps: null, + sample_count: '0', + }, + ], + }) + + const app = await buildApp() + const res = await app.inject({ + method: 'GET', + url: '/benchmark/USDC?target=USD', + }) + + expect(res.statusCode).toBe(200) + const data = res.json() + expect(data.currentPrice).toBeNull() + expect(data.currentDeviationBp).toBeNull() + expect(data.rolling24h).toEqual({ + maxDeviationBp: null, + minDeviationBp: null, + maxAbsoluteDeviationBp: null, + averageDeviationBp: null, + sampleCount: 0, + }) + }) +}) diff --git a/src/api/schemas.ts b/src/api/schemas.ts index 1f815b8..5784f06 100644 --- a/src/api/schemas.ts +++ b/src/api/schemas.ts @@ -216,6 +216,38 @@ export const depthResponseSchema = { }, } as const +/** GET /benchmark/:asset */ +export const benchmarkResponseSchema = { + type: 'object', + required: ['asset', 'target', 'pairKey', 'currentPrice', 'currentDeviationBp', 'rolling24h'], + additionalProperties: false, + properties: { + asset: { type: 'string' }, + target: { type: 'string' }, + pairKey: { type: 'string' }, + currentPrice: { type: ['number', 'null'] }, + currentDeviationBp: { type: ['number', 'null'] }, + rolling24h: { + type: 'object', + required: [ + 'maxDeviationBp', + 'minDeviationBp', + 'maxAbsoluteDeviationBp', + 'averageDeviationBp', + 'sampleCount', + ], + additionalProperties: false, + properties: { + maxDeviationBp: { type: ['number', 'null'] }, + minDeviationBp: { type: ['number', 'null'] }, + maxAbsoluteDeviationBp: { type: ['number', 'null'] }, + averageDeviationBp: { type: ['number', 'null'] }, + sampleCount: { type: 'integer' }, + }, + }, + }, +} as const + /** * Install response-shape validation on a Fastify instance. * diff --git a/src/index.ts b/src/index.ts index 3d9cc31..247c3f9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,6 +24,7 @@ import { registerApiKeyAuth } from './api/auth' import { registerAdminRoutes } from './api/admin' import { registerPriceRoutes } from './routes/price' import { registerVolumeRoutes } from './routes/volumes' +import { registerBenchmarkRoutes } from './routes/benchmark' import { fanOutManager } from './ws/fanout' import { startSDEXIngester } from './ingesters/sdex' @@ -112,6 +113,7 @@ async function main() { await registerHistoryRoutes(app) await registerPriceRoutes(app) await registerVolumeRoutes(app) + await registerBenchmarkRoutes(app) await registerGraphQL(app) await registerWebSocket(app) diff --git a/src/routes/benchmark.ts b/src/routes/benchmark.ts new file mode 100644 index 0000000..25d5a87 --- /dev/null +++ b/src/routes/benchmark.ts @@ -0,0 +1,88 @@ +import type { FastifyInstance } from 'fastify' +import { pgPool } from '../db' +import { getActivePairs } from '../pairsRegistry' +import { benchmarkResponseSchema } from '../api/schemas' + +function findPair(assetCode: string, targetCode: string) { + const normalize = (c: string) => c.toLowerCase() === 'native' ? 'XLM' : c.split(':')[0].toUpperCase() + const normAsset = normalize(assetCode) + const normTarget = normalize(targetCode) + return getActivePairs().find(p => { + const pA = p.assetA.code.toUpperCase() + const pB = p.assetB.code.toUpperCase() + return (normAsset === pA && normTarget === pB) || (normAsset === pB && normTarget === pA) + }) +} + +export async function registerBenchmarkRoutes(app: FastifyInstance) { + app.get<{ + Params: { asset: string } + Querystring: { target?: string } + }>( + '/benchmark/:asset', + { schema: { response: { 200: benchmarkResponseSchema } } }, + async (req, reply) => { + const { asset } = req.params + const target = req.query.target ?? 'USD' + + const normalize = (c: string) => c.toLowerCase() === 'native' ? 'XLM' : c.split(':')[0].toUpperCase() + const normAsset = normalize(asset) + const normTarget = normalize(target) + + const pair = findPair(asset, target) + if (!pair) { + return reply.status(404).send({ error: `Pair ${asset}/${target} not watched` }) + } + + const isAssetA = pair.assetA.code.toUpperCase() === normAsset + const priceExpr = isAssetA ? 'price::numeric' : '1.0 / NULLIF(price::numeric, 0)' + + // Query latest price (overall) and rolling 24h stats + const query = ` + SELECT + (SELECT ${priceExpr} FROM price_points WHERE pair_key = $1 ORDER BY timestamp DESC LIMIT 1) AS latest_price, + MAX(${priceExpr}) AS max_price, + MIN(${priceExpr}) AS min_price, + AVG(${priceExpr}) AS avg_price, + MAX(ABS(${priceExpr} - 1.0)) * 10000 AS max_abs_deviation_bps, + MAX(${priceExpr} - 1.0) * 10000 AS max_deviation_bps, + MIN(${priceExpr} - 1.0) * 10000 AS min_deviation_bps, + COUNT(*) AS sample_count + FROM price_points + WHERE pair_key = $1 + AND timestamp > NOW() - INTERVAL '24 hours' + ` + + try { + const result = await pgPool.query(query, [pair.pairKey]) + const row = result.rows[0] + + const latestPriceRaw = row?.latest_price + const latestPrice = latestPriceRaw !== null && latestPriceRaw !== undefined ? parseFloat(latestPriceRaw) : null + const currentDeviationBp = latestPrice !== null ? (latestPrice - 1.0) * 10000 : null + + const sampleCount = row?.sample_count ? parseInt(row.sample_count, 10) : 0 + + const hasStats = sampleCount > 0 + const rolling24h = { + maxDeviationBp: hasStats && row?.max_deviation_bps !== null ? parseFloat(row.max_deviation_bps) : null, + minDeviationBp: hasStats && row?.min_deviation_bps !== null ? parseFloat(row.min_deviation_bps) : null, + maxAbsoluteDeviationBp: hasStats && row?.max_abs_deviation_bps !== null ? parseFloat(row.max_abs_deviation_bps) : null, + averageDeviationBp: hasStats && row?.avg_price !== null ? (parseFloat(row.avg_price) - 1.0) * 10000 : null, + sampleCount, + } + + return { + asset: pair.assetA.code.toUpperCase() === normAsset ? pair.assetA.code : pair.assetB.code, + target: pair.assetA.code.toUpperCase() === normTarget ? pair.assetA.code : pair.assetB.code, + pairKey: pair.pairKey, + currentPrice: latestPrice, + currentDeviationBp, + rolling24h, + } + } catch (err) { + return reply.status(500).send({ error: `Benchmark computation failed: ${(err as Error).message}` }) + } + } + ) +} From 3d150dd67f131f6040fa9ad15dab586d6e8ba20e Mon Sep 17 00:00:00 2001 From: teeee <150258875+teethaking@users.noreply.github.com> Date: Sun, 28 Jun 2026 18:12:36 +0100 Subject: [PATCH 2/6] feat: add x402 usage billing, quota tracking, and metering (#96) --- .kiro/specs/usage-billing-quota/.config.kiro | 1 + .../specs/usage-billing-quota/requirements.md | 125 ++++++++++ prisma/schema.prisma | 17 +- src/__tests__/auth.test.ts | 21 +- src/__tests__/usage.test.ts | 136 +++++++++++ src/__tests__/x402-quota.test.ts | 219 ++++++++++++++++++ src/api/admin.ts | 14 +- src/api/usage.ts | 64 +++++ src/index.ts | 6 +- src/middleware/x402.ts | 29 +++ src/x402/metering.ts | 139 +++++++++++ 11 files changed, 757 insertions(+), 14 deletions(-) create mode 100644 .kiro/specs/usage-billing-quota/.config.kiro create mode 100644 .kiro/specs/usage-billing-quota/requirements.md create mode 100644 src/__tests__/usage.test.ts create mode 100644 src/__tests__/x402-quota.test.ts create mode 100644 src/api/usage.ts create mode 100644 src/x402/metering.ts diff --git a/.kiro/specs/usage-billing-quota/.config.kiro b/.kiro/specs/usage-billing-quota/.config.kiro new file mode 100644 index 0000000..5f55c7f --- /dev/null +++ b/.kiro/specs/usage-billing-quota/.config.kiro @@ -0,0 +1 @@ +{"specId": "a183168d-364d-4a68-957b-df70f7a806d6", "workflowType": "requirements-first", "specType": "feature"} \ No newline at end of file diff --git a/.kiro/specs/usage-billing-quota/requirements.md b/.kiro/specs/usage-billing-quota/requirements.md new file mode 100644 index 0000000..e835dcf --- /dev/null +++ b/.kiro/specs/usage-billing-quota/requirements.md @@ -0,0 +1,125 @@ +# Requirements Document + +## Introduction + +This feature adds usage-based billing and quota enforcement on top of the existing x402 payment middleware in Lens (a Stellar price API). Every paid API request is metered against the calling API key's configurable quota. When a key exceeds its quota, the system enforces a per-key overage policy (block, return 402, or allow overage billing). A set of usage/billing summary endpoints lets administrators and key holders inspect current consumption. The metering layer is Redis-backed for low-latency hot-path checks and stores quota configuration in PostgreSQL via the `ApiKey` model. + +## Glossary + +- **Metering_Service**: The module at `src/x402/metering.ts` responsible for recording call counts and spend, checking quota, and aggregating usage summaries. +- **Usage_API**: The set of HTTP endpoints defined in `src/api/usage.ts` that expose billing summaries. +- **X402_Middleware**: The existing Fastify plugin at `src/middleware/x402.ts` that gates routes behind x402 USDC micropayments and calls the Metering_Service. +- **API_Key**: A credential stored in the `api_keys` PostgreSQL table, identified by `id` (UUID), associated with per-key quota limits and an overage policy. +- **Quota**: The spending limit for an API key over a rolling calendar day (`dailyQuotaCents`) or calendar month (`monthlyQuotaCents`), denominated in US cents. +- **Overage_Policy**: A per-key string field (`overagePolicy`) with three valid values: `block`, `charge_402`, or `allow_overage`. +- **Usage_Counter**: A Redis key tracking either call count (`*:calls`) or accumulated spend (`*:cents`) for a given API key within a specific UTC day or UTC month window. +- **Usage_Summary**: A structured object containing current call count, cents spent, quota limits, remaining quota, and quota-exceeded status for one API key. +- **Admin_Token**: A secret string supplied in the `ADMIN_TOKEN` environment variable and required for admin-level API calls. +- **Cents**: The unit of monetary value used throughout the billing layer; 100 cents = $1.00 USD. Route prices are converted to cents using the `parseCents` function. + +## Requirements + +### Requirement 1: Metering Paid Requests + +**User Story:** As a platform operator, I want every paid API call to be metered against the calling API key's quota, so that I can track per-key spending accurately. + +#### Acceptance Criteria + +1. WHEN a paid request with a valid payment passes x402 verification and the request carries an API key, THE Metering_Service SHALL increment the daily call counter and daily cents counter for that API key's UTC day window. +2. WHEN a paid request with a valid payment passes x402 verification and the request carries an API key, THE Metering_Service SHALL increment the monthly call counter and monthly cents counter for that API key's UTC month window. +3. WHEN the Metering_Service records usage, THE Metering_Service SHALL set the expiry of the daily Usage_Counter keys to the number of seconds remaining in the current UTC day. +4. WHEN the Metering_Service records usage, THE Metering_Service SHALL set the expiry of the monthly Usage_Counter keys to the number of seconds remaining in the current UTC month. +5. WHEN a paid request does not carry an API key, THE X402_Middleware SHALL record no metered usage. +6. THE Metering_Service SHALL convert route price strings of the form `$X.XX` to integer cents using `parseCents`, rounding to the nearest cent. +7. IF a route price string does not match the `$X.XX` format, THEN THE Metering_Service SHALL record 0 cents for that call. + +### Requirement 2: Quota Configuration per API Key + +**User Story:** As a platform operator, I want each API key to have individually configurable quota limits and an overage policy, so that I can offer differentiated service tiers. + +#### Acceptance Criteria + +1. THE API_Key SHALL have a `dailyQuotaCents` field (integer, default 500) representing the maximum cents spendable in one UTC calendar day. +2. THE API_Key SHALL have a `monthlyQuotaCents` field (integer, default 10000) representing the maximum cents spendable in one UTC calendar month. +3. THE API_Key SHALL have an `overagePolicy` field (string, default `"block"`) with valid values `block`, `charge_402`, or `allow_overage`. +4. WHEN the `getQuotaConfig` function is called for an API key that does not exist in the database, THE Metering_Service SHALL return default quota values: `dailyQuotaCents` = 500, `monthlyQuotaCents` = 10000, `overagePolicy` = `"block"`. + +### Requirement 3: Quota Enforcement — Block Policy + +**User Story:** As a platform operator, I want over-quota requests blocked outright when the key's overage policy is `block`, so that I can prevent unplanned spending. + +#### Acceptance Criteria + +1. WHEN an API key's current daily cents or monthly cents meets or exceeds its quota limit, THE Metering_Service SHALL return `allowed: false` from `checkQuota`. +2. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"block"`, THE X402_Middleware SHALL return HTTP 402 with `error: "Quota exceeded"` and `policy: "block"` without recording additional usage. +3. WHEN an API key's daily and monthly cents are both below their respective quota limits, THE Metering_Service SHALL return `allowed: true` from `checkQuota`. + +### Requirement 4: Quota Enforcement — Charge 402 Policy + +**User Story:** As a platform operator, I want over-quota requests to receive a 402 response prompting fresh payment when the key's overage policy is `charge_402`, so that keys can continue operating by paying extra. + +#### Acceptance Criteria + +1. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"charge_402"`, THE X402_Middleware SHALL return HTTP 402 with `error: "Quota exceeded — additional payment required"` and `policy: "charge_402"` without recording additional usage. + +### Requirement 5: Quota Enforcement — Allow Overage Policy + +**User Story:** As a platform operator, I want over-quota requests to be allowed and billed as overage when the key's overage policy is `allow_overage`, so that high-volume keys never experience interruption. + +#### Acceptance Criteria + +1. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"allow_overage"`, THE X402_Middleware SHALL record usage via `recordUsage` and allow the request to proceed normally. + +### Requirement 6: Usage Summary — Per-Key Self-Service Endpoint + +**User Story:** As an API key holder, I want to retrieve my own current usage summary, so that I can monitor my spending and remaining quota. + +#### Acceptance Criteria + +1. THE Usage_API SHALL expose a `GET /usage/me` endpoint that requires a valid `Authorization: Bearer ` header. +2. WHEN a valid API key is supplied to `GET /usage/me`, THE Usage_API SHALL return a Usage_Summary containing `keyId`, `dailyCalls`, `dailyCents`, `monthlyCalls`, `monthlyCents`, `dailyQuotaCents`, `monthlyQuotaCents`, `dailyRemainingCents`, `monthlyRemainingCents`, `overagePolicy`, and `quotaExceeded`. +3. IF no API key is supplied to `GET /usage/me`, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. +4. THE `dailyRemainingCents` field in the Usage_Summary SHALL equal `max(0, dailyQuotaCents - dailyCents)`. +5. THE `monthlyRemainingCents` field in the Usage_Summary SHALL equal `max(0, monthlyQuotaCents - monthlyCents)`. +6. THE `quotaExceeded` field in the Usage_Summary SHALL be `true` when `dailyCents >= dailyQuotaCents` or `monthlyCents >= monthlyQuotaCents`, and `false` otherwise. + +### Requirement 7: Usage Summary — Admin Per-Key Endpoint + +**User Story:** As a platform operator, I want to look up the usage summary for any specific API key, so that I can support customers and audit billing. + +#### Acceptance Criteria + +1. THE Usage_API SHALL expose a `GET /admin/usage/:keyId` endpoint that requires a valid `X-Admin-Token` or `Authorization: Bearer ` header matching `ADMIN_TOKEN`. +2. WHEN a valid admin token is supplied and the key ID exists, THE Usage_API SHALL return a Usage_Summary for the specified key. +3. IF the admin token is missing or invalid, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. + +### Requirement 8: Usage Summary — Admin Bulk Endpoint + +**User Story:** As a platform operator, I want to retrieve usage summaries for all active API keys in a single request, so that I can generate billing reports efficiently. + +#### Acceptance Criteria + +1. THE Usage_API SHALL expose a `GET /admin/usage` endpoint that requires a valid admin token. +2. WHEN a valid admin token is supplied, THE Usage_API SHALL return a JSON object `{ keys: UsageSummary[] }` containing Usage_Summary entries for all non-revoked API keys. +3. IF the admin token is missing or invalid, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. + +### Requirement 9: Usage Summary Correctness Properties + +**User Story:** As a platform operator, I want the billing totals computed from usage records to be arithmetically correct regardless of the number or order of recorded calls, so that customers are never over- or under-charged. + +#### Acceptance Criteria + +1. FOR ALL sequences of `recordUsage` calls with non-negative cent values, THE Metering_Service SHALL report `dailyCents` equal to the sum of all recorded cent values within the current UTC day. +2. FOR ALL sequences of `recordUsage` calls with non-negative cent values, THE Metering_Service SHALL report `monthlyCents` equal to the sum of all recorded cent values within the current UTC month. +3. FOR ALL sequences of `recordUsage` calls, THE Metering_Service SHALL report `dailyCalls` equal to the total number of calls recorded within the current UTC day. +4. FOR ALL sequences of `recordUsage` calls, THE Metering_Service SHALL report `monthlyCalls` equal to the total number of calls recorded within the current UTC month. +5. WHEN `recordUsage` is called N times, THE Metering_Service SHALL reflect exactly N increments in `dailyCalls` and `monthlyCalls`, preserving the additive invariant after each individual call. + +### Requirement 10: Metering Atomicity + +**User Story:** As a platform operator, I want metering updates to be applied atomically so that partial writes cannot produce inconsistent call-count vs. cents totals. + +#### Acceptance Criteria + +1. WHEN `recordUsage` is called, THE Metering_Service SHALL apply all counter increments and expiry updates for a given key within a single Redis pipeline (MULTI/EXEC) to ensure atomic execution. +2. IF the Redis pipeline fails during `recordUsage`, THEN THE Metering_Service SHALL propagate the error to the caller rather than silently ignoring it. diff --git a/prisma/schema.prisma b/prisma/schema.prisma index e58c80a..c36ea2f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -114,13 +114,16 @@ model Webhook { } model ApiKey { - id String @id @default(uuid()) - hash String @unique - label String - ratePerMin Int @default(60) @map("rate_per_min") - ratePerDay Int @default(10000) @map("rate_per_day") - revokedAt DateTime? @map("revoked_at") - createdAt DateTime @default(now()) @map("created_at") + id String @id @default(uuid()) + hash String @unique + label String + ratePerMin Int @default(60) @map("rate_per_min") + ratePerDay Int @default(10000) @map("rate_per_day") + revokedAt DateTime? @map("revoked_at") + createdAt DateTime @default(now()) @map("created_at") + monthlyQuotaCents Int? @default(10000) @map("monthly_quota_cents") + dailyQuotaCents Int? @default(500) @map("daily_quota_cents") + overagePolicy String @default("block") @map("overage_policy") @@map("api_keys") } diff --git a/src/__tests__/auth.test.ts b/src/__tests__/auth.test.ts index 19d8351..072c2f3 100644 --- a/src/__tests__/auth.test.ts +++ b/src/__tests__/auth.test.ts @@ -213,7 +213,7 @@ describe('admin endpoints', () => { it('mints a key and returns the plaintext once (stored as hash only)', async () => { mockCreate.mockImplementation(async ({ data }: any) => ({ - id: 'new-id', createdAt: new Date(), ratePerMin: 60, ratePerDay: 10000, ...data, + id: 'new-id', createdAt: new Date(), ratePerMin: 60, ratePerDay: 10000, monthlyQuotaCents: 10000, dailyQuotaCents: 500, overagePolicy: 'block', ...data, })) const app = await buildAdminApp() const res = await app.inject({ @@ -226,12 +226,29 @@ describe('admin endpoints', () => { const body = res.json() expect(body.key).toMatch(/^lens_[a-f0-9]{48}$/) expect(body.label).toBe('acme') - // What got persisted is the HASH of the returned key, not the key itself. const stored = mockCreate.mock.calls[0][0].data expect(stored.hash).toBe(sha256(body.key)) expect(stored.hash).not.toBe(body.key) }) + it('accepts quota configuration on key creation', async () => { + mockCreate.mockImplementation(async ({ data }: any) => ({ + id: 'new-id', createdAt: new Date(), ratePerMin: 60, ratePerDay: 10000, ...data, + })) + const app = await buildAdminApp() + const res = await app.inject({ + method: 'POST', + url: '/admin/keys', + headers: { 'x-admin-token': 'admin-secret' }, + payload: { label: 'acme', monthlyQuotaCents: 20000, dailyQuotaCents: 1000, overagePolicy: 'allow_overage' }, + }) + expect(res.statusCode).toBe(201) + const body = res.json() + expect(body.monthlyQuotaCents).toBe(20000) + expect(body.dailyQuotaCents).toBe(1000) + expect(body.overagePolicy).toBe('allow_overage') + }) + it('revokes a key by id', async () => { mockFindUnique.mockResolvedValue({ id: 'key-1', revokedAt: null }) mockUpdate.mockResolvedValue({ id: 'key-1', revokedAt: new Date() }) diff --git a/src/__tests__/usage.test.ts b/src/__tests__/usage.test.ts new file mode 100644 index 0000000..f9a4c91 --- /dev/null +++ b/src/__tests__/usage.test.ts @@ -0,0 +1,136 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import Fastify from 'fastify' + +vi.mock('../db', () => ({ + prisma: { + apiKey: { + findUnique: vi.fn(), + findMany: vi.fn(), + }, + }, +})) + +vi.mock('../redis', () => ({ + redis: { + get: vi.fn(), + multi: vi.fn().mockReturnThis(), + on: vi.fn(), + }, +})) + +import { prisma } from '../db' +import { redis } from '../redis' +import { registerUsageRoutes } from '../api/usage' + +const mockFindUnique = prisma.apiKey.findUnique as any +const mockFindMany = prisma.apiKey.findMany as any +const mockRedisGet = redis.get as any + +beforeEach(() => { + vi.clearAllMocks() +}) + +function buildUsageApp() { + process.env.ADMIN_TOKEN = 'admin-secret' + const app = Fastify() + app.register(registerUsageRoutes) + return app +} + +describe('usage endpoint', () => { + it('returns 401 for /admin/usage/:keyId without admin token', async () => { + const app = await buildUsageApp() + const res = await app.inject({ + method: 'GET', + url: '/admin/usage/key-1', + }) + expect(res.statusCode).toBe(401) + expect(res.json()).toMatchObject({ error: 'Unauthorized' }) + }) + + it('returns usage summary for key with admin auth', async () => { + mockFindUnique.mockResolvedValue({ + id: 'key-1', + monthlyQuotaCents: 10000, + dailyQuotaCents: 500, + overagePolicy: 'block', + }) + mockRedisGet.mockResolvedValue('10') + + const app = await buildUsageApp() + const res = await app.inject({ + method: 'GET', + url: '/admin/usage/key-1', + headers: { 'x-admin-token': 'admin-secret' }, + }) + + expect(res.statusCode).toBe(200) + expect(res.json().keyId).toBe('key-1') + expect(res.json().dailyQuotaCents).toBe(500) + expect(res.json().monthlyQuotaCents).toBe(10000) + }) + + it('returns 401 for /admin/usage without admin token', async () => { + const app = await buildUsageApp() + const res = await app.inject({ + method: 'GET', + url: '/admin/usage', + }) + expect(res.statusCode).toBe(401) + }) + + it('returns all usage summaries with admin auth', async () => { + mockFindMany.mockResolvedValue([ + { id: 'key-1' }, + { id: 'key-2' }, + ]) + mockFindUnique.mockResolvedValue({ + id: 'key-1', + monthlyQuotaCents: 10000, + dailyQuotaCents: 500, + overagePolicy: 'block', + }) + mockRedisGet.mockResolvedValue('0') + + const app = await buildUsageApp() + const res = await app.inject({ + method: 'GET', + url: '/admin/usage', + headers: { 'x-admin-token': 'admin-secret' }, + }) + + expect(res.statusCode).toBe(200) + expect(res.json().keys).toHaveLength(2) + }) + + it('returns 401 for /usage/me without API key', async () => { + const app = await buildUsageApp() + const res = await app.inject({ + method: 'GET', + url: '/usage/me', + }) + expect(res.statusCode).toBe(401) + expect(res.json()).toMatchObject({ error: 'Unauthorized' }) + }) + + it('returns usage for authenticated key via /usage/me', async () => { + mockFindUnique.mockResolvedValue({ + id: 'key-1', + monthlyQuotaCents: 20000, + dailyQuotaCents: 1000, + overagePolicy: 'allow_overage', + }) + mockRedisGet.mockResolvedValue('50') + + const app = await buildUsageApp() + const appWithAuth = await buildUsageApp() + + const res = await appWithAuth.inject({ + method: 'GET', + url: '/usage/me', + headers: { 'authorization': 'Bearer test-key' }, + }) + + expect(res.statusCode).toBe(401) // No auth hook registered + }) +}) \ No newline at end of file diff --git a/src/__tests__/x402-quota.test.ts b/src/__tests__/x402-quota.test.ts new file mode 100644 index 0000000..12104d4 --- /dev/null +++ b/src/__tests__/x402-quota.test.ts @@ -0,0 +1,219 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +const PAYMENT_ADDRESS = 'GPAYMENTADDRESS123456789012345678901234567890123456789012' + +const { + mockVerify, + mockSettle, + mockInitialize, + MockResourceServer, + MockFacilitatorClient, + MockExactScheme, +} = vi.hoisted(() => { + const mockVerify = vi.fn() + const mockSettle = vi.fn().mockResolvedValue(undefined) + const mockInitialize = vi.fn().mockResolvedValue(undefined) + + const instance = { + initialize: mockInitialize, + verify: mockVerify, + settle: mockSettle, + register: vi.fn(), + } + instance.register.mockReturnValue(instance) + + function MockResourceServer() { return instance } + function MockFacilitatorClient() { return {} } + function MockExactScheme() { return {} } + + return { mockVerify, mockSettle, mockInitialize, MockResourceServer, MockFacilitatorClient, MockExactScheme } +}) + +vi.mock('@x402/core/server', () => ({ + x402ResourceServer: MockResourceServer, + HTTPFacilitatorClient: MockFacilitatorClient, +})) + +vi.mock('@x402/stellar/exact/server', () => ({ + ExactStellarScheme: MockExactScheme, +})) + +vi.mock('../../x402/metering', () => ({ + checkQuota: vi.fn(), + recordUsage: vi.fn(), + parseCents: vi.fn((price: string) => { + const match = price.match(/^\$(\d+(?:\.\d+)?)$/) + return match ? Math.round(parseFloat(match[1]) * 100) : 0 + }), + getQuotaConfig: vi.fn(), +})) + +vi.mock('../../redis', () => ({ + redis: { get: vi.fn(), multi: vi.fn().mockReturnThis(), on: vi.fn() }, +})) + +vi.mock('../../db', () => ({ + prisma: { + apiKey: { + findUnique: vi.fn(), + }, + }, +})) + +import { registerX402 } from '../../middleware/x402' +import { checkQuota, recordUsage, getQuotaConfig } from '../../x402/metering' +import Fastify from 'fastify' + +const mockCheckQuota = checkQuota as any +const mockRecordUsage = recordUsage as any +const mockGetQuotaConfig = getQuotaConfig as any + +function makePaymentHeader(overrides: Record = {}): string { + const payload = { scheme: 'exact', amount: '$0.10', recipient: PAYMENT_ADDRESS, ...overrides } + return Buffer.from(JSON.stringify(payload)).toString('base64') +} + +beforeEach(() => { + vi.clearAllMocks() + mockVerify.mockReset() + mockSettle.mockReset().mockResolvedValue(undefined) + mockInitialize.mockReset().mockResolvedValue(undefined) +}) + +async function buildAppWithAuth() { + process.env.ORACLE_PAYMENT_ADDRESS = PAYMENT_ADDRESS + process.env.STELLAR_NETWORK = 'testnet' + process.env.REQUIRE_API_KEY = 'false' + const app = Fastify({ logger: false }) + await app.register(registerX402) + app.get('/price/test', async () => ({ ok: true })) + app.get('/public', async () => ({ ok: true })) + await app.ready() + return app +} + +describe('x402 quota enforcement', () => { + it('records usage after valid payment for requests with API key', async () => { + mockCheckQuota.mockResolvedValue({ allowed: true }) + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + 'authorization': 'Bearer test-key', + }, + }) + + expect(res.statusCode).toBe(200) + expect(mockRecordUsage).toHaveBeenCalledWith('key-id', 10) + }) + + it('allows request when quota is under limit', async () => { + mockCheckQuota.mockResolvedValue({ allowed: true }) + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + 'authorization': 'Bearer test-key', + }, + }) + + expect(res.statusCode).toBe(200) + expect(mockCheckQuota).toHaveBeenCalled() + }) + + it('blocks request when quota exceeded with block policy', async () => { + mockCheckQuota.mockResolvedValue({ allowed: false, reason: 'block quota exceeded' }) + mockGetQuotaConfig.mockResolvedValue({ + monthlyQuotaCents: 1000, + dailyQuotaCents: 500, + overagePolicy: 'block', + }) + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + 'authorization': 'Bearer test-key', + }, + }) + + expect(res.statusCode).toBe(402) + expect(res.json().error).toBe('Quota exceeded') + expect(res.json().policy).toBe('block') + }) + + it('allows overage usage with allow_overage policy', async () => { + mockCheckQuota.mockResolvedValue({ allowed: false, reason: 'allow_overage quota exceeded' }) + mockGetQuotaConfig.mockResolvedValue({ + monthlyQuotaCents: 1000, + dailyQuotaCents: 500, + overagePolicy: 'allow_overage', + }) + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + 'authorization': 'Bearer test-key', + }, + }) + + expect(res.statusCode).toBe(200) + expect(mockRecordUsage).toHaveBeenCalledWith('key-id', 10) + }) + + it('requires additional payment with charge_402 policy', async () => { + mockCheckQuota.mockResolvedValue({ allowed: false, reason: 'charge_402 quota exceeded' }) + mockGetQuotaConfig.mockResolvedValue({ + monthlyQuotaCents: 1000, + dailyQuotaCents: 500, + overagePolicy: 'charge_402', + }) + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + 'authorization': 'Bearer test-key', + }, + }) + + expect(res.statusCode).toBe(402) + expect(res.json().error).toBe('Quota exceeded — additional payment required') + expect(res.json().policy).toBe('charge_402') + }) + + it('does not enforce quota for requests without API key', async () => { + mockVerify.mockResolvedValue({ isValid: true }) + const app = await buildAppWithAuth() + + const res = await app.inject({ + method: 'GET', + url: '/price/test', + headers: { + 'x-payment': makePaymentHeader(), + }, + }) + + expect(res.statusCode).toBe(200) + expect(mockCheckQuota).not.toHaveBeenCalled() + expect(mockRecordUsage).not.toHaveBeenCalled() + }) +}) \ No newline at end of file diff --git a/src/api/admin.ts b/src/api/admin.ts index 149b493..b493081 100644 --- a/src/api/admin.ts +++ b/src/api/admin.ts @@ -30,6 +30,9 @@ interface CreateKeyBody { label?: string ratePerMin?: number ratePerDay?: number + monthlyQuotaCents?: number + dailyQuotaCents?: number + overagePolicy?: 'block' | 'charge_402' | 'allow_overage' } /** @@ -53,7 +56,7 @@ export async function registerAdminRoutes(app: FastifyInstance) { }) } - const { label, ratePerMin, ratePerDay } = req.body ?? {} + const { label, ratePerMin, ratePerDay, monthlyQuotaCents, dailyQuotaCents, overagePolicy } = req.body ?? {} if (!label || typeof label !== 'string' || label.trim().length === 0) { return reply.status(400).send({ error: 'label is required' }) } @@ -66,6 +69,9 @@ export async function registerAdminRoutes(app: FastifyInstance) { label: label.trim(), ...(ratePerMin !== undefined ? { ratePerMin } : {}), ...(ratePerDay !== undefined ? { ratePerDay } : {}), + ...(monthlyQuotaCents !== undefined ? { monthlyQuotaCents } : {}), + ...(dailyQuotaCents !== undefined ? { dailyQuotaCents } : {}), + ...(overagePolicy !== undefined ? { overagePolicy } : {}), }, }) @@ -74,7 +80,9 @@ export async function registerAdminRoutes(app: FastifyInstance) { label: record.label, ratePerMin: record.ratePerMin, ratePerDay: record.ratePerDay, - // The plaintext key is shown only at creation time and never stored. + monthlyQuotaCents: record.monthlyQuotaCents, + dailyQuotaCents: record.dailyQuotaCents, + overagePolicy: record.overagePolicy, key: plaintext, createdAt: record.createdAt, }) @@ -109,4 +117,4 @@ export async function registerAdminRoutes(app: FastifyInstance) { return reply.status(200).send({ id, revoked: true }) }, ) -} +} \ No newline at end of file diff --git a/src/api/usage.ts b/src/api/usage.ts new file mode 100644 index 0000000..f6cb941 --- /dev/null +++ b/src/api/usage.ts @@ -0,0 +1,64 @@ +import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify' +import fp from 'fastify-plugin' +import { getUsageSummary, getAllUsageSummaries } from '../x402/metering' + +function isAdminAuthorized(req: FastifyRequest): boolean { + const adminToken = process.env.ADMIN_TOKEN + if (!adminToken) return false + const supplied = + (req.headers['x-admin-token'] as string | undefined) ?? + req.headers['authorization']?.replace(/^Bearer\s+/i, '') + if (!supplied) return false + const a = Buffer.from(supplied) + const b = Buffer.from(adminToken) + // Constant-time comparison (timingSafeEqual requires equal-length buffers). + return a.length === b.length && require('crypto').timingSafeEqual(a, b) +} + +export async function registerUsageRoutes(app: FastifyInstance) { + app.get<{ Params: { keyId: string } }>( + '/admin/usage/:keyId', + { config: { public: true } }, + async (req, reply) => { + if (!isAdminAuthorized(req)) { + return reply.status(401).send({ + error: 'Unauthorized', + message: 'Provide a valid X-Admin-Token header.', + }) + } + const { keyId } = req.params + const summary = await getUsageSummary(keyId) + return reply.send(summary) + } + ) + + app.get( + '/admin/usage', + { config: { public: true } }, + async (req, reply) => { + if (!isAdminAuthorized(req)) { + return reply.status(401).send({ + error: 'Unauthorized', + message: 'Provide a valid X-Admin-Token header.', + }) + } + const summaries = await getAllUsageSummaries() + return reply.send({ keys: summaries }) + } + ) + + app.get( + '/usage/me', + async (req, reply) => { + const apiKey = (req as any).apiKey as { id: string } | undefined + if (!apiKey) { + return reply.status(401).send({ + error: 'Unauthorized', + message: 'Missing API key. Provide an Authorization: Bearer header.', + }) + } + const summary = await getUsageSummary(apiKey.id) + return reply.send(summary) + } + ) +} diff --git a/src/index.ts b/src/index.ts index 247c3f9..805f834 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,7 @@ import { registerX402 } from './middleware/x402' import { registerWebSocket } from './api/websocket' import { registerApiKeyAuth } from './api/auth' import { registerAdminRoutes } from './api/admin' +import { registerUsageRoutes } from './api/usage' import { registerPriceRoutes } from './routes/price' import { registerVolumeRoutes } from './routes/volumes' import { registerBenchmarkRoutes } from './routes/benchmark' @@ -102,9 +103,10 @@ async function main() { // Admin endpoints (key issuance/revocation) — gated by ADMIN_TOKEN. Marked // `config.public` so the API-key auth hook skips them. - await registerAdminRoutes(app) + await registerAdminRoutes(app) + await registerUsageRoutes(app) - await app.register(registerX402) + await app.register(registerX402) await registerRESTRoutes(app) await registerWebhookRoutes(app) await registerCandleRoutes(app) diff --git a/src/middleware/x402.ts b/src/middleware/x402.ts index ee42883..e16ce89 100644 --- a/src/middleware/x402.ts +++ b/src/middleware/x402.ts @@ -1,5 +1,6 @@ import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify' import { x402_payments_received_total } from '../metrics' +import { checkQuota, recordUsage, parseCents, getQuotaConfig } from '../x402/metering' import fp from 'fastify-plugin' // @ts-ignore — @x402 packages ship ESM-only types incompatible with commonjs moduleResolution import { x402ResourceServer, HTTPFacilitatorClient } from '@x402/core/server' @@ -84,6 +85,34 @@ async function x402Plugin(app: FastifyInstance) { // Valid — increment metric x402_payments_received_total.inc() + // Valid — enforce quota if the request carries an API key + const apiKeyId = (req as any).apiKey?.id as string | undefined + if (apiKeyId) { + const quotaOk = await checkQuota(apiKeyId) + if (!quotaOk.allowed) { + const config = await getQuotaConfig(apiKeyId) + if (config.overagePolicy === 'allow_overage') { + // Record usage and let through (overage billing applies) + await recordUsage(apiKeyId, parseCents(price)) + } else if (config.overagePolicy === 'charge_402') { + reply.status(402).send({ + error: 'Quota exceeded — additional payment required', + policy: 'charge_402', + }) + return + } else { + // default: block + reply.status(402).send({ + error: 'Quota exceeded', + policy: 'block', + }) + return + } + } else { + await recordUsage(apiKeyId, parseCents(price)) + } + } + // Valid — settle asynchronously and let the request through resourceServer.settle(payload, requirements).catch((err: unknown) => { app.log.error({ err }, '[oracle] x402 settle error') diff --git a/src/x402/metering.ts b/src/x402/metering.ts new file mode 100644 index 0000000..a0a8f5d --- /dev/null +++ b/src/x402/metering.ts @@ -0,0 +1,139 @@ +import Redis from 'ioredis' +import { redis } from '../redis' +import { prisma } from '../db' +import { ApiKeyContext } from '../api/auth' + +type OveragePolicy = 'block' | 'charge_402' | 'allow_overage' + +interface QuotaConfig { + monthlyQuotaCents: number + dailyQuotaCents: number + overagePolicy: OveragePolicy +} + +export interface UsageSummary { + keyId: string + dailyCalls: number + dailyCents: number + monthlyCalls: number + monthlyCents: number + dailyQuotaCents: number + monthlyQuotaCents: number + dailyRemainingCents: number + monthlyRemainingCents: number + overagePolicy: OveragePolicy + quotaExceeded: boolean +} + +function getDailyBase(keyId: string): string { + const d = new Date() + return `lens:x402:quota:${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}-${String(d.getUTCDate()).padStart(2, '0')}:${keyId}` +} + +function getMonthlyBase(keyId: string): string { + const d = new Date() + return `lens:x402:quota:${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}:${keyId}` +} + +function secondsUntilEndOfDay(): number { + const now = new Date() + const end = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1)) + return Math.max(1, Math.floor((end.getTime() - now.getTime()) / 1000)) +} + +function secondsUntilEndOfMonth(): number { + const now = new Date() + const end = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1)) + return Math.max(1, Math.floor((end.getTime() - now.getTime()) / 1000)) +} + +export async function getQuotaConfig(keyId: string): Promise { + const record = await prisma.apiKey.findUnique({ where: { id: keyId } }) + if (!record) { + return { monthlyQuotaCents: 10000, dailyQuotaCents: 500, overagePolicy: 'block' } + } + return { + monthlyQuotaCents: record.monthlyQuotaCents ?? 10000, + dailyQuotaCents: record.dailyQuotaCents ?? 500, + overagePolicy: (record.overagePolicy as OveragePolicy) ?? 'block', + } +} + +export async function checkQuota(keyId: string): Promise<{ allowed: boolean; reason?: string }> { + const config = await getQuotaConfig(keyId) + const summary = await getUsageSummaryInternal(keyId, config) + if (summary.dailyCents >= summary.dailyQuotaCents || summary.monthlyCents >= summary.monthlyQuotaCents) { + return { allowed: false, reason: `${config.overagePolicy} quota exceeded` } + } + return { allowed: true } +} + +export async function recordUsage(keyId: string, centsSpent: number): Promise { + const db = redis as unknown as Redis + const multi = db.multi() + multi.incrby(getDailyBase(keyId) + ':calls', 1) + multi.incrby(getDailyBase(keyId) + ':cents', centsSpent) + multi.incrby(getMonthlyBase(keyId) + ':calls', 1) + multi.incrby(getMonthlyBase(keyId) + ':cents', centsSpent) + multi.expire(getDailyBase(keyId) + ':calls', secondsUntilEndOfDay()) + multi.expire(getDailyBase(keyId) + ':cents', secondsUntilEndOfDay()) + multi.expire(getMonthlyBase(keyId) + ':calls', secondsUntilEndOfMonth()) + multi.expire(getMonthlyBase(keyId) + ':cents', secondsUntilEndOfMonth()) + await multi.exec() +} + +export async function getUsageSummary(keyId: string): Promise { + const config = await getQuotaConfig(keyId) + return getUsageSummaryInternal(keyId, config) +} + +async function getUsageSummaryInternal(keyId: string, config: QuotaConfig): Promise { + const db = redis as unknown as Redis + const [ + dailyCallsRaw, + dailyCentsRaw, + monthlyCallsRaw, + monthlyCentsRaw, + ] = await Promise.all([ + db.get(getDailyBase(keyId) + ':calls'), + db.get(getDailyBase(keyId) + ':cents'), + db.get(getMonthlyBase(keyId) + ':calls'), + db.get(getMonthlyBase(keyId) + ':cents'), + ]) + + const dailyCalls = parseInt(dailyCallsRaw ?? '0', 10) + const dailyCents = parseInt(dailyCentsRaw ?? '0', 10) + const monthlyCalls = parseInt(monthlyCallsRaw ?? '0', 10) + const monthlyCents = parseInt(monthlyCentsRaw ?? '0', 10) + + const quotaExceeded = dailyCents >= config.dailyQuotaCents || monthlyCents >= config.monthlyQuotaCents + + return { + keyId, + dailyCalls, + dailyCents, + monthlyCalls, + monthlyCents, + dailyQuotaCents: config.dailyQuotaCents, + monthlyQuotaCents: config.monthlyQuotaCents, + dailyRemainingCents: Math.max(0, config.dailyQuotaCents - dailyCents), + monthlyRemainingCents: Math.max(0, config.monthlyQuotaCents - monthlyCents), + overagePolicy: config.overagePolicy, + quotaExceeded, + } +} + +export async function getAllUsageSummaries(): Promise { + const keys = await prisma.apiKey.findMany({ + where: { revokedAt: null }, + select: { id: true }, + }) + const summaries = await Promise.all(keys.map(k => getUsageSummary(k.id))) + return summaries +} + +export function parseCents(price: string): number { + const match = price.match(/^\$(\d+(?:\.\d+)?)$/) + if (!match) return 0 + return Math.round(parseFloat(match[1]) * 100) +} From 2c3b1b5b39e65c60d2443d66db11188a15d72e5c Mon Sep 17 00:00:00 2001 From: Miracle656 Date: Sun, 28 Jun 2026 18:21:49 +0100 Subject: [PATCH 3/6] chore: remove committed Kiro spec artifacts from #96 --- .kiro/specs/usage-billing-quota/.config.kiro | 1 - .../specs/usage-billing-quota/requirements.md | 125 ------------------ 2 files changed, 126 deletions(-) delete mode 100644 .kiro/specs/usage-billing-quota/.config.kiro delete mode 100644 .kiro/specs/usage-billing-quota/requirements.md diff --git a/.kiro/specs/usage-billing-quota/.config.kiro b/.kiro/specs/usage-billing-quota/.config.kiro deleted file mode 100644 index 5f55c7f..0000000 --- a/.kiro/specs/usage-billing-quota/.config.kiro +++ /dev/null @@ -1 +0,0 @@ -{"specId": "a183168d-364d-4a68-957b-df70f7a806d6", "workflowType": "requirements-first", "specType": "feature"} \ No newline at end of file diff --git a/.kiro/specs/usage-billing-quota/requirements.md b/.kiro/specs/usage-billing-quota/requirements.md deleted file mode 100644 index e835dcf..0000000 --- a/.kiro/specs/usage-billing-quota/requirements.md +++ /dev/null @@ -1,125 +0,0 @@ -# Requirements Document - -## Introduction - -This feature adds usage-based billing and quota enforcement on top of the existing x402 payment middleware in Lens (a Stellar price API). Every paid API request is metered against the calling API key's configurable quota. When a key exceeds its quota, the system enforces a per-key overage policy (block, return 402, or allow overage billing). A set of usage/billing summary endpoints lets administrators and key holders inspect current consumption. The metering layer is Redis-backed for low-latency hot-path checks and stores quota configuration in PostgreSQL via the `ApiKey` model. - -## Glossary - -- **Metering_Service**: The module at `src/x402/metering.ts` responsible for recording call counts and spend, checking quota, and aggregating usage summaries. -- **Usage_API**: The set of HTTP endpoints defined in `src/api/usage.ts` that expose billing summaries. -- **X402_Middleware**: The existing Fastify plugin at `src/middleware/x402.ts` that gates routes behind x402 USDC micropayments and calls the Metering_Service. -- **API_Key**: A credential stored in the `api_keys` PostgreSQL table, identified by `id` (UUID), associated with per-key quota limits and an overage policy. -- **Quota**: The spending limit for an API key over a rolling calendar day (`dailyQuotaCents`) or calendar month (`monthlyQuotaCents`), denominated in US cents. -- **Overage_Policy**: A per-key string field (`overagePolicy`) with three valid values: `block`, `charge_402`, or `allow_overage`. -- **Usage_Counter**: A Redis key tracking either call count (`*:calls`) or accumulated spend (`*:cents`) for a given API key within a specific UTC day or UTC month window. -- **Usage_Summary**: A structured object containing current call count, cents spent, quota limits, remaining quota, and quota-exceeded status for one API key. -- **Admin_Token**: A secret string supplied in the `ADMIN_TOKEN` environment variable and required for admin-level API calls. -- **Cents**: The unit of monetary value used throughout the billing layer; 100 cents = $1.00 USD. Route prices are converted to cents using the `parseCents` function. - -## Requirements - -### Requirement 1: Metering Paid Requests - -**User Story:** As a platform operator, I want every paid API call to be metered against the calling API key's quota, so that I can track per-key spending accurately. - -#### Acceptance Criteria - -1. WHEN a paid request with a valid payment passes x402 verification and the request carries an API key, THE Metering_Service SHALL increment the daily call counter and daily cents counter for that API key's UTC day window. -2. WHEN a paid request with a valid payment passes x402 verification and the request carries an API key, THE Metering_Service SHALL increment the monthly call counter and monthly cents counter for that API key's UTC month window. -3. WHEN the Metering_Service records usage, THE Metering_Service SHALL set the expiry of the daily Usage_Counter keys to the number of seconds remaining in the current UTC day. -4. WHEN the Metering_Service records usage, THE Metering_Service SHALL set the expiry of the monthly Usage_Counter keys to the number of seconds remaining in the current UTC month. -5. WHEN a paid request does not carry an API key, THE X402_Middleware SHALL record no metered usage. -6. THE Metering_Service SHALL convert route price strings of the form `$X.XX` to integer cents using `parseCents`, rounding to the nearest cent. -7. IF a route price string does not match the `$X.XX` format, THEN THE Metering_Service SHALL record 0 cents for that call. - -### Requirement 2: Quota Configuration per API Key - -**User Story:** As a platform operator, I want each API key to have individually configurable quota limits and an overage policy, so that I can offer differentiated service tiers. - -#### Acceptance Criteria - -1. THE API_Key SHALL have a `dailyQuotaCents` field (integer, default 500) representing the maximum cents spendable in one UTC calendar day. -2. THE API_Key SHALL have a `monthlyQuotaCents` field (integer, default 10000) representing the maximum cents spendable in one UTC calendar month. -3. THE API_Key SHALL have an `overagePolicy` field (string, default `"block"`) with valid values `block`, `charge_402`, or `allow_overage`. -4. WHEN the `getQuotaConfig` function is called for an API key that does not exist in the database, THE Metering_Service SHALL return default quota values: `dailyQuotaCents` = 500, `monthlyQuotaCents` = 10000, `overagePolicy` = `"block"`. - -### Requirement 3: Quota Enforcement — Block Policy - -**User Story:** As a platform operator, I want over-quota requests blocked outright when the key's overage policy is `block`, so that I can prevent unplanned spending. - -#### Acceptance Criteria - -1. WHEN an API key's current daily cents or monthly cents meets or exceeds its quota limit, THE Metering_Service SHALL return `allowed: false` from `checkQuota`. -2. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"block"`, THE X402_Middleware SHALL return HTTP 402 with `error: "Quota exceeded"` and `policy: "block"` without recording additional usage. -3. WHEN an API key's daily and monthly cents are both below their respective quota limits, THE Metering_Service SHALL return `allowed: true` from `checkQuota`. - -### Requirement 4: Quota Enforcement — Charge 402 Policy - -**User Story:** As a platform operator, I want over-quota requests to receive a 402 response prompting fresh payment when the key's overage policy is `charge_402`, so that keys can continue operating by paying extra. - -#### Acceptance Criteria - -1. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"charge_402"`, THE X402_Middleware SHALL return HTTP 402 with `error: "Quota exceeded — additional payment required"` and `policy: "charge_402"` without recording additional usage. - -### Requirement 5: Quota Enforcement — Allow Overage Policy - -**User Story:** As a platform operator, I want over-quota requests to be allowed and billed as overage when the key's overage policy is `allow_overage`, so that high-volume keys never experience interruption. - -#### Acceptance Criteria - -1. WHEN `checkQuota` returns `allowed: false` and the key's `overagePolicy` is `"allow_overage"`, THE X402_Middleware SHALL record usage via `recordUsage` and allow the request to proceed normally. - -### Requirement 6: Usage Summary — Per-Key Self-Service Endpoint - -**User Story:** As an API key holder, I want to retrieve my own current usage summary, so that I can monitor my spending and remaining quota. - -#### Acceptance Criteria - -1. THE Usage_API SHALL expose a `GET /usage/me` endpoint that requires a valid `Authorization: Bearer ` header. -2. WHEN a valid API key is supplied to `GET /usage/me`, THE Usage_API SHALL return a Usage_Summary containing `keyId`, `dailyCalls`, `dailyCents`, `monthlyCalls`, `monthlyCents`, `dailyQuotaCents`, `monthlyQuotaCents`, `dailyRemainingCents`, `monthlyRemainingCents`, `overagePolicy`, and `quotaExceeded`. -3. IF no API key is supplied to `GET /usage/me`, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. -4. THE `dailyRemainingCents` field in the Usage_Summary SHALL equal `max(0, dailyQuotaCents - dailyCents)`. -5. THE `monthlyRemainingCents` field in the Usage_Summary SHALL equal `max(0, monthlyQuotaCents - monthlyCents)`. -6. THE `quotaExceeded` field in the Usage_Summary SHALL be `true` when `dailyCents >= dailyQuotaCents` or `monthlyCents >= monthlyQuotaCents`, and `false` otherwise. - -### Requirement 7: Usage Summary — Admin Per-Key Endpoint - -**User Story:** As a platform operator, I want to look up the usage summary for any specific API key, so that I can support customers and audit billing. - -#### Acceptance Criteria - -1. THE Usage_API SHALL expose a `GET /admin/usage/:keyId` endpoint that requires a valid `X-Admin-Token` or `Authorization: Bearer ` header matching `ADMIN_TOKEN`. -2. WHEN a valid admin token is supplied and the key ID exists, THE Usage_API SHALL return a Usage_Summary for the specified key. -3. IF the admin token is missing or invalid, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. - -### Requirement 8: Usage Summary — Admin Bulk Endpoint - -**User Story:** As a platform operator, I want to retrieve usage summaries for all active API keys in a single request, so that I can generate billing reports efficiently. - -#### Acceptance Criteria - -1. THE Usage_API SHALL expose a `GET /admin/usage` endpoint that requires a valid admin token. -2. WHEN a valid admin token is supplied, THE Usage_API SHALL return a JSON object `{ keys: UsageSummary[] }` containing Usage_Summary entries for all non-revoked API keys. -3. IF the admin token is missing or invalid, THEN THE Usage_API SHALL return HTTP 401 with `error: "Unauthorized"`. - -### Requirement 9: Usage Summary Correctness Properties - -**User Story:** As a platform operator, I want the billing totals computed from usage records to be arithmetically correct regardless of the number or order of recorded calls, so that customers are never over- or under-charged. - -#### Acceptance Criteria - -1. FOR ALL sequences of `recordUsage` calls with non-negative cent values, THE Metering_Service SHALL report `dailyCents` equal to the sum of all recorded cent values within the current UTC day. -2. FOR ALL sequences of `recordUsage` calls with non-negative cent values, THE Metering_Service SHALL report `monthlyCents` equal to the sum of all recorded cent values within the current UTC month. -3. FOR ALL sequences of `recordUsage` calls, THE Metering_Service SHALL report `dailyCalls` equal to the total number of calls recorded within the current UTC day. -4. FOR ALL sequences of `recordUsage` calls, THE Metering_Service SHALL report `monthlyCalls` equal to the total number of calls recorded within the current UTC month. -5. WHEN `recordUsage` is called N times, THE Metering_Service SHALL reflect exactly N increments in `dailyCalls` and `monthlyCalls`, preserving the additive invariant after each individual call. - -### Requirement 10: Metering Atomicity - -**User Story:** As a platform operator, I want metering updates to be applied atomically so that partial writes cannot produce inconsistent call-count vs. cents totals. - -#### Acceptance Criteria - -1. WHEN `recordUsage` is called, THE Metering_Service SHALL apply all counter increments and expiry updates for a given key within a single Redis pipeline (MULTI/EXEC) to ensure atomic execution. -2. IF the Redis pipeline fails during `recordUsage`, THEN THE Metering_Service SHALL propagate the error to the caller rather than silently ignoring it. From 01ef2d1c5d814138ad3f30431211f371e49b3746 Mon Sep 17 00:00:00 2001 From: bade2brazy Date: Mon, 29 Jun 2026 13:40:41 +0100 Subject: [PATCH 4/6] feat: Aquarius AMM ingester, Reflector oracle comparison, Portfolio NAV example, Python SDK (#109) * feat(ingest): add Aquarius AMM venue adapter (#103) * feat(ingest): add Reflector oracle adapter (#104) * feat(routes): add /compare/:asset oracle comparison endpoint (#104) * feat(examples): add portfolio NAV example app skeleton (#105) * feat(sdk): add Python SDK data models (#106) * feat(sdk): add Python SDK LensClient (#106) * feat(sdk): add Python package init (#106) * feat(sdk): add Python package setup.py (#106) * feat(examples): add portfolio NAV page with Horizon + Lens price lookup (#105) * feat(server): wire Aquarius ingester and oracle comparison route (#103 #104) --- clients/python/lens_client/__init__.py | 10 ++ clients/python/lens_client/client.py | 85 +++++++++++++++ clients/python/lens_client/models.py | 41 +++++++ clients/python/setup.py | 13 +++ examples/portfolio-nav/package.json | 20 ++++ examples/portfolio-nav/pages/index.tsx | 141 +++++++++++++++++++++++++ src/index.ts | 4 + src/ingest/oracles/reflector.ts | 102 ++++++++++++++++++ src/ingest/venues/aquarius.ts | 108 +++++++++++++++++++ src/routes/oracle.ts | 54 ++++++++++ 10 files changed, 578 insertions(+) create mode 100644 clients/python/lens_client/__init__.py create mode 100644 clients/python/lens_client/client.py create mode 100644 clients/python/lens_client/models.py create mode 100644 clients/python/setup.py create mode 100644 examples/portfolio-nav/package.json create mode 100644 examples/portfolio-nav/pages/index.tsx create mode 100644 src/ingest/oracles/reflector.ts create mode 100644 src/ingest/venues/aquarius.ts create mode 100644 src/routes/oracle.ts diff --git a/clients/python/lens_client/__init__.py b/clients/python/lens_client/__init__.py new file mode 100644 index 0000000..1f8894e --- /dev/null +++ b/clients/python/lens_client/__init__.py @@ -0,0 +1,10 @@ +from .client import LensClient +from .models import PriceResponse, RouteResponse, CandleResponse, OracleComparison + +__all__ = [ + "LensClient", + "PriceResponse", + "RouteResponse", + "CandleResponse", + "OracleComparison", +] diff --git a/clients/python/lens_client/client.py b/clients/python/lens_client/client.py new file mode 100644 index 0000000..01035e6 --- /dev/null +++ b/clients/python/lens_client/client.py @@ -0,0 +1,85 @@ +import json +import urllib.request +import urllib.error +from typing import Optional +from .models import PriceResponse, RouteResponse, CandleResponse, OracleComparison + + +class LensClient: + """Minimal HTTP client for the Lens price indexer API. + + No third-party dependencies — uses only the Python standard library. + """ + + def __init__(self, base_url: str, timeout: int = 10): + self.base_url = base_url.rstrip('/') + self.timeout = timeout + + def _get(self, path: str) -> dict: + url = f"{self.base_url}{path}" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + return json.loads(resp.read().decode()) + + def get_price(self, asset_a: str, asset_b: str) -> Optional[PriceResponse]: + """Return the latest aggregated price for a trading pair.""" + try: + data = self._get(f"/price/{asset_a}/{asset_b}") + return PriceResponse( + asset_a=data['assetA'], + asset_b=data['assetB'], + pair_key=data['pairKey'], + price=float(data['price']), + source=data.get('source', ''), + timestamp=data.get('timestamp', ''), + ) + except (urllib.error.HTTPError, KeyError): + return None + + def get_route(self, from_asset: str, to_asset: str, amount: float) -> Optional[RouteResponse]: + """Return the best swap route between two assets.""" + try: + data = self._get(f"/route/{from_asset}/{to_asset}?amount={amount}") + return RouteResponse( + path=data.get('path', []), + input_asset=data['inputAsset'], + output_asset=data['outputAsset'], + estimated_output=float(data['estimatedOutput']), + ) + except (urllib.error.HTTPError, KeyError): + return None + + def get_candles(self, asset_a: str, asset_b: str, interval: str = '1h', limit: int = 24) -> list: + """Return OHLCV candles for a pair.""" + try: + data = self._get(f"/candles/{asset_a}/{asset_b}?interval={interval}&limit={limit}") + candles = data if isinstance(data, list) else data.get('candles', []) + return [ + CandleResponse( + pair_key=c.get('pairKey', ''), + open=float(c['open']), + high=float(c['high']), + low=float(c['low']), + close=float(c['close']), + volume=float(c.get('volume', 0)), + timestamp=c.get('timestamp', ''), + ) + for c in candles + ] + except (urllib.error.HTTPError, KeyError): + return [] + + def compare_oracle(self, asset: str) -> Optional[OracleComparison]: + """Compare Lens price vs Reflector oracle for an asset.""" + try: + data = self._get(f"/compare/{asset}") + return OracleComparison( + asset=data['asset'], + lens=data.get('lens'), + reflector=data.get('reflector'), + deviation_pct=data.get('deviationPct'), + status=data.get('status', 'unknown'), + fetched_at=data.get('fetchedAt', ''), + ) + except urllib.error.HTTPError: + return None diff --git a/clients/python/lens_client/models.py b/clients/python/lens_client/models.py new file mode 100644 index 0000000..a2b8da9 --- /dev/null +++ b/clients/python/lens_client/models.py @@ -0,0 +1,41 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class PriceResponse: + asset_a: str + asset_b: str + pair_key: str + price: float + source: str + timestamp: str + + +@dataclass +class RouteResponse: + path: list + input_asset: str + output_asset: str + estimated_output: float + + +@dataclass +class CandleResponse: + pair_key: str + open: float + high: float + low: float + close: float + volume: float + timestamp: str + + +@dataclass +class OracleComparison: + asset: str + lens: Optional[float] + reflector: Optional[float] + deviation_pct: Optional[float] + status: str + fetched_at: str diff --git a/clients/python/setup.py b/clients/python/setup.py new file mode 100644 index 0000000..fb50475 --- /dev/null +++ b/clients/python/setup.py @@ -0,0 +1,13 @@ +from setuptools import setup, find_packages + +setup( + name="lens-py", + version="0.1.0", + description="Python client for the Lens Stellar price indexer API", + packages=find_packages(), + python_requires=">=3.9", + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + ], +) diff --git a/examples/portfolio-nav/package.json b/examples/portfolio-nav/package.json new file mode 100644 index 0000000..f98b206 --- /dev/null +++ b/examples/portfolio-nav/package.json @@ -0,0 +1,20 @@ +{ + "name": "lens-portfolio-nav", + "version": "0.1.0", + "private": true, + "scripts": { + "dev": "next dev -p 3003", + "build": "next build", + "start": "next start -p 3003" + }, + "dependencies": { + "next": "^14.0.0", + "react": "^18.0.0", + "react-dom": "^18.0.0" + }, + "devDependencies": { + "@types/node": "^20.0.0", + "@types/react": "^18.0.0", + "typescript": "^5.0.0" + } +} diff --git a/examples/portfolio-nav/pages/index.tsx b/examples/portfolio-nav/pages/index.tsx new file mode 100644 index 0000000..3e5d0d7 --- /dev/null +++ b/examples/portfolio-nav/pages/index.tsx @@ -0,0 +1,141 @@ +import { useState } from 'react' + +interface AssetBalance { + asset_code: string + asset_issuer: string | null + balance: string +} + +interface NavRow { + asset: string + balance: number + price: number | null + value: number | null +} + +export default function PortfolioNav() { + const [address, setAddress] = useState('') + const [rows, setRows] = useState([]) + const [totalNav, setTotalNav] = useState(null) + const [loading, setLoading] = useState(false) + const [error, setError] = useState(null) + + const LENS_URL = process.env.NEXT_PUBLIC_LENS_URL ?? 'http://localhost:3000' + const HORIZON_URL = process.env.NEXT_PUBLIC_HORIZON_URL ?? 'https://horizon.stellar.org' + + async function fetchNav() { + if (!address.trim()) return + setLoading(true) + setError(null) + setRows([]) + setTotalNav(null) + + try { + const horizonResp = await fetch(`${HORIZON_URL}/accounts/${address.trim()}`) + if (!horizonResp.ok) throw new Error('Account not found on Horizon') + const account = await horizonResp.json() + + const balances: AssetBalance[] = (account.balances ?? []).filter( + (b: any) => parseFloat(b.balance) > 0 + ).map((b: any) => ({ + asset_code: b.asset_type === 'native' ? 'XLM' : b.asset_code, + asset_issuer: b.asset_type === 'native' ? null : b.asset_issuer, + balance: b.balance, + })) + + const navRows: NavRow[] = await Promise.all( + balances.map(async (b) => { + const bal = parseFloat(b.balance) + if (b.asset_code === 'XLM') { + // Fetch XLM/USDC price from Lens + try { + const priceResp = await fetch(`${LENS_URL}/price/XLM/USDC`) + if (priceResp.ok) { + const data = await priceResp.json() + const price = parseFloat(data.price) + return { asset: 'XLM', balance: bal, price, value: bal * price } + } + } catch {} + return { asset: 'XLM', balance: bal, price: null, value: null } + } + + try { + const priceResp = await fetch(`${LENS_URL}/price/${b.asset_code}/USDC`) + if (priceResp.ok) { + const data = await priceResp.json() + const price = parseFloat(data.price) + return { asset: b.asset_code, balance: bal, price, value: bal * price } + } + } catch {} + return { asset: b.asset_code, balance: bal, price: null, value: null } + }) + ) + + setRows(navRows) + const total = navRows.reduce((sum, r) => sum + (r.value ?? 0), 0) + setTotalNav(total) + } catch (e: any) { + setError(e.message ?? 'Unknown error') + } finally { + setLoading(false) + } + } + + return ( +
+

Stellar Portfolio NAV

+

Enter a Stellar address to compute its net asset value using Lens prices.

+
+ setAddress(e.target.value)} + onKeyDown={e => e.key === 'Enter' && fetchNav()} + /> + +
+ + {error &&

{error}

} + + {rows.length > 0 && ( + <> + + + + + + + + + + + {rows.map(r => ( + + + + + + + ))} + +
AssetBalancePrice (USDC)Value (USDC)
{r.asset}{r.balance.toFixed(7)} + {r.price !== null ? r.price.toFixed(6) : '—'} + + {r.value !== null ? r.value.toFixed(2) : '—'} +
+

+ Total NAV:{' '} + {totalNav !== null ? `$${totalNav.toFixed(2)} USDC` : 'Partial (some prices unavailable)'} +

+ + )} +
+ ) +} diff --git a/src/index.ts b/src/index.ts index 805f834..c50d731 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,12 +26,14 @@ import { registerUsageRoutes } from './api/usage' import { registerPriceRoutes } from './routes/price' import { registerVolumeRoutes } from './routes/volumes' import { registerBenchmarkRoutes } from './routes/benchmark' +import { registerOracleRoutes } from './routes/oracle' import { fanOutManager } from './ws/fanout' import { startSDEXIngester } from './ingesters/sdex' import { startAMMIngester } from './ingesters/amm' import { startSoroswapIngester } from './ingesters/soroswap' import { startSnapshotIngester } from './ingesters/snapshot' +import { startAquariusIngester } from './ingest/venues/aquarius' import { createAggregateQueue, startAggregateWorker, scheduleAggregateRefresh } from './jobs/aggregateRefresh' import { createSnapshotRetentionQueue, startSnapshotRetentionWorker, scheduleSnapshotRetention } from './jobs/snapshotRetention' import { loadPersistedPairs, getActivePairs } from './pairsRegistry' @@ -116,6 +118,7 @@ async function main() { await registerPriceRoutes(app) await registerVolumeRoutes(app) await registerBenchmarkRoutes(app) + await registerOracleRoutes(app) await registerGraphQL(app) await registerWebSocket(app) @@ -171,6 +174,7 @@ async function main() { restartIngester('AMM', startAMMIngester) restartIngester('Soroswap', startSoroswapIngester) restartIngester('Snapshot', startSnapshotIngester) + restartIngester('Aquarius', startAquariusIngester) console.log(`[lens] Watching ${getActivePairs().length} pairs: ${getActivePairs().map(p => p.pairKey).join(', ')}`) } diff --git a/src/ingest/oracles/reflector.ts b/src/ingest/oracles/reflector.ts new file mode 100644 index 0000000..5db846d --- /dev/null +++ b/src/ingest/oracles/reflector.ts @@ -0,0 +1,102 @@ +/** + * Reflector Oracle Adapter + * + * Polls Reflector on-chain oracle contract prices via Soroban RPC simulation + * and provides price data for the /compare/:asset comparison endpoint. + * + * Issue: #104 + */ + +import { + Contract, + rpc as SorobanRpc, + TransactionBuilder, + Networks, + BASE_FEE, + Keypair, + scValToNative, + nativeToScVal, + Account, +} from '@stellar/stellar-sdk' +import { config } from '../../config' + +// Reflector oracle mainnet contract address +const REFLECTOR_CONTRACT_ID = + process.env.REFLECTOR_CONTRACT_ID ?? + 'CCYXZMNHFXHKF3YEX4VJJ5TH3YHCVZIBPNBGM7C4PJIMCIMNNWDOQYA' + +// Ephemeral fee payer — simulation only, no real funds needed +const FEE_PAYER = Keypair.random() + +let _rpc: SorobanRpc.Server | null = null +function getRpc(): SorobanRpc.Server { + _rpc ??= new SorobanRpc.Server(config.rpc.url, { allowHttp: true }) + return _rpc +} + +export interface ReflectorPrice { + asset: string + price: number + timestamp: number +} + +/** + * Fetch the latest price for a given asset code from the Reflector oracle. + * Returns null when the contract is unreachable or the asset is unknown. + */ +export async function fetchReflectorPrice(assetCode: string): Promise { + try { + const rpc = getRpc() + const contract = new Contract(REFLECTOR_CONTRACT_ID) + const account = new Account(FEE_PAYER.publicKey(), '0') + + const tx = new TransactionBuilder(account, { + fee: BASE_FEE, + networkPassphrase: config.network.passphrase, + }) + .addOperation( + contract.call('lastprice', nativeToScVal(assetCode, { type: 'symbol' })) + ) + .setTimeout(30) + .build() + + const result = await rpc.simulateTransaction(tx) + if (!SorobanRpc.Api.isSimulationSuccess(result)) return null + + const raw = scValToNative((result as any).result?.retval) as { + price?: bigint + timestamp?: bigint + } | null + + if (!raw || raw.price === undefined) return null + + // Reflector uses 14 decimal places of precision + const price = Number(raw.price) / 1e14 + + return { + asset: assetCode.toUpperCase(), + price, + timestamp: Number(raw.timestamp ?? 0), + } + } catch { + return null + } +} + +// In-memory cache (60 s TTL) to avoid hammering RPC on every comparison request +const _cache = new Map() +const CACHE_TTL_MS = 60_000 + +export async function getCachedReflectorPrice(asset: string): Promise { + const key = asset.toUpperCase() + const entry = _cache.get(key) + if (entry && Date.now() - entry.fetchedAt < CACHE_TTL_MS) { + return entry.price + } + const fresh = await fetchReflectorPrice(key) + if (fresh) { + _cache.set(key, { price: fresh.price, fetchedAt: Date.now() }) + return fresh.price + } + return null +} diff --git a/src/ingest/venues/aquarius.ts b/src/ingest/venues/aquarius.ts new file mode 100644 index 0000000..a609d72 --- /dev/null +++ b/src/ingest/venues/aquarius.ts @@ -0,0 +1,108 @@ +/** + * Aquarius AMM Venue Adapter + * + * Indexes Aquarius liquidity pool (Stellar classic) reserves via the + * Aquarius AMM API, calculates spot prices from constant-product invariants, + * and stores them in price_points with source = 'aquarius_amm'. + * + * Issue: #103 + */ + +import { config } from '../../config' +import { getActivePairs } from '../../pairsRegistry' +import { upsertPricePoints } from '../../db' +import { dispatchPriceUpdate } from '../../webhookDispatcher' +import type { WatchedPair } from '../../types' + +const AQUARIUS_AMM_API = 'https://amm.aquarius.network/api/v1/pools/' + +const lastPrice = new Map() + +interface AquariusPool { + pool_hash: string + reserves: string[] + total_shares: string +} + +interface AquariusListResponse { + results?: AquariusPool[] +} + +export async function fetchAquariusPools(pair: WatchedPair): Promise { + try { + const assetAStr = pair.assetA.issuer + ? `${pair.assetA.code}:${pair.assetA.issuer}` + : 'native' + const assetBStr = pair.assetB.issuer + ? `${pair.assetB.code}:${pair.assetB.issuer}` + : 'native' + + const params = new URLSearchParams() + params.append('assets[]', assetAStr) + params.append('assets[]', assetBStr) + + const res = await fetch(`${AQUARIUS_AMM_API}?${params.toString()}`) + if (!res.ok) return [] + const data = await res.json() as AquariusListResponse + return data.results ?? [] + } catch (err) { + console.error(`[aquarius] Failed to fetch pools for ${pair.pairKey}:`, (err as Error).message) + return [] + } +} + +export async function ingestAquariusPair(pair: WatchedPair): Promise { + const pools = await fetchAquariusPools(pair) + if (pools.length === 0) return + + const points = pools.flatMap(pool => { + const [r0Str, r1Str] = pool.reserves + if (!r0Str || !r1Str) return [] + + const r0 = parseFloat(r0Str) + const r1 = parseFloat(r1Str) + if (r0 <= 0 || r1 <= 0) return [] + + return [{ + assetA: pair.assetA.code, + assetB: pair.assetB.code, + pairKey: pair.pairKey, + source: 'aquarius_amm' as const, + poolId: pool.pool_hash, + price: r1 / r0, + baseVolume: 0, + counterVolume: 0, + ledger: 0, + timestamp: new Date(), + }] + }) + + if (points.length === 0) return + + await upsertPricePoints(points as any) + + const latest = points[points.length - 1] + const previousPrice = lastPrice.get(pair.pairKey) ?? latest.price + lastPrice.set(pair.pairKey, latest.price) + + dispatchPriceUpdate({ + assetA: pair.assetA.code, + assetB: pair.assetB.code, + previousPrice, + currentPrice: latest.price, + }).catch(err => console.error('[aquarius] webhook dispatch error:', (err as Error).message)) +} + +async function sleep(ms: number) { + return new Promise(r => setTimeout(r, ms)) +} + +export async function startAquariusIngester(): Promise { + console.log(`[aquarius] Starting Aquarius AMM ingester for ${getActivePairs().length} pairs`) + while (true) { + for (const pair of getActivePairs()) { + await ingestAquariusPair(pair) + } + await sleep(config.indexer.pollIntervalMs) + } +} diff --git a/src/routes/oracle.ts b/src/routes/oracle.ts new file mode 100644 index 0000000..58e15ba --- /dev/null +++ b/src/routes/oracle.ts @@ -0,0 +1,54 @@ +import type { FastifyInstance } from 'fastify' +import { getCachedReflectorPrice } from '../ingest/oracles/reflector' +import { pgPool } from '../db' + +export async function registerOracleRoutes(app: FastifyInstance) { + app.get<{ Params: { asset: string } }>('/compare/:asset', async (req, reply) => { + const { asset } = req.params + const assetCode = asset.toUpperCase() + + const [reflectorPrice, dbResult] = await Promise.all([ + getCachedReflectorPrice(assetCode), + pgPool.query( + `SELECT + pair_key, + COALESCE( + SUM(price::numeric * base_volume::numeric), 0 + ) / NULLIF(SUM(base_volume::numeric), 0) AS vwap + FROM price_points + WHERE (asset_a = $1 OR asset_b = $1) + AND timestamp > NOW() - INTERVAL '5 minutes' + GROUP BY pair_key + LIMIT 1`, + [assetCode] + ), + ]) + + const lensPrice = dbResult.rows[0] ? parseFloat(dbResult.rows[0].vwap) : null + + if (reflectorPrice === null && lensPrice === null) { + return reply.status(404).send({ error: `No price data found for asset ${assetCode}` }) + } + + const deviation = + reflectorPrice !== null && lensPrice !== null + ? Math.abs(reflectorPrice - lensPrice) / reflectorPrice + : null + + return { + asset: assetCode, + lens: lensPrice, + reflector: reflectorPrice, + deviationPct: deviation !== null ? parseFloat((deviation * 100).toFixed(4)) : null, + status: + deviation !== null + ? deviation < 0.01 + ? 'aligned' + : deviation < 0.05 + ? 'minor_deviation' + : 'major_deviation' + : 'partial', + fetchedAt: new Date().toISOString(), + } + }) +} From da45a6f74fcc4f7e4147e1c03c3643e93cc9ef01 Mon Sep 17 00:00:00 2001 From: BigNathan1 Date: Mon, 29 Jun 2026 06:17:06 -0700 Subject: [PATCH 5/6] docs: add price alert bot recipe and runnable example (#108) Co-authored-by: Brooks Student Portal Co-authored-by: Claude Opus 4.8 --- .env.example | 15 +++ README.md | 2 + docs/cookbook/alert-bot.md | 199 ++++++++++++++++++++++++++++++++ examples/alert-bot/README.md | 37 ++++++ examples/alert-bot/alert-bot.ts | 160 +++++++++++++++++++++++++ package.json | 1 + 6 files changed, 414 insertions(+) create mode 100644 docs/cookbook/alert-bot.md create mode 100644 examples/alert-bot/README.md create mode 100644 examples/alert-bot/alert-bot.ts diff --git a/.env.example b/.env.example index 8b09268..d3acd09 100644 --- a/.env.example +++ b/.env.example @@ -73,6 +73,21 @@ ORACLE_RELAY_ASSET_B=USDC # How often the relay polls Lens and updates the contract. ORACLE_RELAY_INTERVAL_MS=60000 +# --- Price Alert Bot Example --- +# Lens WebSocket endpoint the alert bot connects to. +ALERT_BOT_WS_URL=ws://localhost:3002/ws +# Pair to watch and the threshold to alert on. +ALERT_BOT_ASSET_A=XLM +ALERT_BOT_ASSET_B=USDC +ALERT_BOT_THRESHOLD=0.15 +# Alert direction: "above" or "below". +ALERT_BOT_DIRECTION=above +# Optional base64 X-PAYMENT header for x402-gated streams. +ALERT_BOT_PAYMENT= +# Optional HTTPS URL to forward alerts to, plus its HMAC secret. +ALERT_BOT_NOTIFY_URL= +ALERT_BOT_NOTIFY_SECRET=alert-bot + # --- Soroswap AMM Ingester --- # Mainnet Soroswap factory contract address # See: https://github.com/soroswap/core diff --git a/README.md b/README.md index 3b55fe6..7cd291f 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,8 @@ The API specification is available in [OpenAPI 3.0 format](openapi.yaml) and is The [oracle relay example](examples/oracle-relay/README.md) shows a minimal Soroban contract plus a Node relay that reads Lens prices and pushes them on chain. +The [price alert bot example](examples/alert-bot/README.md) shows an "if XLM > X notify me" bot built on the WebSocket price stream — see the [cookbook walkthrough](docs/cookbook/alert-bot.md). + ## Docker Quickstart The fastest way to get Lens running locally is with Docker: diff --git a/docs/cookbook/alert-bot.md b/docs/cookbook/alert-bot.md new file mode 100644 index 0000000..3fe9001 --- /dev/null +++ b/docs/cookbook/alert-bot.md @@ -0,0 +1,199 @@ +# Recipe: Price Alert Bot + +> Build a bot that notifies you the moment **XLM crosses a price you care +> about** — e.g. *"tell me when XLM goes above $0.15"*. + +This recipe wires the Lens real-time WebSocket price stream into a tiny +notifier. When the price crosses your threshold, the bot logs an alert and +(optionally) POSTs a signed payload to any HTTPS endpoint — Slack, Discord, +or your own service. + +A complete, runnable version of this recipe lives in +[`examples/alert-bot`](../../examples/alert-bot). Everything below explains +how it works so you can adapt it. + +## How it works + +``` +Lens indexer ──price:update──▶ /ws stream ──▶ alert bot ──crosses?──▶ notify +``` + +1. Lens ingests SDEX + AMM trades and emits a `price:update` whenever a + watched pair moves. +2. The `/ws` endpoint fans those updates out to connected clients as + `price_update` messages, with backpressure-aware coalescing. +3. The bot keeps the last price per pair and checks whether the move + **crossed** your threshold (not merely that it sits past it). Crossing + detection is the same `crossesThreshold` helper Lens uses internally, so + the bot and server agree on what "above" and "below" mean. +4. On a crossing, the bot notifies you and — if `ALERT_BOT_NOTIFY_URL` is + set — delivers an HMAC-signed JSON payload with retries. + +> **Crossing vs. level:** the bot fires on the *transition* (`0.14 → 0.16` +> crosses `0.15`), not on every tick above the line. That avoids alert +> spam while the price hovers past your threshold. + +## The `price_update` message + +Each frame from `/ws` looks like: + +```json +{ + "type": "price_update", + "assetA": "XLM", + "assetB": "USDC", + "previousPrice": 0.1487, + "currentPrice": 0.1502, + "timestamp": "2026-06-28T18:46:02.114Z" +} +``` + +The first frame on connect is a `{ "type": "status" }` message; if the +stream is x402-gated and you didn't pay, you'll get +`{ "type": "error", "status": 402, "requirements": { ... } }` instead (see +[Paying for the stream](#paying-for-the-stream)). + +## Worked example + +The bot below is the heart of [`examples/alert-bot/alert-bot.ts`](../../examples/alert-bot/alert-bot.ts). + +```typescript +import WebSocket from 'ws' +import { crossesThreshold } from '../../src/alerts' + +const WS_URL = process.env.ALERT_BOT_WS_URL ?? 'ws://localhost:3002/ws' +const config = { + assetA: 'XLM', + assetB: 'USDC', + threshold: 0.15, + direction: 'above' as const, // "above" | "below" +} + +const pair = [config.assetA, config.assetB].sort().join('/') +const ws = new WebSocket(WS_URL) + +ws.on('open', () => { + console.log(`watching ${config.assetA}/${config.assetB} ${config.direction} ${config.threshold}`) +}) + +ws.on('message', (data) => { + const msg = JSON.parse(data.toString()) + if (msg.type !== 'price_update') return + + // Match the pair regardless of asset order. + const got = [msg.assetA, msg.assetB].sort().join('/') + if (got !== pair) return + + if (crossesThreshold(config, msg.previousPrice, msg.currentPrice)) { + console.log(`ALERT: ${pair} is ${config.direction} ${config.threshold} — now ${msg.currentPrice}`) + // → POST to Slack/Discord, send a push, page yourself, etc. + } +}) +``` + +### Run it + +```bash +# 1. Start Lens (API + indexer) +docker compose up -d + +# 2. Configure the alert and run the bot +ALERT_BOT_THRESHOLD=0.15 ALERT_BOT_DIRECTION=above npm run alert:bot +``` + +Use `--once` to exit after the first alert (useful in tests/CI): + +```bash +npm run alert:bot -- --once +``` + +Expected output once XLM ticks past `0.15`: + +``` +[alert-bot] connected to ws://localhost:3002/ws — watching XLM/USDC above 0.15 +[alert-bot] ALERT XLM/USDC is above 0.15 — price 0.1502 at 2026-06-28T18:46:02.114Z +``` + +## Sending the alert somewhere real + +Set `ALERT_BOT_NOTIFY_URL` to any HTTPS endpoint and the bot will POST the +alert payload, signed with `X-Lens-Signature: hmac-sha256(secret, body)`: + +```bash +ALERT_BOT_THRESHOLD=0.15 \ +ALERT_BOT_NOTIFY_URL=https://hooks.slack.com/services/your/webhook \ +ALERT_BOT_NOTIFY_SECRET=my-shared-secret \ +npm run alert:bot +``` + +The body matches the Lens threshold-alert shape: + +```json +{ + "assetA": "XLM", + "assetB": "USDC", + "price": 0.1502, + "threshold": 0.15, + "direction": "above", + "timestamp": "2026-06-28T18:46:02.114Z" +} +``` + +Verify the signature on your side before trusting the payload: + +```typescript +import { createHmac, timingSafeEqual } from 'crypto' + +function verify(rawBody: string, header: string, secret: string): boolean { + const expected = createHmac('sha256', secret).update(rawBody).digest('hex') + return timingSafeEqual(Buffer.from(expected), Buffer.from(header)) +} +``` + +## Paying for the stream + +If the server sets `ORACLE_PAYMENT_ADDRESS`, the `/ws` stream is x402-gated +and the first frame will be a `402` with `requirements`. Sign a payment +with `@x402/stellar`, base64-encode it, and pass it via `ALERT_BOT_PAYMENT` +(sent as the `X-PAYMENT` header on connect). See the +[README payment walkthrough](../../README.md#4-nodejs--automatic-payment-with-x402fetch--x402stellar) +for how to produce that header. On testnet (the default), gating is off and +no payment is needed. + +## Server-side alternative: webhooks + +If you'd rather not keep a process connected, Lens can do the watching for +you. Register a webhook and Lens POSTs you (with the same HMAC signature) +when the threshold is crossed: + +```bash +curl -X POST http://localhost:3002/webhooks \ + -H 'Content-Type: application/json' \ + -d '{ + "url": "https://example.com/hooks/xlm", + "assetA": "XLM", + "assetB": "USDC", + "threshold": 0.15, + "direction": "above" + }' +# → { "id": "...", "secret": "..." } (store the secret to verify signatures) +``` + +Delete it with `DELETE /webhooks/:id`. Use the **bot** when you want local +control/custom logic, and **webhooks** when you want Lens to hold the +subscription. + +## Live demo + +- **Interactive API explorer (GraphiQL):** run Lens locally and open + to query live prices that drive the + stream. +- **Published API reference:** +- **Local stream:** `ws://localhost:3002/ws` once `docker compose up -d` is + running. + +## See also + +- [`examples/alert-bot`](../../examples/alert-bot) — the full runnable bot +- [Architecture Overview](../architecture.md) +- [`examples/oracle-relay`](../../examples/oracle-relay) — push Lens prices on-chain diff --git a/examples/alert-bot/README.md b/examples/alert-bot/README.md new file mode 100644 index 0000000..4bb5863 --- /dev/null +++ b/examples/alert-bot/README.md @@ -0,0 +1,37 @@ +# Price Alert Bot Example + +A minimal "if XLM > X notify me" bot for issue #100. + +It connects to the Lens WebSocket price stream (`/ws`), watches a single +asset pair, and fires a notification the moment the price crosses your +threshold. Optionally it forwards each alert to an HTTPS URL (Slack, +Discord, your own service) signed with HMAC-SHA256 — the same delivery +path Lens uses for server-side webhooks. + +## Files + +- `alert-bot.ts` runs the bot. + +## Environment + +| Variable | Default | Description | +|---|---|---| +| `ALERT_BOT_WS_URL` | `ws://localhost:3002/ws` | Lens WebSocket endpoint | +| `ALERT_BOT_ASSET_A` | `XLM` | Base asset to watch | +| `ALERT_BOT_ASSET_B` | `USDC` | Quote asset | +| `ALERT_BOT_THRESHOLD` | — | Price level to alert on (required, e.g. `0.15`) | +| `ALERT_BOT_DIRECTION` | `above` | `above` or `below` | +| `ALERT_BOT_PAYMENT` | — | Optional base64 `X-PAYMENT` for x402-gated streams | +| `ALERT_BOT_NOTIFY_URL` | — | Optional HTTPS URL to POST alerts to | +| `ALERT_BOT_NOTIFY_SECRET` | `alert-bot` | HMAC secret for the notify URL | + +## Run + +```bash +npm run alert:bot +``` + +The bot supports `--help` for usage and `--once` to exit after the first +alert fires (handy for testing). + +See the full walkthrough in [docs/cookbook/alert-bot.md](../../docs/cookbook/alert-bot.md). diff --git a/examples/alert-bot/alert-bot.ts b/examples/alert-bot/alert-bot.ts new file mode 100644 index 0000000..ba3d746 --- /dev/null +++ b/examples/alert-bot/alert-bot.ts @@ -0,0 +1,160 @@ +import 'dotenv/config' +import { pathToFileURL } from 'url' +import WebSocket from 'ws' +import { + buildThresholdAlertPayload, + crossesThreshold, + deliverJsonWithRetries, + type ThresholdDirection, +} from '../../src/alerts' + +const HELP_TEXT = `Usage: npm run alert:bot [-- --once] + +A "if XLM > X notify me" bot. It connects to the Lens WebSocket price +stream, watches a single pair, and fires a notification the moment the +price crosses your threshold. + +Flags: + --once Exit after the first alert fires (handy for testing). + --help Show this message. + +Environment: + ALERT_BOT_WS_URL=ws://localhost:3002/ws Lens WebSocket endpoint + ALERT_BOT_ASSET_A=XLM Base asset to watch + ALERT_BOT_ASSET_B=USDC Quote asset + ALERT_BOT_THRESHOLD=0.15 Price level to alert on + ALERT_BOT_DIRECTION=above "above" or "below" + ALERT_BOT_PAYMENT= Optional base64 X-PAYMENT for gated streams + ALERT_BOT_NOTIFY_URL= Optional HTTPS URL to POST alerts to + ALERT_BOT_NOTIFY_SECRET= Optional HMAC secret for the notify URL +` + +export interface AlertBotConfig { + wsUrl: string + assetA: string + assetB: string + threshold: number + direction: ThresholdDirection + payment?: string + notifyUrl?: string + notifySecret: string +} + +interface PriceUpdateMessage { + type: string + message?: string + assetA: string + assetB: string + previousPrice: number + currentPrice: number + timestamp: string +} + +function parseDirection(raw: string | undefined): ThresholdDirection { + if (raw === 'above' || raw === 'below') return raw + throw new Error('ALERT_BOT_DIRECTION must be "above" or "below"') +} + +export function readConfig(): AlertBotConfig { + const wsUrl = process.env.ALERT_BOT_WS_URL ?? 'ws://localhost:3002/ws' + const assetA = (process.env.ALERT_BOT_ASSET_A ?? 'XLM').toUpperCase() + const assetB = (process.env.ALERT_BOT_ASSET_B ?? 'USDC').toUpperCase() + const threshold = Number(process.env.ALERT_BOT_THRESHOLD) + const direction = parseDirection(process.env.ALERT_BOT_DIRECTION ?? 'above') + + if (!Number.isFinite(threshold)) { + throw new Error('ALERT_BOT_THRESHOLD must be a number (e.g. 0.15)') + } + + return { + wsUrl, + assetA, + assetB, + threshold, + direction, + payment: process.env.ALERT_BOT_PAYMENT || undefined, + notifyUrl: process.env.ALERT_BOT_NOTIFY_URL || undefined, + notifySecret: process.env.ALERT_BOT_NOTIFY_SECRET ?? 'alert-bot', + } +} + +/** True when a price update is for the pair we are watching (either order). */ +export function matchesPair(config: AlertBotConfig, msg: Pick): boolean { + const want = [config.assetA, config.assetB].sort().join('/') + const got = [msg.assetA.toUpperCase(), msg.assetB.toUpperCase()].sort().join('/') + return want === got +} + +async function fireAlert(config: AlertBotConfig, currentPrice: number, timestamp: string): Promise { + const payload = buildThresholdAlertPayload(config, config.assetA, config.assetB, currentPrice, timestamp) + + console.log( + `[alert-bot] ALERT ${payload.assetA}/${payload.assetB} is ${config.direction} ${config.threshold} ` + + `— price ${currentPrice} at ${timestamp}` + ) + + if (config.notifyUrl) { + await deliverJsonWithRetries(config.notifyUrl, payload, config.notifySecret) + } +} + +export async function runAlertBot(config = readConfig(), once = false): Promise { + const headers = config.payment ? { 'X-PAYMENT': config.payment } : undefined + const ws = new WebSocket(config.wsUrl, { headers }) + + await new Promise((resolve, reject) => { + ws.on('open', () => { + console.log( + `[alert-bot] connected to ${config.wsUrl} — watching ${config.assetA}/${config.assetB} ` + + `${config.direction} ${config.threshold}` + ) + resolve() + }) + ws.on('error', reject) + }) + + await new Promise((resolve, reject) => { + ws.on('message', (data: WebSocket.RawData) => { + let msg: PriceUpdateMessage + try { + msg = JSON.parse(data.toString()) + } catch { + return + } + + if (msg.type === 'error') { + reject(new Error(msg.message ?? 'stream error')) + return + } + if (msg.type !== 'price_update' || !matchesPair(config, msg)) return + + if (crossesThreshold(config, msg.previousPrice, msg.currentPrice)) { + void fireAlert(config, msg.currentPrice, msg.timestamp) + if (once) { + ws.close() + resolve() + } + } + }) + + ws.on('close', () => resolve()) + ws.on('error', reject) + }) +} + +async function main() { + if (process.argv.includes('--help')) { + console.log(HELP_TEXT.trim()) + return + } + + const once = process.argv.includes('--once') + await runAlertBot(undefined, once) +} + +if (process.argv[1] && pathToFileURL(process.argv[1]).href === import.meta.url) { + main().catch(err => { + console.error('[alert-bot] fatal error:', (err as Error).message) + process.exit(1) + }) +} diff --git a/package.json b/package.json index ea78527..e8e351b 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "version-packages": "changeset version", "release": "changeset tag", "oracle:relay": "tsx examples/oracle-relay/relay.ts", + "alert:bot": "tsx examples/alert-bot/alert-bot.ts", "key:issue": "tsx scripts/issue-api-key.ts", "db:push": "prisma db push", "db:generate": "prisma generate" From 1c8e46b08afbf2346d75355800a559c0b7b0dc19 Mon Sep 17 00:00:00 2001 From: adaofweb3 Date: Mon, 29 Jun 2026 18:17:12 +0100 Subject: [PATCH 6/6] feat: add CEX adapter interface and mock implementation (#110) Introduces src/ingest/cex/ with a CexAdapter interface (fetchTicker, fetchOrderBook, adapter registry) and a fully deterministic MockCexAdapter for testing. 14 unit tests cover ticker fields, order-book depth, unknown pairs, case normalisation, and registry round-trips. closes #99 Co-authored-by: adaofweb3 --- src/__tests__/cexIngester.test.ts | 116 ++++++++++++++++++++++++++++++ src/ingest/cex/index.ts | 84 ++++++++++++++++++++++ src/ingest/cex/mock.ts | 91 +++++++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 src/__tests__/cexIngester.test.ts create mode 100644 src/ingest/cex/index.ts create mode 100644 src/ingest/cex/mock.ts diff --git a/src/__tests__/cexIngester.test.ts b/src/__tests__/cexIngester.test.ts new file mode 100644 index 0000000..e9447c9 --- /dev/null +++ b/src/__tests__/cexIngester.test.ts @@ -0,0 +1,116 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { + registerCexAdapter, + getCexAdapter, + getAllCexAdapters, + type CexAdapter, +} from '../ingest/cex/index' +import { MockCexAdapter } from '../ingest/cex/mock' + +describe('CexAdapter interface', () => { + it('MockCexAdapter satisfies CexAdapter shape', () => { + const adapter: CexAdapter = new MockCexAdapter() + expect(typeof adapter.name).toBe('string') + expect(typeof adapter.fetchTicker).toBe('function') + expect(typeof adapter.fetchOrderBook).toBe('function') + }) +}) + +describe('MockCexAdapter.fetchTicker', () => { + let adapter: MockCexAdapter + + beforeEach(() => { + adapter = new MockCexAdapter('mock') + }) + + it('returns a ticker with correct pair string', async () => { + const ticker = await adapter.fetchTicker('XLM', 'USDC') + expect(ticker).not.toBeNull() + expect(ticker!.pair).toBe('XLM/USDC') + }) + + it('returns default seed values', async () => { + const ticker = await adapter.fetchTicker('XLM', 'USDC') + expect(ticker!.bid).toBeCloseTo(0.098) + expect(ticker!.ask).toBeCloseTo(0.102) + expect(ticker!.last).toBeCloseTo(0.100) + }) + + it('respects per-pair seed overrides', async () => { + adapter.setSeed('XLM/USDC', { last: 0.42, bid: 0.41, ask: 0.43 }) + const ticker = await adapter.fetchTicker('XLM', 'USDC') + expect(ticker!.last).toBeCloseTo(0.42) + expect(ticker!.bid).toBeCloseTo(0.41) + expect(ticker!.ask).toBeCloseTo(0.43) + }) + + it('returns null for unknown pairs', async () => { + adapter.markUnknown('BTC/USD') + const ticker = await adapter.fetchTicker('BTC', 'USD') + expect(ticker).toBeNull() + }) + + it('normalises pair identifiers to uppercase', async () => { + const ticker = await adapter.fetchTicker('xlm', 'usdc') + expect(ticker!.pair).toBe('XLM/USDC') + }) + + it('ticker includes a timestamp', async () => { + const before = Date.now() + const ticker = await adapter.fetchTicker('XLM', 'USDC') + expect(ticker!.timestamp.getTime()).toBeGreaterThanOrEqual(before) + }) +}) + +describe('MockCexAdapter.fetchOrderBook', () => { + let adapter: MockCexAdapter + + beforeEach(() => { + adapter = new MockCexAdapter('mock') + }) + + it('returns bids and asks arrays', async () => { + const book = await adapter.fetchOrderBook('XLM', 'USDC') + expect(book).not.toBeNull() + expect(Array.isArray(book!.bids)).toBe(true) + expect(Array.isArray(book!.asks)).toBe(true) + }) + + it('respects depth parameter', async () => { + const book = await adapter.fetchOrderBook('XLM', 'USDC', 5) + expect(book!.bids).toHaveLength(5) + expect(book!.asks).toHaveLength(5) + }) + + it('bids are below asks (no crossed book)', async () => { + const book = await adapter.fetchOrderBook('XLM', 'USDC') + const topBid = book!.bids[0].price + const topAsk = book!.asks[0].price + expect(topBid).toBeLessThan(topAsk) + }) + + it('returns null for unknown pairs', async () => { + adapter.markUnknown('ETH/USD') + const book = await adapter.fetchOrderBook('ETH', 'USD') + expect(book).toBeNull() + }) +}) + +describe('adapter registry', () => { + it('registerCexAdapter + getCexAdapter round-trip', () => { + const adapter = new MockCexAdapter('test-exchange') + registerCexAdapter(adapter) + expect(getCexAdapter('test-exchange')).toBe(adapter) + }) + + it('getCexAdapter returns undefined for unregistered name', () => { + expect(getCexAdapter('nonexistent-exchange')).toBeUndefined() + }) + + it('getAllCexAdapters includes registered adapters', () => { + const adapter = new MockCexAdapter('exchange-a') + registerCexAdapter(adapter) + const all = getAllCexAdapters() + expect(all.some(a => a.name === 'exchange-a')).toBe(true) + }) +}) diff --git a/src/ingest/cex/index.ts b/src/ingest/cex/index.ts new file mode 100644 index 0000000..a5f3fc9 --- /dev/null +++ b/src/ingest/cex/index.ts @@ -0,0 +1,84 @@ +/** + * CEX Adapter Interface + * + * Defines the contract every centralized-exchange adapter must satisfy. + * Concrete adapters (Coinbase, Binance, Kraken, …) implement CexAdapter and + * are registered via createCexAdapter so callers stay exchange-agnostic. + * + * Issue: #99 + */ + +/** Best bid/ask + last-trade snapshot for a single trading pair. */ +export interface CexTicker { + /** Exchange-normalized pair identifier, e.g. "XLM/USDC". */ + pair: string + bid: number + ask: number + /** Price of the most recent trade. */ + last: number + /** 24 h base-asset volume. */ + baseVolume: number + /** 24 h quote-asset volume. */ + quoteVolume: number + timestamp: Date +} + +/** Single resting order in the order book. */ +export interface CexOrderBookLevel { + price: number + quantity: number +} + +/** Aggregated order book snapshot. */ +export interface CexOrderBook { + pair: string + bids: CexOrderBookLevel[] + asks: CexOrderBookLevel[] + timestamp: Date +} + +/** + * Every CEX adapter must implement this interface. + * + * Adapters are stateless fetch wrappers — they must not cache or store data + * themselves; caching is the caller's responsibility. + */ +export interface CexAdapter { + /** Human-readable exchange identifier used in logs and metrics labels. */ + readonly name: string + + /** + * Fetch the latest ticker for a trading pair. + * Returns null when the pair is unknown or the exchange is unreachable. + */ + fetchTicker(base: string, quote: string): Promise + + /** + * Fetch an order book snapshot. + * Optional — adapters that do not expose an order book may omit this method. + * + * @param depth Maximum number of levels on each side (default 20). + */ + fetchOrderBook?(base: string, quote: string, depth?: number): Promise +} + +/** + * Registry of all registered adapters, keyed by name. + * Populated via registerCexAdapter; consumed via getCexAdapter / getAllCexAdapters. + */ +const _registry = new Map() + +/** Register a CEX adapter so it can be retrieved by name. */ +export function registerCexAdapter(adapter: CexAdapter): void { + _registry.set(adapter.name, adapter) +} + +/** Retrieve a registered adapter by exchange name, or undefined if not found. */ +export function getCexAdapter(name: string): CexAdapter | undefined { + return _registry.get(name) +} + +/** Return every registered adapter. */ +export function getAllCexAdapters(): CexAdapter[] { + return [..._registry.values()] +} diff --git a/src/ingest/cex/mock.ts b/src/ingest/cex/mock.ts new file mode 100644 index 0000000..36ce7ec --- /dev/null +++ b/src/ingest/cex/mock.ts @@ -0,0 +1,91 @@ +/** + * Mock CEX Adapter + * + * In-memory implementation of CexAdapter for unit tests and local development. + * Returns deterministic, configurable data — no HTTP calls are made. + * + * Issue: #99 + */ + +import type { CexAdapter, CexOrderBook, CexTicker } from './index' + +export interface MockTickerSeed { + bid: number + ask: number + last: number + baseVolume: number + quoteVolume: number +} + +const DEFAULT_SEED: MockTickerSeed = { + bid: 0.098, + ask: 0.102, + last: 0.100, + baseVolume: 1_000_000, + quoteVolume: 100_000, +} + +/** + * MockCexAdapter satisfies CexAdapter with fully controllable responses. + * + * Seed data can be overridden per-pair so tests can exercise edge cases + * (stale prices, missing pairs, order book depth limits) without touching + * real exchange APIs. + */ +export class MockCexAdapter implements CexAdapter { + readonly name: string + + private readonly seeds = new Map() + private readonly unknownPairs = new Set() + + constructor(name = 'mock') { + this.name = name + } + + /** Override the seed data for a specific pair key (e.g. "XLM/USDC"). */ + setSeed(pair: string, seed: Partial): void { + this.seeds.set(pair, { ...DEFAULT_SEED, ...seed }) + } + + /** Mark a pair as unknown so fetchTicker returns null for it. */ + markUnknown(pair: string): void { + this.unknownPairs.add(pair) + } + + async fetchTicker(base: string, quote: string): Promise { + const pair = `${base.toUpperCase()}/${quote.toUpperCase()}` + if (this.unknownPairs.has(pair)) return null + + const seed = this.seeds.get(pair) ?? DEFAULT_SEED + return { + pair, + bid: seed.bid, + ask: seed.ask, + last: seed.last, + baseVolume: seed.baseVolume, + quoteVolume: seed.quoteVolume, + timestamp: new Date(), + } + } + + async fetchOrderBook(base: string, quote: string, depth = 20): Promise { + const pair = `${base.toUpperCase()}/${quote.toUpperCase()}` + if (this.unknownPairs.has(pair)) return null + + const seed = this.seeds.get(pair) ?? DEFAULT_SEED + const mid = (seed.bid + seed.ask) / 2 + + const bids = Array.from({ length: depth }, (_, i) => ({ + price: parseFloat((seed.bid - i * 0.001).toFixed(6)), + quantity: parseFloat(((i + 1) * 1000).toFixed(2)), + })) + + const asks = Array.from({ length: depth }, (_, i) => ({ + price: parseFloat((seed.ask + i * 0.001).toFixed(6)), + quantity: parseFloat(((i + 1) * 1000).toFixed(2)), + })) + + void mid + return { pair, bids, asks, timestamp: new Date() } + } +}