diff --git a/.env.example b/.env.example index f28978c..7de7d91 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,5 @@ # Database Configuration +# Docker (recommended): docker compose up -d db DATABASE_URL="postgresql://postgres:password@localhost:5432/tradeflow?schema=public" # Alternative Database Configuration (for TypeORM) DB_HOST=localhost diff --git a/README.md b/README.md index 28755cc..65cf312 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,98 @@ Get historical Total Value Locked (TVL) data for the analytics dashboard. curl http://localhost:3000/api/v1/stats/tvl/history ``` +### Admin Metrics + +Aggregated protocol-wide statistics for administrative dashboards. Metrics are recomputed every **5 minutes** by a background scheduler and served from Redis cache (with PostgreSQL snapshot fallback). + +All endpoints require a valid admin JWT obtained via `POST /api/v1/admin/login`. + +#### `POST /api/v1/admin/login` + +Authenticate with the configured `ADMIN_PASSWORD` to obtain a Bearer token. + +```bash +curl -X POST http://localhost:3000/api/v1/admin/login \ + -H "Content-Type: application/json" \ + -d '{"password":"your-admin-password"}' +``` + +**Response:** +```json +{ + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." +} +``` + +#### `GET /api/v1/admin/metrics/tvl` + +Returns global Total Value Locked (sum of USD value across all active pools). + +**Response Format:** +```json +{ + "success": true, + "data": { + "tvlUSD": 14500000.50, + "poolCount": 12, + "lastUpdated": "2026-06-24T12:00:00.000Z" + }, + "timestamp": "2026-06-24T12:00:00.000Z" +} +``` + +**Example:** +```bash +curl http://localhost:3000/api/v1/admin/metrics/tvl \ + -H "Authorization: Bearer " +``` + +#### `GET /api/v1/admin/metrics/revenue` + +Returns total cumulative protocol fees routed to the Treasury (derived from indexed swaps × pool fee tiers). + +**Response Format:** +```json +{ + "success": true, + "data": { + "totalRevenueUSD": 125000.75, + "lastUpdated": "2026-06-24T12:00:00.000Z" + }, + "timestamp": "2026-06-24T12:00:00.000Z" +} +``` + +**Example:** +```bash +curl http://localhost:3000/api/v1/admin/metrics/revenue \ + -H "Authorization: Bearer " +``` + +#### `GET /api/v1/admin/metrics/active-users` + +Returns distinct active trader counts for rolling 24-hour, 7-day, and 30-day windows. + +**Response Format:** +```json +{ + "success": true, + "data": { + "activeUsers24h": 142, + "activeUsers7d": 890, + "activeUsers30d": 3200, + "lastUpdated": "2026-06-24T12:00:00.000Z" + }, + "timestamp": "2026-06-24T12:00:00.000Z" +} +``` + +**Example:** +```bash +curl http://localhost:3000/api/v1/admin/metrics/active-users \ + -H "Authorization: Bearer " +``` + ### Analytics Endpoints #### `GET /api/v1/analytics/leaderboard` diff --git a/docker-compose.yml b/docker-compose.yml index 8f71e83..36ecb54 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,25 @@ -version: '3.8' services: api: build: . ports: ["3000:3000"] environment: - DATABASE_HOST=db + - DATABASE_URL=postgresql://postgres:password@db:5432/tradeflow?schema=public depends_on: - db + - redis db: image: postgres:15-alpine ports: ["5432:5432"] environment: - - POSTGRES_PASSWORD=password + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: tradeflow + volumes: + - tradeflow_pgdata:/var/lib/postgresql/data + redis: + image: redis:7-alpine + ports: ["6379:6379"] + +volumes: + tradeflow_pgdata: diff --git a/package-lock.json b/package-lock.json index 50bc4d8..e7d0d7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,6 +25,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.4", "compression": "^1.7.4", + "express-rate-limit": "^8.5.2", "ioredis": "^5.10.1", "jsonwebtoken": "^9.0.3", "lru-cache": "^11.2.7", @@ -4520,6 +4521,24 @@ "node": "^18.14.0 || ^20.0.0 || ^22.0.0 || >=24.0.0" } }, + "node_modules/express-rate-limit": { + "version": "8.5.2", + "resolved": "https://registry.npmjs.org/express-rate-limit/-/express-rate-limit-8.5.2.tgz", + "integrity": "sha512-5Kb34ipNX694DH48vN9irak1Qx30nb0PLYHXfJgw4YEjiC3ZEmZJhwOp+VfiCYwFzvFTdB9QkArYS5kXa2cx2A==", + "license": "MIT", + "dependencies": { + "ip-address": "^10.2.0" + }, + "engines": { + "node": ">= 16" + }, + "funding": { + "url": "https://github.com/sponsors/express-rate-limit" + }, + "peerDependencies": { + "express": ">= 4.11" + } + }, "node_modules/fast-deep-equal": { "version": "3.1.3", "dev": true, @@ -5122,6 +5141,15 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "license": "MIT" }, + "node_modules/ip-address": { + "version": "10.2.0", + "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-10.2.0.tgz", + "integrity": "sha512-/+S6j4E9AHvW9SWMSEY9Xfy66O5PWvVEJ08O0y5JGyEKQpojb0K0GKpz/v5HJ/G0vi3D2sjGK78119oXZeE0qA==", + "license": "MIT", + "engines": { + "node": ">= 12" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "license": "MIT", diff --git a/package.json b/package.json index 2c3f9fa..5efcb5d 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.4", "compression": "^1.7.4", + "express-rate-limit": "^8.5.2", "ioredis": "^5.10.1", "jsonwebtoken": "^9.0.3", "lru-cache": "^11.2.7", diff --git a/prisma/migrations/20260624000000_init/migration.sql b/prisma/migrations/20260624000000_init/migration.sql new file mode 100644 index 0000000..2ed3694 --- /dev/null +++ b/prisma/migrations/20260624000000_init/migration.sql @@ -0,0 +1,74 @@ +-- CreateEnum +CREATE TYPE "OrderStatus" AS ENUM ('PENDING', 'EXECUTED', 'CANCELLED'); + +-- CreateTable +CREATE TABLE "orders" ( + "id" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "assetPair" TEXT NOT NULL, + "side" TEXT NOT NULL, + "price" TEXT NOT NULL, + "quantity" TEXT NOT NULL, + "status" "OrderStatus" NOT NULL DEFAULT 'PENDING', + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "executedAt" TIMESTAMP(3), + + CONSTRAINT "orders_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "pools" ( + "id" TEXT NOT NULL, + "address" TEXT NOT NULL, + "tokenA" TEXT NOT NULL, + "tokenB" TEXT NOT NULL, + "fee" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "pools_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "trades" ( + "id" TEXT NOT NULL, + "poolId" TEXT NOT NULL, + "userAddress" TEXT NOT NULL, + "amountIn" TEXT NOT NULL, + "amountOut" TEXT NOT NULL, + "timestamp" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "trades_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "tokens" ( + "id" TEXT NOT NULL, + "address" TEXT NOT NULL, + "symbol" TEXT NOT NULL, + "name" TEXT NOT NULL, + "decimals" INTEGER NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "tokens_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "orders_userId_idx" ON "orders"("userId"); + +-- CreateIndex +CREATE INDEX "orders_status_idx" ON "orders"("status"); + +-- CreateIndex +CREATE INDEX "orders_userId_status_idx" ON "orders"("userId", "status"); + +-- CreateIndex +CREATE UNIQUE INDEX "pools_address_key" ON "pools"("address"); + +-- CreateIndex +CREATE UNIQUE INDEX "tokens_address_key" ON "tokens"("address"); + +-- AddForeignKey +ALTER TABLE "trades" ADD CONSTRAINT "trades_poolId_fkey" FOREIGN KEY ("poolId") REFERENCES "pools"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/prisma/migrations/20260624120000_add_protocol_metrics/migration.sql b/prisma/migrations/20260624120000_add_protocol_metrics/migration.sql new file mode 100644 index 0000000..41881e9 --- /dev/null +++ b/prisma/migrations/20260624120000_add_protocol_metrics/migration.sql @@ -0,0 +1,25 @@ +-- AlterTable +ALTER TABLE "pools" ADD COLUMN "reserveA" TEXT, +ADD COLUMN "reserveB" TEXT, +ADD COLUMN "tvlUsd" DECIMAL(20,8), +ADD COLUMN "isActive" BOOLEAN NOT NULL DEFAULT true, +ADD COLUMN "lastSyncedAt" TIMESTAMP(3); + +-- CreateTable +CREATE TABLE "protocol_metrics_snapshots" ( + "id" TEXT NOT NULL, + "globalTvlUsd" DECIMAL(20,8) NOT NULL, + "totalRevenueUsd" DECIMAL(20,8) NOT NULL, + "activeUsers24h" INTEGER NOT NULL, + "activeUsers7d" INTEGER NOT NULL, + "activeUsers30d" INTEGER NOT NULL, + "computedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "protocol_metrics_snapshots_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "protocol_metrics_snapshots_computedAt_idx" ON "protocol_metrics_snapshots"("computedAt"); + +-- CreateIndex +CREATE INDEX "trades_userAddress_timestamp_idx" ON "trades"("userAddress", "timestamp"); diff --git a/prisma/migrations/migration_lock.toml b/prisma/migrations/migration_lock.toml new file mode 100644 index 0000000..99e4f20 --- /dev/null +++ b/prisma/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (i.e. Git) +provider = "postgresql" diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 23fdd26..e5baae0 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -1,4 +1,4 @@ -// This is your Prisma schema file, +// This is your Prisma schema file, // learn more about it in the docs: https://pris.ly/d/prisma-schema generator client { @@ -44,23 +44,42 @@ model Trade { pool Pool @relation(fields: [poolId], references: [id]) + @@index([userAddress, timestamp]) @@map("trades") } model Pool { - id String @id @default(cuid()) - address String @unique - tokenA String - tokenB String - fee String - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - + id String @id @default(cuid()) + address String @unique + tokenA String + tokenB String + fee String + reserveA String? + reserveB String? + tvlUsd Decimal? @db.Decimal(20, 8) + isActive Boolean @default(true) + lastSyncedAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + trades Trade[] - + @@map("pools") } +model ProtocolMetricsSnapshot { + id String @id @default(cuid()) + globalTvlUsd Decimal @db.Decimal(20, 8) + totalRevenueUsd Decimal @db.Decimal(20, 8) + activeUsers24h Int + activeUsers7d Int + activeUsers30d Int + computedAt DateTime @default(now()) + + @@index([computedAt]) + @@map("protocol_metrics_snapshots") +} + model Token { id String @id @default(cuid()) address String @unique @@ -69,6 +88,6 @@ model Token { decimals Int createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - + @@map("tokens") } diff --git a/services/eventIndexer.js b/services/eventIndexer.js index f0882c0..30f96c8 100644 --- a/services/eventIndexer.js +++ b/services/eventIndexer.js @@ -110,10 +110,22 @@ async function handleContractEvent(event) { console.log('Decoded Payload:', JSON.stringify(payload, null, 2)); + // Ensure pool exists and resolve FK to Pool.id (not contract address) + const pool = await prisma.pool.upsert({ + where: { address: event.contractId }, + update: {}, + create: { + address: event.contractId, + tokenA: 'unknown', + tokenB: 'unknown', + fee: '30', + }, + }); + // Map Soroban event data to our Prisma Trade model // Expected structure from SwapEvent: { user, amount_in, amount_out } const tradeData = { - poolId: event.contractId, + poolId: pool.id, userAddress: payload.user || payload.address || 'Unknown', amountIn: (payload.amount_in || payload.amountIn || '0').toString(), amountOut: (payload.amount_out || payload.amountOut || '0').toString(), @@ -122,7 +134,7 @@ async function handleContractEvent(event) { // Save to Database via Prisma const savedTrade = await prisma.trade.create({ - data: tradeData + data: tradeData, }); console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`); diff --git a/src/app.module.ts b/src/app.module.ts index ccfa9b4..4ec0173 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -14,6 +14,7 @@ import { OgModule } from './og/og.module'; import { TradeModule } from './trade/trade.module'; import { OrdersModule } from './orders/orders.module'; import { GasModule } from './gas/gas.module'; +import { MetricsModule } from './metrics/metrics.module'; import { ConfigModule } from '@nestjs/config'; import { MaintenanceMiddleware } from './common/middleware/maintenance.middleware'; import { RequestIdMiddleware } from './common/middleware/request-id.middleware'; @@ -34,6 +35,7 @@ import { RedisModule } from './common/redis/redis.module'; TradeModule, OrdersModule, GasModule, + MetricsModule, ], controllers: [AppController], providers: [ diff --git a/src/auth/auth.module.ts b/src/auth/auth.module.ts index 6448d1a..8b33b35 100644 --- a/src/auth/auth.module.ts +++ b/src/auth/auth.module.ts @@ -3,10 +3,11 @@ import { AuthController } from './auth.controller'; import { AdminController } from './admin.controller'; import { WebhookController } from './webhook.controller'; import { AuthService } from './auth.service'; +import { AdminGuard } from './guards/admin.guard'; @Module({ controllers: [AuthController, AdminController, WebhookController], - providers: [AuthService], - exports: [AuthService], // Export for middleware use + providers: [AuthService, AdminGuard], + exports: [AuthService, AdminGuard], }) export class AuthModule {} diff --git a/src/auth/guards/admin.guard.spec.ts b/src/auth/guards/admin.guard.spec.ts new file mode 100644 index 0000000..dc90e1e --- /dev/null +++ b/src/auth/guards/admin.guard.spec.ts @@ -0,0 +1,82 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ExecutionContext, UnauthorizedException } from '@nestjs/common'; +import { AdminGuard } from './admin.guard'; +import { AuthService } from '../auth.service'; + +/** + * Unit tests for the AdminGuard. + * Verifies JWT validation and admin role enforcement. + */ +describe('AdminGuard', () => { + let guard: AdminGuard; + let authService: { verifyJWT: jest.Mock }; + + const createContext = (authHeader?: string): ExecutionContext => { + const request: { headers: Record; user?: unknown } = { + headers: {}, + }; + if (authHeader) { + request.headers.authorization = authHeader; + } + + return { + switchToHttp: () => ({ + getRequest: () => request, + }), + } as ExecutionContext; + }; + + beforeEach(async () => { + authService = { + verifyJWT: jest.fn(), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + AdminGuard, + { provide: AuthService, useValue: authService }, + ], + }).compile(); + + guard = module.get(AdminGuard); + }); + + /** + * Basic sanity check to ensure the guard is correctly instantiated. + */ + it('should be defined', () => { + expect(guard).toBeDefined(); + }); + + /** + * Rejects requests without an Authorization header. + */ + it('should throw UnauthorizedException when Authorization header is missing', () => { + expect(() => guard.canActivate(createContext())).toThrow(UnauthorizedException); + }); + + /** + * Rejects wallet JWTs that lack the admin role. + */ + it('should reject non-admin JWT payloads', () => { + authService.verifyJWT.mockReturnValue({ publicKey: 'GABC123', sub: 'GABC123' }); + + expect(() => guard.canActivate(createContext('Bearer wallet-token'))).toThrow( + UnauthorizedException, + ); + }); + + /** + * Accepts valid admin JWTs and attaches the payload to the request. + */ + it('should allow admin JWT with role admin (Acceptance Criteria)', () => { + const adminPayload = { role: 'admin', iat: 1234567890 }; + authService.verifyJWT.mockReturnValue(adminPayload); + + const context = createContext('Bearer admin-token'); + const result = guard.canActivate(context); + + expect(result).toBe(true); + expect(context.switchToHttp().getRequest()['user']).toEqual(adminPayload); + }); +}); diff --git a/src/auth/guards/admin.guard.ts b/src/auth/guards/admin.guard.ts new file mode 100644 index 0000000..a213012 --- /dev/null +++ b/src/auth/guards/admin.guard.ts @@ -0,0 +1,58 @@ +import { + Injectable, + CanActivate, + ExecutionContext, + UnauthorizedException, +} from '@nestjs/common'; +import { Request } from 'express'; +import { AuthService } from '../auth.service'; + +/** + * Admin JWT Verification Guard. + * + * Protects administrative routes by requiring a valid Bearer JWT + * with `role: 'admin'` in the payload (issued by POST /api/v1/admin/login). + * + * Requirements: + * - Authorization header must be present and use Bearer scheme + * - Token must be a valid, non-expired JWT signed with JWT_SECRET + * - Token payload must include role === 'admin' + */ +@Injectable() +export class AdminGuard implements CanActivate { + constructor(private readonly authService: AuthService) {} + + /** + * Validates the incoming request by checking the admin JWT. + * + * @param context - The execution context of the request. + * @returns true if the request is authorized. + * @throws UnauthorizedException if the token is missing, invalid, or not an admin token. + */ + canActivate(context: ExecutionContext): boolean { + const request = context.switchToHttp().getRequest(); + const authHeader = request.headers.authorization; + + if (!authHeader || !authHeader.startsWith('Bearer ')) { + throw new UnauthorizedException('Missing or invalid Authorization header'); + } + + const token = authHeader.split(' ')[1]; + + try { + const payload = this.authService.verifyJWT(token); + + if (payload.role !== 'admin') { + throw new UnauthorizedException('Admin access required'); + } + + request['user'] = payload; + return true; + } catch (error) { + if (error instanceof UnauthorizedException) { + throw error; + } + throw new UnauthorizedException('Invalid or expired token'); + } + } +} diff --git a/src/common/logger/custom.logger.ts b/src/common/logger/custom.logger.ts index 14108fe..bf04bec 100644 --- a/src/common/logger/custom.logger.ts +++ b/src/common/logger/custom.logger.ts @@ -1,4 +1,4 @@ -import { ConsoleLogger, Injectable, Scope } from '@nestjs/common'; +import { ConsoleLogger, Injectable, Scope, LogLevel } from '@nestjs/common'; import { requestContextStorage } from '../storage/request-context.storage'; /** @@ -19,7 +19,7 @@ export class CustomLogger extends ConsoleLogger { * @returns The fully formatted log string. */ formatMessage( - logLevel: string, + logLevel: LogLevel, message: unknown, pidMessage: string, formattedLogLevel: string, diff --git a/src/dynamic/dynamic.module.ts b/src/dynamic/dynamic.module.ts index 057f800..88e070f 100644 --- a/src/dynamic/dynamic.module.ts +++ b/src/dynamic/dynamic.module.ts @@ -1,9 +1,7 @@ import { Module } from '@nestjs/common'; -import { InvoicesController } from './invoices.controller'; -import { PdfService } from './pdf.service'; +import { NetworkController } from './dynamic.controller'; @Module({ - controllers: [InvoicesController], - providers: [PdfService], + controllers: [NetworkController], }) -export class InvoicesModule {} +export class DynamicModule {} diff --git a/src/gas/gas.service.ts b/src/gas/gas.service.ts index f90272b..964fe43 100644 --- a/src/gas/gas.service.ts +++ b/src/gas/gas.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; +import { Horizon } from '@stellar/stellar-sdk'; import { RedisService } from '../common/redis/redis.service'; -import { Server } from '@stellar/stellar-sdk/rpc'; export interface FeeTiers { low: string; @@ -32,10 +32,14 @@ export class GasService { private async fetchAndCache() { try { - const rpcUrl = process.env.STELLAR_RPC_URL || 'https://soroban-testnet.stellar.org'; - const server = new Server(rpcUrl); - const ledger = await server.getLatestLedger(); - const baseFee = parseInt(ledger.baseFeeInStroops || '100', 10); + const horizonUrl = + process.env.HORIZON_URL || 'https://horizon-testnet.stellar.org'; + const server = new Horizon.Server(horizonUrl); + const ledger = await server.ledgers().order('desc').limit(1).call(); + const baseFee = parseInt( + ledger.records[0]?.base_fee_in_stroops?.toString() || '100', + 10, + ); const tiers: FeeTiers = { low: Math.ceil(baseFee * 1.0).toString(), diff --git a/src/main.ts b/src/main.ts index 87169f5..d7376a2 100644 --- a/src/main.ts +++ b/src/main.ts @@ -6,7 +6,7 @@ import { IndexerJob } from './jobs/indexer'; import { CustomLogger } from './common/logger/custom.logger'; import rateLimit from 'express-rate-limit'; import RedisStore from 'rate-limit-redis'; -import compression from 'compression'; +import * as compression from 'compression'; // Redis is optional - gracefully fall back to in-memory limiting if unavailable let redis: any = null; diff --git a/src/metrics/metrics.controller.ts b/src/metrics/metrics.controller.ts new file mode 100644 index 0000000..adeb7fb --- /dev/null +++ b/src/metrics/metrics.controller.ts @@ -0,0 +1,137 @@ +import { Controller, Get, HttpStatus, UseGuards } from '@nestjs/common'; +import { ApiBearerAuth, ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger'; +import { AdminGuard } from '../auth/guards/admin.guard'; +import { + MetricsService, + TvlMetrics, + RevenueMetrics, + ActiveUsersMetrics, +} from './metrics.service'; + +/** + * Controller for administrative protocol metrics endpoints. + * Provides global TVL, cumulative protocol revenue, and active trader statistics. + * All routes require a valid admin JWT. + */ +@ApiTags('admin') +@ApiBearerAuth() +@UseGuards(AdminGuard) +@Controller('api/v1/admin/metrics') +export class MetricsController { + constructor(private readonly metricsService: MetricsService) {} + + /** + * Retrieves global Total Value Locked across all active pools. + * + * @returns Wrapped TVL metrics response. + */ + @Get('tvl') + @ApiOperation({ summary: 'Get global protocol TVL' }) + @ApiResponse({ + status: HttpStatus.OK, + description: 'Successfully retrieved global TVL', + schema: { + type: 'object', + properties: { + success: { type: 'boolean', example: true }, + data: { + type: 'object', + properties: { + tvlUSD: { type: 'number', example: 14500000.5 }, + poolCount: { type: 'number', example: 12 }, + lastUpdated: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + timestamp: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + }) + @ApiResponse({ status: HttpStatus.UNAUTHORIZED, description: 'Unauthorized' }) + @ApiResponse({ status: HttpStatus.SERVICE_UNAVAILABLE, description: 'Metrics not yet computed' }) + async getTvl(): Promise<{ success: boolean; data: TvlMetrics; timestamp: string }> { + const data = await this.metricsService.getTvl(); + return { + success: true, + data, + timestamp: new Date().toISOString(), + }; + } + + /** + * Retrieves total cumulative protocol fees routed to the Treasury. + * + * @returns Wrapped revenue metrics response. + */ + @Get('revenue') + @ApiOperation({ summary: 'Get cumulative protocol revenue' }) + @ApiResponse({ + status: HttpStatus.OK, + description: 'Successfully retrieved protocol revenue', + schema: { + type: 'object', + properties: { + success: { type: 'boolean', example: true }, + data: { + type: 'object', + properties: { + totalRevenueUSD: { type: 'number', example: 125000.75 }, + lastUpdated: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + timestamp: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + }) + @ApiResponse({ status: HttpStatus.UNAUTHORIZED, description: 'Unauthorized' }) + @ApiResponse({ status: HttpStatus.SERVICE_UNAVAILABLE, description: 'Metrics not yet computed' }) + async getRevenue(): Promise<{ success: boolean; data: RevenueMetrics; timestamp: string }> { + const data = await this.metricsService.getRevenue(); + return { + success: true, + data, + timestamp: new Date().toISOString(), + }; + } + + /** + * Retrieves active unique trader counts for 24h, 7d, and 30d windows. + * + * @returns Wrapped active users metrics response. + */ + @Get('active-users') + @ApiOperation({ summary: 'Get active unique trader counts' }) + @ApiResponse({ + status: HttpStatus.OK, + description: 'Successfully retrieved active user counts', + schema: { + type: 'object', + properties: { + success: { type: 'boolean', example: true }, + data: { + type: 'object', + properties: { + activeUsers24h: { type: 'number', example: 142 }, + activeUsers7d: { type: 'number', example: 890 }, + activeUsers30d: { type: 'number', example: 3200 }, + lastUpdated: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + timestamp: { type: 'string', example: '2026-06-24T12:00:00.000Z' }, + }, + }, + }) + @ApiResponse({ status: HttpStatus.UNAUTHORIZED, description: 'Unauthorized' }) + @ApiResponse({ status: HttpStatus.SERVICE_UNAVAILABLE, description: 'Metrics not yet computed' }) + async getActiveUsers(): Promise<{ + success: boolean; + data: ActiveUsersMetrics; + timestamp: string; + }> { + const data = await this.metricsService.getActiveUsers(); + return { + success: true, + data, + timestamp: new Date().toISOString(), + }; + } +} diff --git a/src/metrics/metrics.module.ts b/src/metrics/metrics.module.ts new file mode 100644 index 0000000..2856ba8 --- /dev/null +++ b/src/metrics/metrics.module.ts @@ -0,0 +1,14 @@ +import { Module } from '@nestjs/common'; +import { PrismaModule } from '../prisma/prisma.module'; +import { AuthModule } from '../auth/auth.module'; +import { PricesModule } from '../prices/prices.module'; +import { MetricsController } from './metrics.controller'; +import { MetricsService } from './metrics.service'; +import { MetricsScheduler } from './metrics.scheduler'; + +@Module({ + imports: [PrismaModule, AuthModule, PricesModule], + controllers: [MetricsController], + providers: [MetricsService, MetricsScheduler], +}) +export class MetricsModule {} diff --git a/src/metrics/metrics.scheduler.ts b/src/metrics/metrics.scheduler.ts new file mode 100644 index 0000000..1681b17 --- /dev/null +++ b/src/metrics/metrics.scheduler.ts @@ -0,0 +1,28 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import * as cron from 'node-cron'; +import { MetricsService } from './metrics.service'; + +/** + * Schedules periodic recomputation of protocol macro metrics. + * Runs every 5 minutes, matching the existing IndexerJob cron interval. + */ +@Injectable() +export class MetricsScheduler implements OnModuleInit { + private readonly logger = new Logger(MetricsScheduler.name); + + constructor(private readonly metricsService: MetricsService) {} + + /** + * Registers the cron job and runs an initial computation on startup. + */ + onModuleInit() { + cron.schedule('*/5 * * * *', () => { + this.metricsService.computeAndCache(); + }); + + this.logger.log('Metrics scheduler initialized (every 5 minutes)'); + + // Warm cache on startup + this.metricsService.computeAndCache(); + } +} diff --git a/src/metrics/metrics.service.spec.ts b/src/metrics/metrics.service.spec.ts new file mode 100644 index 0000000..207d4ec --- /dev/null +++ b/src/metrics/metrics.service.spec.ts @@ -0,0 +1,154 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ServiceUnavailableException } from '@nestjs/common'; +import { MetricsService } from './metrics.service'; +import { PrismaService } from '../prisma/prisma.service'; +import { RedisService } from '../common/redis/redis.service'; +import { PricesService } from '../prices/prices.service'; + +/** + * Unit tests for the MetricsService. + * Verifies aggregation logic, cache reads, and acceptance criteria behavior. + */ +describe('MetricsService', () => { + let service: MetricsService; + + const mockRedisPublisher = { + get: jest.fn(), + set: jest.fn(), + }; + + const mockPrisma = { + pool: { + findMany: jest.fn(), + count: jest.fn(), + update: jest.fn(), + }, + trade: { + findMany: jest.fn(), + }, + token: { + findMany: jest.fn(), + }, + protocolMetricsSnapshot: { + create: jest.fn(), + findFirst: jest.fn(), + }, + $queryRaw: jest.fn(), + }; + + const mockPricesService = { + getPrices: jest.fn().mockResolvedValue({ + data: [ + { symbol: 'USDC', price: 1.0 }, + { symbol: 'XLM', price: 0.12 }, + ], + cached: false, + }), + }; + + beforeEach(async () => { + jest.clearAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + MetricsService, + { provide: PrismaService, useValue: mockPrisma }, + { + provide: RedisService, + useValue: { redisPublisher: mockRedisPublisher }, + }, + { provide: PricesService, useValue: mockPricesService }, + ], + }).compile(); + + service = module.get(MetricsService); + }); + + /** + * Basic sanity check to ensure the service is correctly instantiated. + */ + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + /** + * Returns TVL from Redis cache when available. + */ + it('should return TVL metrics from Redis cache (Acceptance Criteria)', async () => { + const cached = { + tvlUSD: 5000000, + poolCount: 3, + lastUpdated: '2026-06-24T12:00:00.000Z', + }; + mockRedisPublisher.get.mockResolvedValue(JSON.stringify(cached)); + + const result = await service.getTvl(); + + expect(result).toEqual(cached); + expect(mockPrisma.protocolMetricsSnapshot.findFirst).not.toHaveBeenCalled(); + }); + + /** + * Throws when metrics have never been computed and Redis is empty. + */ + it('should throw ServiceUnavailableException when no metrics exist', async () => { + mockRedisPublisher.get.mockResolvedValue(null); + mockPrisma.protocolMetricsSnapshot.findFirst.mockResolvedValue(null); + + await expect(service.getTvl()).rejects.toThrow(ServiceUnavailableException); + }); + + /** + * Aggregates global TVL from active pools with reserves and caches results. + */ + it('should compute and cache global TVL from active pools', async () => { + mockPrisma.pool.findMany.mockResolvedValue([ + { + id: 'pool-1', + reserveA: '1000000000', + reserveB: '500000000', + tokenA: 'usdc-addr', + tokenB: 'xlm-addr', + fee: '30', + isActive: true, + }, + ]); + mockPrisma.pool.update.mockResolvedValue({}); + mockPrisma.token.findMany.mockResolvedValue([ + { address: 'usdc-addr', symbol: 'USDC', decimals: 7 }, + { address: 'xlm-addr', symbol: 'XLM', decimals: 7 }, + ]); + mockPrisma.trade.findMany.mockResolvedValue([]); + mockPrisma.$queryRaw + .mockResolvedValueOnce([{ count: BigInt(10) }]) + .mockResolvedValueOnce([{ count: BigInt(50) }]) + .mockResolvedValueOnce([{ count: BigInt(200) }]); + mockPrisma.protocolMetricsSnapshot.create.mockResolvedValue({ id: 'snap-1' }); + + await service.computeAndCache(); + + expect(mockPrisma.protocolMetricsSnapshot.create).toHaveBeenCalled(); + expect(mockRedisPublisher.set).toHaveBeenCalledWith( + 'metrics:global:tvl', + expect.any(String), + 'EX', + 360, + ); + }); + + /** + * Falls back to latest DB snapshot for revenue when Redis is cold. + */ + it('should fall back to DB snapshot for revenue metrics', async () => { + mockRedisPublisher.get.mockResolvedValue(null); + mockPrisma.protocolMetricsSnapshot.findFirst.mockResolvedValue({ + totalRevenueUsd: { toString: () => '12500.50' }, + computedAt: new Date('2026-06-24T12:00:00.000Z'), + }); + + const result = await service.getRevenue(); + + expect(result.totalRevenueUSD).toBe(12500.5); + expect(result.lastUpdated).toBe('2026-06-24T12:00:00.000Z'); + }); +}); diff --git a/src/metrics/metrics.service.ts b/src/metrics/metrics.service.ts new file mode 100644 index 0000000..5fe8a19 --- /dev/null +++ b/src/metrics/metrics.service.ts @@ -0,0 +1,331 @@ +import { Injectable, Logger, ServiceUnavailableException } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { RedisService } from '../common/redis/redis.service'; +import { PricesService } from '../prices/prices.service'; + +export interface TvlMetrics { + tvlUSD: number; + poolCount: number; + lastUpdated: string; +} + +export interface RevenueMetrics { + totalRevenueUSD: number; + lastUpdated: string; +} + +export interface ActiveUsersMetrics { + activeUsers24h: number; + activeUsers7d: number; + activeUsers30d: number; + lastUpdated: string; +} + +const CACHE_TTL_SECONDS = 360; +const DEFAULT_FEE_BPS = 30; + +/** + * Service responsible for computing and serving protocol-wide metrics. + * Heavy aggregation runs on a schedule; API reads from Redis with DB fallback. + */ +@Injectable() +export class MetricsService { + private readonly logger = new Logger(MetricsService.name); + private readonly CACHE_KEY_TVL = 'metrics:global:tvl'; + private readonly CACHE_KEY_REVENUE = 'metrics:protocol:revenue'; + private readonly CACHE_KEY_ACTIVE_USERS = 'metrics:active-users'; + + constructor( + private readonly prisma: PrismaService, + private readonly redis: RedisService, + private readonly pricesService: PricesService, + ) {} + + /** + * Retrieves cached global TVL metrics. + * + * @returns TVL metrics including pool count and last update timestamp. + * @throws ServiceUnavailableException if metrics have never been computed. + */ + async getTvl(): Promise { + const cached = await this.getFromRedis(this.CACHE_KEY_TVL); + if (cached) return cached; + + const snapshot = await this.getLatestSnapshot(); + if (!snapshot) { + throw new ServiceUnavailableException('Metrics not yet available'); + } + + return { + tvlUSD: this.decimalToNumber(snapshot.globalTvlUsd), + poolCount: await this.prisma.pool.count({ where: { isActive: true } }), + lastUpdated: snapshot.computedAt.toISOString(), + }; + } + + /** + * Retrieves cached cumulative protocol revenue metrics. + * + * @returns Total protocol revenue routed to treasury in USD. + * @throws ServiceUnavailableException if metrics have never been computed. + */ + async getRevenue(): Promise { + const cached = await this.getFromRedis(this.CACHE_KEY_REVENUE); + if (cached) return cached; + + const snapshot = await this.getLatestSnapshot(); + if (!snapshot) { + throw new ServiceUnavailableException('Metrics not yet available'); + } + + return { + totalRevenueUSD: this.decimalToNumber(snapshot.totalRevenueUsd), + lastUpdated: snapshot.computedAt.toISOString(), + }; + } + + /** + * Retrieves cached active unique trader counts for 24h, 7d, and 30d windows. + * + * @returns Active user counts per time window. + * @throws ServiceUnavailableException if metrics have never been computed. + */ + async getActiveUsers(): Promise { + const cached = await this.getFromRedis(this.CACHE_KEY_ACTIVE_USERS); + if (cached) return cached; + + const snapshot = await this.getLatestSnapshot(); + if (!snapshot) { + throw new ServiceUnavailableException('Metrics not yet available'); + } + + return { + activeUsers24h: snapshot.activeUsers24h, + activeUsers7d: snapshot.activeUsers7d, + activeUsers30d: snapshot.activeUsers30d, + lastUpdated: snapshot.computedAt.toISOString(), + }; + } + + /** + * Computes all macro metrics, persists a snapshot, and writes to Redis cache. + * Called by MetricsScheduler every 5 minutes. + */ + async computeAndCache(): Promise { + const start = Date.now(); + this.logger.log('Starting metrics aggregation...'); + + try { + const priceMap = await this.buildPriceMap(); + const tokenMap = await this.buildTokenMap(); + + const activePools = await this.prisma.pool.findMany({ + where: { isActive: true }, + }); + + let globalTvlUsd = 0; + + for (const pool of activePools) { + const poolTvl = this.calculatePoolTvlUsd(pool, tokenMap, priceMap); + globalTvlUsd += poolTvl; + + await this.prisma.pool.update({ + where: { id: pool.id }, + data: { tvlUsd: poolTvl }, + }); + } + + const totalRevenueUsd = await this.calculateTotalRevenueUsd(priceMap, tokenMap); + + const [activeUsers24h, activeUsers7d, activeUsers30d] = await Promise.all([ + this.countActiveUsers(24), + this.countActiveUsers(24 * 7), + this.countActiveUsers(24 * 30), + ]); + + const computedAt = new Date(); + const lastUpdated = computedAt.toISOString(); + + const snapshot = await this.prisma.protocolMetricsSnapshot.create({ + data: { + globalTvlUsd, + totalRevenueUsd, + activeUsers24h, + activeUsers7d, + activeUsers30d, + computedAt, + }, + }); + + const tvlMetrics: TvlMetrics = { + tvlUSD: globalTvlUsd, + poolCount: activePools.length, + lastUpdated, + }; + + const revenueMetrics: RevenueMetrics = { + totalRevenueUSD: totalRevenueUsd, + lastUpdated, + }; + + const activeUsersMetrics: ActiveUsersMetrics = { + activeUsers24h, + activeUsers7d, + activeUsers30d, + lastUpdated, + }; + + await Promise.all([ + this.setInRedis(this.CACHE_KEY_TVL, tvlMetrics), + this.setInRedis(this.CACHE_KEY_REVENUE, revenueMetrics), + this.setInRedis(this.CACHE_KEY_ACTIVE_USERS, activeUsersMetrics), + ]); + + this.logger.log( + `Metrics aggregation completed in ${Date.now() - start}ms (snapshot: ${snapshot.id})`, + ); + } catch (error) { + this.logger.error('Metrics aggregation failed', error.message); + } + } + + /** + * Calculates USD TVL for a single pool from reserves and token prices. + */ + private calculatePoolTvlUsd( + pool: { + reserveA: string | null; + reserveB: string | null; + tokenA: string; + tokenB: string; + }, + tokenMap: Map, + priceMap: Map, + ): number { + if (!pool.reserveA || !pool.reserveB) { + return 0; + } + + const tokenA = tokenMap.get(pool.tokenA.toLowerCase()); + const tokenB = tokenMap.get(pool.tokenB.toLowerCase()); + + const reserveAUsd = this.amountToUsd( + pool.reserveA, + tokenA?.decimals ?? 7, + priceMap.get(tokenA?.symbol ?? '') ?? 0, + ); + + const reserveBUsd = this.amountToUsd( + pool.reserveB, + tokenB?.decimals ?? 7, + priceMap.get(tokenB?.symbol ?? '') ?? 0, + ); + + return reserveAUsd + reserveBUsd; + } + + /** + * Sums protocol fees across all trades using pool fee tiers. + */ + private async calculateTotalRevenueUsd( + priceMap: Map, + tokenMap: Map, + ): Promise { + const trades = await this.prisma.trade.findMany({ + include: { pool: true }, + }); + + let total = 0; + + for (const trade of trades) { + const feeBps = this.parseFeeBps(trade.pool.fee); + const amountIn = parseFloat(trade.amountIn) || 0; + const feeAmount = amountIn * (feeBps / 10_000); + + const tokenA = tokenMap.get(trade.pool.tokenA.toLowerCase()); + const symbol = tokenA?.symbol ?? 'XLM'; + const decimals = tokenA?.decimals ?? 7; + const price = priceMap.get(symbol) ?? 0; + + total += (feeAmount / Math.pow(10, decimals)) * price; + } + + return total; + } + + /** + * Counts distinct active traders within a rolling hour window. + */ + private async countActiveUsers(hours: number): Promise { + const since = new Date(Date.now() - hours * 60 * 60 * 1000); + + const result = await this.prisma.$queryRaw<{ count: bigint }[]>` + SELECT COUNT(DISTINCT "userAddress") as count + FROM trades + WHERE timestamp >= ${since} + AND "userAddress" != 'Unknown' + `; + + return Number(result[0]?.count ?? 0); + } + + private parseFeeBps(fee: string): number { + const parsed = parseInt(fee, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : DEFAULT_FEE_BPS; + } + + private amountToUsd(rawAmount: string, decimals: number, price: number): number { + const amount = parseFloat(rawAmount) || 0; + return (amount / Math.pow(10, decimals)) * price; + } + + private async buildPriceMap(): Promise> { + const { data } = await this.pricesService.getPrices(); + const map = new Map(); + for (const item of data) { + map.set(item.symbol, item.price); + } + return map; + } + + private async buildTokenMap(): Promise> { + const tokens = await this.prisma.token.findMany(); + const map = new Map(); + for (const token of tokens) { + map.set(token.address.toLowerCase(), { + symbol: token.symbol, + decimals: token.decimals, + }); + } + return map; + } + + private async getLatestSnapshot() { + return this.prisma.protocolMetricsSnapshot.findFirst({ + orderBy: { computedAt: 'desc' }, + }); + } + + private decimalToNumber(value: { toString(): string }): number { + return parseFloat(value.toString()); + } + + private async getFromRedis(key: string): Promise { + try { + const cached = await this.redis.redisPublisher.get(key); + if (cached) return JSON.parse(cached) as T; + } catch (err) { + this.logger.warn(`Redis read failed for ${key}`); + } + return null; + } + + private async setInRedis(key: string, data: unknown): Promise { + await this.redis.redisPublisher.set( + key, + JSON.stringify(data), + 'EX', + CACHE_TTL_SECONDS, + ); + } +} diff --git a/src/prices/prices.module.ts b/src/prices/prices.module.ts index ded9096..c7876b4 100644 --- a/src/prices/prices.module.ts +++ b/src/prices/prices.module.ts @@ -5,5 +5,6 @@ import { PricesService } from './prices.service'; @Module({ controllers: [PricesController], providers: [PricesService], + exports: [PricesService], }) export class PricesModule {} diff --git a/src/trade/trade.gateway.ts b/src/trade/trade.gateway.ts index 902b7f5..a8d63f9 100644 --- a/src/trade/trade.gateway.ts +++ b/src/trade/trade.gateway.ts @@ -1,7 +1,7 @@ -import { WebSocketGateway, WebSocketServer, OnModuleInit } from '@nestjs/websockets'; +import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; import { Server } from 'socket.io'; import { RedisService } from '../common/redis/redis.service'; -import { Logger } from '@nestjs/common'; +import { Logger, OnModuleInit } from '@nestjs/common'; /** * WebSocket Gateway for real-time trade updates.