diff --git a/.eslintignore b/.eslintignore index 72bee91..9ec5f19 100644 --- a/.eslintignore +++ b/.eslintignore @@ -3,3 +3,5 @@ dist coverage migrations src/tests/jest.setup.js +src/utils/demo.ts +__mocks__/redis.js diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4b7fd04 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,84 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +jobs: + build-and-test: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: remitlend + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + redis: + image: redis:7 + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + DATABASE_URL: postgres://postgres:password@localhost:5432/remitlend + REDIS_URL: redis://localhost:6379 + INTERNAL_API_KEY: test-api-key + JWT_SECRET: test-secret + NODE_ENV: test + STELLAR_RPC_URL: https://soroban-testnet.stellar.org + STELLAR_NETWORK_PASSPHRASE: "Test SDF Network ; September 2015" + LOAN_MANAGER_CONTRACT_ID: CDUMMYLOANMANAGERCONTRACTIDXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + LENDING_POOL_CONTRACT_ID: CDUMMYLENDINGPOOLCONTRACTIDXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + POOL_TOKEN_ADDRESS: CDUMMYPOOLTOKENADDRESSXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + LOAN_MANAGER_ADMIN_SECRET: SDUMMYSECRETXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + FRONTEND_URL: http://localhost:3000 + SCORE_DELTA_REPAY: "10" + SCORE_DELTA_DEFAULT: "-50" + SCORE_DELTA_LATE: "-10" + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "22" + cache: "npm" + + - name: Install dependencies + run: npm ci + + - name: Lint + run: npm run lint + + - name: Format Check + run: npm run format:check + + - name: Typecheck + run: npm run typecheck + + - name: Build + run: npm run build + + - name: Run Database Migrations + run: npm run migrate:up + + - name: Test + run: npm run test diff --git a/Dockerfile b/Dockerfile index 01fd4f2..c963ff6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ RUN npm ci COPY tsconfig.json tsconfig.build.json ./ COPY migrations ./migrations COPY src ./src -RUN npx tsc -p tsconfig.build.json +RUN npm run build # Production Stage FROM node:22-alpine AS production @@ -32,4 +32,14 @@ USER appuser EXPOSE 3001 -CMD ["node", "dist/index.js"] +# Healthcheck: probe the /health endpoint on the exposed port. +# `wget` is bundled in busybox on the alpine image, so no extra packages are +# required. `--spider` performs a HEAD-style check without downloading the +# response body and exits non-zero for any HTTP 4xx/5xx (including our 503 +# "degraded" response) or network failure, which Docker correctly reports as +# unhealthy. `--timeout=8` is a defense-in-depth bound so `wget` exits cleanly +# before Docker's `--timeout=10s` SIGKILL kicks in. +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD wget --spider --quiet --tries=1 --timeout=8 http://127.0.0.1:3001/health || exit 1 + +CMD ["node", "dist/index.js"] diff --git a/README.md b/README.md index ac6867b..56ee6f7 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,47 @@ Core tables are created by these migrations (run in filename order): | `1773000000002_loan-history.js` | `loan_history` | | `1773000000003_indexed-events.js` | `indexed_events` | | `1774000000004_scores-add-created-at.js` | adds `created_at` to `scores` (idempotent) | -| `1777000000007_unique-loan-status-events.js` | dedupes and enforces unique status events per loan | +| `1777000000008_unique-loan-status-events.js` | dedupes and enforces unique status events per loan | + +### Migration naming convention + +Each migration file follows the pattern `_.js`. The timestamp prefix is a 13-digit Unix millisecond value (generated by `Date.now().toString()`) that determines apply order — migrations run in strictly ascending timestamp order. + +To avoid collisions: + +- **Always** use `npm run migrate:create ` to generate new migrations — it calls `Date.now()` automatically and guarantees uniqueness. +- If you must create a migration file manually, first check the largest existing timestamp in `migrations/` and ensure your new prefix is strictly larger. +- Never submit a PR where two migration files share the same numeric prefix — the apply order becomes non-deterministic. + +### Renaming an already-applied migration (existing databases) + +If you rename a migration file that has already been applied to a database, `node-pg-migrate` will reject the run with: + +``` +Error: Not run migration is preceding already run migration +``` + +This happens because `pgmigrations` still references the old filename while the filesystem has the new one. Fix it by syncing the tracking table before re-running `npm run migrate:up`: + +```sql +UPDATE pgmigrations +SET name = '1777000000008_unique-loan-status-events' +WHERE name = '1777000000007_unique-loan-status-events'; + +UPDATE pgmigrations +SET name = '1778000000009_transaction-submissions' +WHERE name = '1778000000008_transaction-submissions'; + +UPDATE pgmigrations +SET name = '1786000000017_webhook-max-attempts' +WHERE name = '1786000000016_webhook-max-attempts'; + +UPDATE pgmigrations +SET name = '1788000000019_unified-contract-events' +WHERE name = '1788000000018_unified-contract-events'; +``` + +Afterwards, `npm run migrate:up` will run normally. With Docker Compose from the repo root, the `backend` service runs `migrate:up` before `npm run dev` so the schema is applied automatically when the database is healthy. diff --git a/jest.config.ts b/jest.config.ts index 6159921..d9efd93 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -24,7 +24,6 @@ const config: Config = { }, }, moduleNameMapper: { - "^redis$": "/__mocks__/redis.js", "^(./|../)(.*)\\.js$": "$1$2", }, }; diff --git a/migrations/1777000000007_unique-loan-status-events.js b/migrations/1777000000008_unique-loan-status-events.js similarity index 100% rename from migrations/1777000000007_unique-loan-status-events.js rename to migrations/1777000000008_unique-loan-status-events.js diff --git a/migrations/1778000000008_transaction-submissions.js b/migrations/1778000000009_transaction-submissions.js similarity index 100% rename from migrations/1778000000008_transaction-submissions.js rename to migrations/1778000000009_transaction-submissions.js diff --git a/migrations/1786000000016_webhook-max-attempts.js b/migrations/1786000000017_webhook-max-attempts.js similarity index 100% rename from migrations/1786000000016_webhook-max-attempts.js rename to migrations/1786000000017_webhook-max-attempts.js diff --git a/migrations/1788000000018_unified-contract-events.js b/migrations/1788000000019_unified-contract-events.js similarity index 100% rename from migrations/1788000000018_unified-contract-events.js rename to migrations/1788000000019_unified-contract-events.js diff --git a/package.json b/package.json index 8c8d1cd..f2457b5 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,8 @@ "type": "module", "scripts": { "dev": "nodemon --watch src --ext ts --exec tsx src/index.ts", - "build": "tsc -p tsconfig.json", - "typecheck": "tsc -p tsconfig.build.json --noEmit", + "build": "tsc -p tsconfig.build.json", + "typecheck": "tsc -p tsconfig.json --noEmit", "start": "node dist/index.js", "test": "node --experimental-vm-modules node_modules/jest/bin/jest.js", "lint": "eslint .", diff --git a/src/__tests__/adminApproveLoan.test.ts b/src/__tests__/adminApproveLoan.test.ts new file mode 100644 index 0000000..d57a35f --- /dev/null +++ b/src/__tests__/adminApproveLoan.test.ts @@ -0,0 +1,146 @@ +import request from "supertest"; +import { jest } from "@jest/globals"; +import { Keypair } from "@stellar/stellar-sdk"; +import { generateJwtToken } from "../services/authService.js"; + +type MockQueryResult = { rows: unknown[]; rowCount?: number }; + +const VALID_API_KEY = "test-internal-key"; +const TEST_ADMIN = Keypair.random().publicKey(); +const TEST_BORROWER = Keypair.random().publicKey(); + +process.env.JWT_SECRET = "test-jwt-secret-min-32-chars-long!!"; +process.env.INTERNAL_API_KEY = VALID_API_KEY; +process.env.ADMIN_WALLETS = TEST_ADMIN; + +const mockQuery: jest.MockedFunction< + (text: string, params?: unknown[]) => Promise +> = jest.fn(); + +const mockRelease = jest.fn(); +const mockClient: any = { + query: mockQuery, + release: mockRelease, +}; + +jest.unstable_mockModule("../db/connection.js", () => ({ + default: { query: mockQuery }, + query: mockQuery, + getClient: jest + .fn<() => Promise>() + .mockResolvedValue(mockClient), + closePool: jest.fn(), + withTransaction: jest.fn(), +})); + +jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + get: jest.fn<() => Promise>().mockResolvedValue(null), + set: jest.fn<() => Promise>().mockResolvedValue(undefined), + delete: jest.fn<() => Promise>().mockResolvedValue(undefined), + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, +})); + +const mockBuildApproveLoanTx = + jest.fn< + ( + adminPublicKey: string, + loanId: number, + ) => Promise<{ unsignedTxXdr: string; networkPassphrase: string }> + >(); +const mockSubmitSignedTx = + jest.fn< + ( + signedTxXdr: string, + ) => Promise<{ txHash: string; status: string; resultXdr?: string }> + >(); + +jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + buildApproveLoanTx: mockBuildApproveLoanTx, + submitSignedTx: mockSubmitSignedTx, + }, +})); + +await import("../db/connection.js"); +await import("../services/sorobanService.js"); +const { default: app } = await import("../app.js"); + +const mockedQuery = mockQuery; + +const bearer = (publicKey: string) => ({ + Authorization: `Bearer ${generateJwtToken(publicKey)}`, +}); + +beforeEach(() => { + mockedQuery.mockReset(); + jest.clearAllMocks(); +}); + +afterAll(() => { + delete process.env.INTERNAL_API_KEY; + delete process.env.JWT_SECRET; + delete process.env.ADMIN_WALLETS; +}); + +// --------------------------------------------------------------------------- +// POST /admin/approve-loan +// --------------------------------------------------------------------------- +describe("POST /admin/approve-loan", () => { + it("should build an approve_loan transaction for admin", async () => { + mockBuildApproveLoanTx.mockResolvedValueOnce({ + unsignedTxXdr: "unsigned-approve-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({ loanId: 1 }); + + expect(response.status).toBe(200); + expect(response.body.success).toBe(true); + expect(response.body.loanId).toBe(1); + expect(response.body.unsignedTxXdr).toBe("unsigned-approve-xdr"); + expect(response.body.networkPassphrase).toBe( + "Test SDF Network ; September 2015", + ); + expect(mockBuildApproveLoanTx).toHaveBeenCalledWith(TEST_ADMIN, 1); + }); + + it("should reject non-admin user", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_BORROWER)) + .send({ loanId: 1 }); + + expect(response.status).toBe(403); + }); + + it("should reject missing loanId", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({}); + + expect(response.status).toBe(400); + }); + + it("should reject invalid loanId (non-positive integer)", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({ loanId: -1 }); + + expect(response.status).toBe(400); + }); + + it("should reject missing authentication", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .send({ loanId: 1 }); + + expect(response.status).toBe(401); + }); +}); diff --git a/src/__tests__/eventIndexer.test.ts b/src/__tests__/eventIndexer.test.ts index 95f330c..60f557e 100644 --- a/src/__tests__/eventIndexer.test.ts +++ b/src/__tests__/eventIndexer.test.ts @@ -28,6 +28,7 @@ const mockLogger = { info: jest.fn(), warn: jest.fn(), error: jest.fn(), + debug: jest.fn(), }; const supportedWebhookEventTypes = [ "LoanRequested", @@ -62,9 +63,26 @@ const supportedWebhookEventTypes = [ "PoolUnpaused", ] as const; +// Default advisory-lock client returned by getClient(). +// Returns acquired:true for pg_try_advisory_lock so that pollOnce() proceeds +// normally in tests that call it directly. Tests that need a different +// behaviour (e.g. lock-not-acquired) should override getClient on the mock. +const mockLockClient = { + query: jest.fn(async (sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return { rows: [{ acquired: true }], rowCount: 1 }; + } + // pg_advisory_unlock + return { rows: [], rowCount: 0 }; + }), + release: jest.fn(), +}; + jest.unstable_mockModule("../db/connection.js", () => ({ query: mockQuery, - getClient: jest.fn(), + getClient: jest + .fn<() => Promise>() + .mockResolvedValue(mockLockClient), closePool: jest.fn(), withTransaction: jest.fn( async ( diff --git a/src/__tests__/health.test.ts b/src/__tests__/health.test.ts index d9c0972..c28b940 100644 --- a/src/__tests__/health.test.ts +++ b/src/__tests__/health.test.ts @@ -66,3 +66,112 @@ describe("GET /health", () => { expect(typeof response.body.timestamp).toBe("number"); }); }); + +describe("GET /health resilience", () => { + // Allow generous time for module re-imports + supertest boot on slow CI. + jest.setTimeout(15000); + + const ORIGINAL_TIMEOUT_ENV = process.env.HEALTH_CHECK_TIMEOUT_MS; + + beforeEach(() => { + jest.resetModules(); + }); + + afterEach(() => { + // Always restore the env so a failed assertion can't leak state into + // other tests in the suite. + if (ORIGINAL_TIMEOUT_ENV === undefined) { + delete process.env.HEALTH_CHECK_TIMEOUT_MS; + } else { + process.env.HEALTH_CHECK_TIMEOUT_MS = ORIGINAL_TIMEOUT_ENV; + } + }); + + it("returns within a bounded timeout when a dependency hangs", async () => { + // Force a tight per-check timeout so the assertion window is short + // and the test does not depend on the 2-second default. + process.env.HEALTH_CHECK_TIMEOUT_MS = "200"; + + jest.unstable_mockModule("../db/connection.js", () => ({ + default: { + // Simulate a database that never settles — mirrors a stuck TCP socket. + query: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + }, + query: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + getClient: jest.fn(), + withTransaction: jest.fn(), + })); + + jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + const { default: appWithStuckDb } = await import("../app.js"); + + const start = Date.now(); + const response = await request(appWithStuckDb).get("/health"); + const elapsed = Date.now() - start; + + // The per-check timeout is 200ms; assert the endpoint failed fast + // with the hung dep marked as error, well under jest's 5s window. + expect(elapsed).toBeLessThan(1500); + expect(response.status).toBe(503); + expect(response.body.checks.database).toBe("error"); + expect(response.body.status).toBe("down"); + }); + + it("honors a tiny HEALTH_CHECK_TIMEOUT_MS override", async () => { + process.env.HEALTH_CHECK_TIMEOUT_MS = "1"; + + jest.unstable_mockModule("../db/connection.js", () => ({ + default: { + query: jest.fn<() => Promise>().mockResolvedValue({ + rows: [], + rowCount: 0, + }), + }, + query: jest.fn<() => Promise>().mockResolvedValue({ + rows: [], + rowCount: 0, + }), + getClient: jest.fn(), + withTransaction: jest.fn(), + })); + + jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + // A Soroban ping that hangs forever — the 1ms override must kick in. + jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + ping: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + }, + })); + + const { default: appWithCustomTimeout } = await import("../app.js"); + + const start = Date.now(); + const response = await request(appWithCustomTimeout).get("/health"); + const elapsed = Date.now() - start; + + expect(elapsed).toBeLessThan(1000); + expect(response.body.checks.soroban_rpc).toBe("error"); + }); +}); diff --git a/src/__tests__/loanDispute.test.ts b/src/__tests__/loanDispute.test.ts index 69de1ff..324ca10 100644 --- a/src/__tests__/loanDispute.test.ts +++ b/src/__tests__/loanDispute.test.ts @@ -10,6 +10,7 @@ import { jest } from "@jest/globals"; const mockQuery: any = jest.fn(); jest.unstable_mockModule("../db/connection.js", () => ({ query: mockQuery, + getClient: jest.fn(), default: { query: mockQuery, connect: jest.fn(), end: jest.fn() }, withTransaction: jest.fn(), })); diff --git a/src/__tests__/remittanceService.test.ts b/src/__tests__/remittanceService.test.ts index b16f787..0a41b8a 100644 --- a/src/__tests__/remittanceService.test.ts +++ b/src/__tests__/remittanceService.test.ts @@ -5,6 +5,7 @@ const mockWithTransaction = jest.fn(); jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), connect: jest.fn(), end: jest.fn() }, })); diff --git a/src/__tests__/sorobanService.test.ts b/src/__tests__/sorobanService.test.ts new file mode 100644 index 0000000..dbbd63a --- /dev/null +++ b/src/__tests__/sorobanService.test.ts @@ -0,0 +1,587 @@ +import { + jest, + describe, + it, + expect, + beforeEach, + afterEach, + beforeAll, +} from "@jest/globals"; +import { + Keypair, + Account, + nativeToScVal, + TransactionBuilder, + Operation, +} from "@stellar/stellar-sdk"; + +// Mock the logger to prevent cluttering stdout and to check log calls if needed +const mockLogger = { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +}; + +jest.unstable_mockModule("../utils/logger.js", () => ({ + default: mockLogger, + logger: mockLogger, +})); + +// Mock the stellar config module +const mockCreateSorobanRpcServer = jest.fn(); +const mockGetStellarNetworkPassphrase = jest.fn(); +const mockGetStellarRpcUrl = jest.fn(); + +jest.unstable_mockModule("../config/stellar.js", () => ({ + createSorobanRpcServer: mockCreateSorobanRpcServer, + getStellarNetworkPassphrase: mockGetStellarNetworkPassphrase, + getStellarRpcUrl: mockGetStellarRpcUrl, +})); + +// Now dynamically import the service under test +const { sorobanService } = await import("../services/sorobanService.js"); + +describe("SorobanService", () => { + const originalEnv = { ...process.env }; + + const dummyUser = "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + const dummyContract = + "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABSC4"; + const dummySecret = + "SAACQXICHBTMMZQXM5ISR4VG6ADO3M3VB5PEJ3PP7CMLSSPO4UZSYRZH"; + + let dummyTxXdr: string; + + // Mocked RPC Server implementation + const mockRpcServer = { + getHealth: jest.fn(), + getLatestLedger: jest.fn(), + getAccount: jest.fn(), + prepareTransaction: jest.fn(), + sendTransaction: jest.fn(), + pollTransaction: jest.fn(), + simulateTransaction: jest.fn(), + }; + + beforeAll(() => { + // Generate a valid dummy transaction envelope XDR for fromXDR parsing + const kp = Keypair.fromSecret(dummySecret); + const acc = new Account(kp.publicKey(), "100"); + const tx = new TransactionBuilder(acc, { + fee: "100", + networkPassphrase: "Test SDF Network ; September 2015", + }) + .addOperation( + Operation.bumpSequence({ + bumpTo: "101", + }), + ) + .setTimeout(30) + .build(); + dummyTxXdr = tx.toXDR(); + }); + + beforeEach(() => { + jest.clearAllMocks(); + + mockRpcServer.getHealth.mockReset(); + mockRpcServer.getLatestLedger.mockReset(); + mockRpcServer.getAccount.mockReset(); + mockRpcServer.prepareTransaction.mockReset(); + mockRpcServer.sendTransaction.mockReset(); + mockRpcServer.pollTransaction.mockReset(); + mockRpcServer.simulateTransaction.mockReset(); + + mockCreateSorobanRpcServer.mockReturnValue(mockRpcServer); + mockGetStellarNetworkPassphrase.mockReturnValue( + "Test SDF Network ; September 2015", + ); + mockGetStellarRpcUrl.mockReturnValue("https://soroban-testnet.stellar.org"); + + // Setup standard test environment variables + process.env.LOAN_MANAGER_CONTRACT_ID = dummyContract; + process.env.LENDING_POOL_CONTRACT_ID = dummyContract; + process.env.POOL_TOKEN_ADDRESS = dummyContract; + process.env.REMITTANCE_NFT_CONTRACT_ID = dummyContract; + process.env.SCORE_RECONCILIATION_SOURCE_SECRET = dummySecret; + process.env.LOAN_MANAGER_ADMIN_SECRET = dummySecret; + process.env.DEFAULT_CREDIT_SCORE = "500"; + process.env.SCORE_DELTA_REPAY = "15"; + process.env.SCORE_DELTA_DEFAULT = "50"; + process.env.SCORE_DELTA_LATE = "5"; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + describe("validateConfig", () => { + it("should succeed when all environment variables are valid and RPC is reachable", async () => { + mockRpcServer.getHealth.mockResolvedValue({ status: "healthy" }); + + await expect(sorobanService.validateConfig()).resolves.not.toThrow(); + expect(mockRpcServer.getHealth).toHaveBeenCalled(); + }); + + it("should throw AppError if LOAN_MANAGER_CONTRACT_ID is missing", async () => { + delete process.env.LOAN_MANAGER_CONTRACT_ID; + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "LOAN_MANAGER_CONTRACT_ID is not configured", + ); + }); + + it("should throw AppError if LOAN_MANAGER_CONTRACT_ID is invalid", async () => { + process.env.LOAN_MANAGER_CONTRACT_ID = "invalid-contract"; + + await expect(sorobanService.validateConfig()).rejects.toThrow( + 'LOAN_MANAGER_CONTRACT_ID is not a valid Stellar contract address: "invalid-contract"', + ); + }); + + it("should throw AppError if getStellarRpcUrl throws", async () => { + mockGetStellarRpcUrl.mockImplementation(() => { + throw new Error("Missing stellar config"); + }); + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "Missing stellar config", + ); + }); + + it("should throw AppError if RPC getHealth fails", async () => { + mockRpcServer.getHealth.mockRejectedValue( + new Error("Connection refused"), + ); + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "Stellar RPC is unreachable at https://soroban-testnet.stellar.org: Connection refused", + ); + }); + }); + + describe("healthCheck and ping", () => { + it("ping should return 'ok' when RPC is reachable", async () => { + mockRpcServer.getLatestLedger.mockResolvedValue({ sequence: 100 }); + + await expect(sorobanService.ping()).resolves.toBe("ok"); + }); + + it("ping should return 'error' when RPC is unreachable", async () => { + mockRpcServer.getLatestLedger.mockRejectedValue(new Error("Timeout")); + + await expect(sorobanService.ping()).resolves.toBe("error"); + }); + + it("healthCheck should return connected true and the sequence number", async () => { + mockRpcServer.getLatestLedger.mockResolvedValue({ sequence: 999 }); + + const res = await sorobanService.healthCheck(); + expect(res).toEqual({ connected: true, latestLedger: 999 }); + }); + + it("healthCheck should return connected false and error message on failure", async () => { + mockRpcServer.getLatestLedger.mockRejectedValue(new Error("RPC Timeout")); + + const res = await sorobanService.healthCheck(); + expect(res).toEqual({ connected: false, error: "RPC Timeout" }); + }); + }); + + describe("build*Tx methods", () => { + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue(new Account(dummyUser, "100")); + mockRpcServer.prepareTransaction.mockResolvedValue({ + toXDR: () => "mocked-prepared-tx-xdr", + }); + }); + + it("buildRequestLoanTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildRequestLoanTx(dummyUser, 1000); + + expect(res).toEqual({ + unsignedTxXdr: "mocked-prepared-tx-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + expect(mockRpcServer.getAccount).toHaveBeenCalledWith(dummyUser); + expect(mockRpcServer.prepareTransaction).toHaveBeenCalled(); + }); + + it("buildRepayTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildRepayTx(dummyUser, 42, 500); + + expect(res).toEqual({ + unsignedTxXdr: "mocked-prepared-tx-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + expect(mockRpcServer.getAccount).toHaveBeenCalledWith(dummyUser); + expect(mockRpcServer.prepareTransaction).toHaveBeenCalled(); + }); + + it("buildDepositTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildDepositTx( + dummyUser, + dummyContract, + 750, + ); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + + it("buildWithdrawTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildWithdrawTx( + dummyUser, + dummyContract, + 100, + ); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + + it("buildApproveLoanTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildApproveLoanTx(dummyUser, 12); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + }); + + describe("submitSignedTx", () => { + it("should submit transaction and poll successfully", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + hash: "tx-hash-12345", + status: "PENDING", + }); + mockRpcServer.pollTransaction.mockResolvedValue({ + status: "SUCCESS", + resultXdr: { + toXDR: () => "result-xdr-base64", + }, + }); + + const res = await sorobanService.submitSignedTx(dummyTxXdr); + + expect(res).toEqual({ + txHash: "tx-hash-12345", + status: "SUCCESS", + resultXdr: "result-xdr-base64", + }); + expect(mockRpcServer.sendTransaction).toHaveBeenCalled(); + expect(mockRpcServer.pollTransaction).toHaveBeenCalledWith( + "tx-hash-12345", + expect.any(Object), + ); + }); + + it("should throw AppError if transaction submission does not return a hash", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + status: "ERROR", + }); + + await expect(sorobanService.submitSignedTx(dummyTxXdr)).rejects.toThrow( + "Transaction submission returned no hash", + ); + }); + + it("should return status without resultXdr if poll transaction does not succeed", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + hash: "tx-hash-12345", + status: "PENDING", + }); + mockRpcServer.pollTransaction.mockResolvedValue({ + status: "FAILED", + }); + + const res = await sorobanService.submitSignedTx(dummyTxXdr); + + expect(res).toEqual({ + txHash: "tx-hash-12345", + status: "FAILED", + }); + }); + }); + + describe("getOnChainCreditScore (Score Fallback Logic)", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return the score on successful simulation", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(620), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(620); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(1); + }); + + it("should fallback to default score when simulation returns a missing-score error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "HostError: Error(Value, NotFound)", + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); // Default score configured in process.env + expect(mockLogger.warn).toHaveBeenCalledWith( + "Falling back to default credit score", + expect.objectContaining({ + reason: "HostError: Error(Value, NotFound)", + }), + ); + }); + + it("should retry on transient simulation error and succeed if second attempt succeeds", async () => { + mockRpcServer.simulateTransaction + .mockResolvedValueOnce({ + error: "RPC Timeout calling simulateTransaction", + }) + .mockResolvedValueOnce({ + result: { + retval: nativeToScVal(700), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(700); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(2); + expect(mockLogger.warn).toHaveBeenCalledWith( + "Retrying get_score simulation after transient RPC failure", + expect.objectContaining({ + attempt: 1, + error: "RPC Timeout calling simulateTransaction", + }), + ); + }); + + it("should retry and fallback to default score if all attempts return transient errors", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "503 Service Unavailable", + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); // Fallback + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(2); // Retries once (attempts: 1, 2) + expect(mockLogger.warn).toHaveBeenCalledWith( + "Falling back to default credit score", + expect.objectContaining({ + reason: "503 Service Unavailable", + }), + ); + }); + + it("should throw AppError if simulation returns a hard error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Contract panic: index out of bounds", + }); + + await expect( + sorobanService.getOnChainCreditScore(dummyUser), + ).rejects.toThrow( + "Failed to simulate get_score for GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV: Contract panic: index out of bounds", + ); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(1); // No retry for hard errors + }); + + it("should fallback to default score if simulation returns empty object or no result/error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({} as any); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + + it("should fallback to default score if simulation retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + + it("should fallback to default score if simulation retval is not a finite number", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal("not-a-number"), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + }); + + describe("getOnChainScoreHistory", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return history sorted ascending by timestamp", async () => { + const mockHistory = [ + { ledger: 200, old_score: 550, new_score: 565, reason: "Repay" }, + { ledger: 150, old_score: 500, new_score: 550, reason: "Repay" }, + ]; + + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(mockHistory), + }, + }); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([ + { score: 550, timestamp: 150, reason: "Repay" }, + { score: 565, timestamp: 200, reason: "Repay" }, + ]); + }); + + it("should filter out invalid history entries", async () => { + const mockHistory = [ + { ledger: 200, old_score: 550, new_score: 565, reason: "Repay" }, + { ledger: "invalid", old_score: 500, new_score: 550, reason: "Repay" }, // filtered out + { ledger: 150, old_score: 500, new_score: 550, reason: "" }, // filtered out (empty reason) + ]; + + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(mockHistory), + }, + }); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([ + { score: 565, timestamp: 200, reason: "Repay" }, + ]); + }); + + it("should return empty array if simulation retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([]); + }); + + it("should throw AppError if simulation returns error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Contract execution failed", + }); + + await expect( + sorobanService.getOnChainScoreHistory(dummyUser), + ).rejects.toThrow( + "Failed to simulate get_score_history for GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV: Contract execution failed", + ); + }); + }); + + describe("getPoolBalance", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return pool balance on success", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(1000000000000n), + }, + }); + + const balance = await sorobanService.getPoolBalance(); + + expect(balance).toBe(1000000000000); + }); + + it("should throw AppError if simulation fails", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Simulation failed", + }); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "Failed to simulate pool balance: Simulation failed", + ); + }); + + it("should throw AppError if retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "No balance returned by pool token", + ); + }); + + it("should throw AppError if balance is not a finite number", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal("not-a-number"), + }, + }); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "Invalid on-chain balance returned", + ); + }); + }); + + describe("getScoreConfig and validateScoreConfig", () => { + it("should successfully return score configs", () => { + const config = sorobanService.getScoreConfig(); + + expect(config).toEqual({ + repaymentDelta: 15, + defaultPenalty: 50, + latePenalty: 5, + }); + }); + + it("should succeed validation with default values", () => { + expect(() => sorobanService.validateScoreConfig()).not.toThrow(); + }); + + it("should throw AppError if a config is not a valid integer", () => { + process.env.SCORE_DELTA_REPAY = "abc"; + + expect(() => sorobanService.validateScoreConfig()).toThrow( + 'SCORE_DELTA_REPAY must be a valid integer: "abc"', + ); + }); + + it("should throw AppError if a positive-required config is negative or zero", () => { + process.env.SCORE_DELTA_DEFAULT = "0"; + + expect(() => sorobanService.validateScoreConfig()).toThrow( + "SCORE_DELTA_DEFAULT must be a positive integer: 0", + ); + }); + }); +}); diff --git a/src/app.ts b/src/app.ts index 5b33a56..bd07494 100644 --- a/src/app.ts +++ b/src/app.ts @@ -119,30 +119,72 @@ app.get("/", (req: Request, res: Response) => { res.send("RemitLend Backend is running"); }); +// Per-dependency healthcheck timeout (ms). Keeps the /health endpoint from +// hanging when a downstream probe stalls, which would otherwise block the +// Docker HEALTHCHECK probe from ever completing. +const HEALTH_CHECK_TIMEOUT_MS = Number.parseInt( + process.env.HEALTH_CHECK_TIMEOUT_MS ?? "2000", + 10, +); + +/** + * Race a promise against a timeout. If the promise does not settle within + * `ms`, resolve as `"error"` so the caller can record a degraded check + * without blocking the response. + */ +async function withTimeout( + promise: Promise, + ms: number, + fallback: T, +): Promise { + let timer: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((resolve) => { + timer = setTimeout(() => resolve(fallback), ms); + }), + ]); + } finally { + if (timer) clearTimeout(timer); + } +} + app.get( "/health", asyncHandler(async (_req: Request, res: Response) => { - const [databaseStatus, redisStatus, sorobanStatus] = - await Promise.allSettled([ + const timeout = Number.isFinite(HEALTH_CHECK_TIMEOUT_MS) + ? HEALTH_CHECK_TIMEOUT_MS + : 2000; + + // Each individual check is wrapped in a per-check timeout (see + // `withTimeout`) so that a hung dependency degrades to "error" + // rather than blocking the whole endpoint. Because the wrappers + // guarantee every promise resolves, `Promise.all` is safe here — + // switching back to `Promise.allSettled` would not preserve this + // fast-fail behaviour. + const [databaseResult, redisResult, sorobanResult] = await Promise.all([ + withTimeout( pool .query("SELECT 1") .then(() => "ok" as const) .catch(() => "error" as const), - cacheService.ping(), - sorobanService.ping(), - ]); + timeout, + "error" as const, + ), + withTimeout(cacheService.ping(), timeout, "error" as const), + withTimeout(sorobanService.ping(), timeout, "error" as const), + ]); const dbChecks = { - database: - databaseStatus.status === "fulfilled" ? databaseStatus.value : "error", - redis: redisStatus.status === "fulfilled" ? redisStatus.value : "error", + database: databaseResult, + redis: redisResult, }; const checks = { api: "ok" as const, ...dbChecks, - soroban_rpc: - sorobanStatus.status === "fulfilled" ? sorobanStatus.value : "error", + soroban_rpc: sorobanResult, }; const coreOk = Object.values(dbChecks).every((c) => c === "ok"); diff --git a/src/config/stellar.ts b/src/config/stellar.ts index 74e8081..f256c9b 100644 --- a/src/config/stellar.ts +++ b/src/config/stellar.ts @@ -112,6 +112,10 @@ export function getStellarNetworkPassphrase(): string { return getStellarConfig().networkPassphrase; } +/** + * Creates and returns a new Soroban RPC server instance. + * Automatically enables HTTP (in addition to HTTPS) if the RPC URL starts with http://. + */ export function createSorobanRpcServer(): rpc.Server { const rpcUrl = getStellarRpcUrl(); const allowHttp = rpcUrl.startsWith("http://"); diff --git a/src/controllers/indexerController.ts b/src/controllers/indexerController.ts index 8650a2d..dfd41ae 100644 --- a/src/controllers/indexerController.ts +++ b/src/controllers/indexerController.ts @@ -19,6 +19,8 @@ import { import { parseCappedLimit } from "../utils/queryHelpers.js"; import logger from "../utils/logger.js"; import { getStellarRpcUrl } from "../config/stellar.js"; +import { sorobanService } from "../services/sorobanService.js"; +import { asyncHandler } from "../utils/asyncHandler.js"; const buildEventFilters = ( req: Request, @@ -838,3 +840,38 @@ export const reprocessQuarantinedEvents = async ( }); } }; + +/** + * POST /admin/approve-loan + * Builds an unsigned Soroban `approve_loan(loanId)` transaction + * for the admin to sign with their wallet. + */ +export const approveLoan = asyncHandler(async (req: Request, res: Response) => { + const { loanId } = req.body as { loanId: number }; + const adminPublicKey = req.user?.publicKey; + + if (!adminPublicKey) { + res.status(401).json({ + success: false, + message: "Admin authentication required", + }); + return; + } + + const result = await sorobanService.buildApproveLoanTx( + adminPublicKey, + loanId, + ); + + logger.info("Admin approve_loan transaction built", { + admin: adminPublicKey, + loanId, + }); + + res.json({ + success: true, + loanId, + unsignedTxXdr: result.unsignedTxXdr, + networkPassphrase: result.networkPassphrase, + }); +}); diff --git a/src/index.ts b/src/index.ts index f99f9e4..896ccfa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,9 +17,9 @@ import { stopDefaultCheckerScheduler, } from "./services/defaultChecker.js"; import { - startWebhookRetryScheduler, - stopWebhookRetryScheduler, -} from "./services/webhookRetryScheduler.js"; + startWebhookRetryProcessor, + stopWebhookRetryProcessor, +} from "./services/webhookRetryProcessor.js"; import { eventStreamService } from "./services/eventStreamService.js"; import { startNotificationCleanupScheduler, @@ -61,8 +61,8 @@ const server = app.listen(port, () => { // Start periodic on-chain default checks (if configured) startDefaultCheckerScheduler(); - // Start webhook retry scheduler - startWebhookRetryScheduler(); + // Start webhook retry processor + startWebhookRetryProcessor(); // Start scheduled score reconciliation against on-chain state startScoreReconciliationScheduler(); @@ -87,7 +87,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => { try { await stopIndexer(); stopDefaultCheckerScheduler(); - stopWebhookRetryScheduler(); + stopWebhookRetryProcessor(); stopScoreReconciliationScheduler(); stopNotificationCleanupScheduler(); diff --git a/src/routes/adminRoutes.ts b/src/routes/adminRoutes.ts index 3bb8ef8..6475dce 100644 --- a/src/routes/adminRoutes.ts +++ b/src/routes/adminRoutes.ts @@ -15,6 +15,7 @@ import { listWebhookSubscriptions, reprocessQuarantinedEvents, reindexLedgerRange, + approveLoan, } from "../controllers/indexerController.js"; import { listLoanDisputes, @@ -22,6 +23,7 @@ import { getLoanDispute, rejectLoanDispute, } from "../controllers/adminDisputeController.js"; +import { approveLoanBodySchema } from "../schemas/loanSchemas.js"; import { query } from "../db/connection.js"; const router = Router(); @@ -108,6 +110,60 @@ router.post( rejectLoanDispute, ); +/** + * @swagger + * /admin/approve-loan: + * post: + * summary: Build an unsigned approve_loan transaction for admin signing + * description: > + * Builds a Soroban `approve_loan(loanId)` transaction that the admin + * must sign with their wallet and submit via the generic submit endpoint. + * tags: [Admin] + * security: + * - BearerAuth: [] + * requestBody: + * required: true + * content: + * application/json: + * schema: + * type: object + * required: [loanId] + * properties: + * loanId: + * type: integer + * minimum: 1 + * description: ID of the loan to approve + * responses: + * 200: + * description: Unsigned transaction built successfully + * content: + * application/json: + * schema: + * type: object + * properties: + * success: + * type: boolean + * loanId: + * type: integer + * unsignedTxXdr: + * type: string + * networkPassphrase: + * type: string + * 400: + * description: Validation error + * 401: + * description: Admin authentication required + */ +router.post( + "/approve-loan", + requireJwtAuth, + requireRoles("admin"), + strictRateLimiter, + auditLog, + validateBody(approveLoanBodySchema), + approveLoan, +); + const checkDefaultsBodySchema = z.object({ loanIds: z .array(z.number().int().positive()) diff --git a/src/schemas/loanSchemas.ts b/src/schemas/loanSchemas.ts index 7ae03c3..91e3e20 100644 --- a/src/schemas/loanSchemas.ts +++ b/src/schemas/loanSchemas.ts @@ -37,3 +37,7 @@ export const submitTxSchema = z.object({ .min(1, "signedTxXdr is required") .regex(base64Regex, "Must be a valid base64 string"), }); + +export const approveLoanBodySchema = z.object({ + loanId: z.number().int().positive("Loan ID must be a positive integer"), +}); diff --git a/src/services/__tests__/eventIndexer.test.ts b/src/services/__tests__/eventIndexer.test.ts index 47fbb53..0bbbfab 100644 --- a/src/services/__tests__/eventIndexer.test.ts +++ b/src/services/__tests__/eventIndexer.test.ts @@ -25,6 +25,8 @@ import { // eslint-disable-next-line @typescript-eslint/no-explicit-any let mockWithTransaction: jest.Mock; // eslint-disable-next-line @typescript-eslint/no-explicit-any +let mockGetClient: jest.Mock; +// eslint-disable-next-line @typescript-eslint/no-explicit-any let mockUpdateUserScoresBulk: jest.Mock; // eslint-disable-next-line @typescript-eslint/no-explicit-any let mockSorobanGetScoreConfig: jest.Mock; @@ -90,6 +92,7 @@ let EventIndexer: any; beforeAll(async () => { mockWithTransaction = jest.fn(); // eslint-disable-line @typescript-eslint/no-explicit-any + mockGetClient = jest.fn(); // eslint-disable-line @typescript-eslint/no-explicit-any mockUpdateUserScoresBulk = jest.fn().mockResolvedValue(undefined); // eslint-disable-line @typescript-eslint/no-explicit-any mockSorobanGetScoreConfig = jest .fn() // eslint-disable-line @typescript-eslint/no-explicit-any @@ -101,7 +104,7 @@ beforeAll(async () => { jest.unstable_mockModule("../../db/connection.js", () => ({ // eslint-disable-next-line @typescript-eslint/no-explicit-any query: jest.fn().mockResolvedValue({ rows: [], rowCount: 0 }), - getClient: jest.fn(), + getClient: mockGetClient, withTransaction: mockWithTransaction, TRANSIENT_ERROR_CODES: new Set(["08006", "57P01", "40001"]), })); @@ -362,3 +365,181 @@ describe("EventIndexer – transaction atomicity via ingestRawEvents", () => { expect(mockWithTransaction).toHaveBeenCalledTimes(1); }); }); + +// -------------------------------------------------------------------------- +// Advisory lock — concurrent poll cycle deduplication +// -------------------------------------------------------------------------- + +describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => { + afterEach(async () => { + // Reset getClient after each test in this block so the default + // (no implementation) doesn't bleed into the atomicity suite above. + mockGetClient.mockReset(); + }); + + it("pollOnce skips all processing when another instance holds the advisory lock", async () => { + // Simulate a lock client that reports the lock as already taken. + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: false }] }); + } + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + const indexer = makeIndexer(); + await indexer.start(); + await indexer.stop(); // clears the scheduled next-poll timeout + + // The lock attempt was made + expect(mockLockClient.query).toHaveBeenCalledWith( + expect.stringContaining("pg_try_advisory_lock"), + expect.any(Array), + ); + // pg_advisory_unlock must NOT be called — we never held the lock + const unlockCalls = ( + (mockLockClient.query as jest.Mock).mock.calls as [string][] + ).filter(([sql]) => sql.includes("pg_advisory_unlock")); + expect(unlockCalls).toHaveLength(0); + // Lock client always released back to the pool + expect(mockLockClient.release).toHaveBeenCalledTimes(1); + // No event processing occurred + expect(mockWithTransaction).not.toHaveBeenCalled(); + expect(mockWebhookDispatch).not.toHaveBeenCalled(); + expect(mockNotificationCreate).not.toHaveBeenCalled(); + }); + + it("lock is always released back to the pool even when processing throws", async () => { + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: true }] }); + } + // pg_advisory_unlock succeeds + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + // Make the pool-level query (getLastIndexedLedger) throw + const mockQuery = (await import("../../db/connection.js")) + .query as jest.Mock<() => Promise>; + mockQuery.mockRejectedValueOnce(new Error("db unavailable")); + + const indexer = makeIndexer(); + // start() catches the error from pollOnce internally and schedules next poll + await indexer.start().catch(() => {}); + await indexer.stop(); + + // Lock was acquired and then unlocked despite the error + const unlockCalls = ( + (mockLockClient.query as jest.Mock).mock.calls as [string][] + ).filter(([sql]) => sql.includes("pg_advisory_unlock")); + expect(unlockCalls).toHaveLength(1); + // Client always returned to pool + expect(mockLockClient.release).toHaveBeenCalledTimes(1); + }); + + it("restarted instance resumes from the persisted cursor, not from ledger 0", async () => { + // Simulate a stopped-then-restarted indexer. The DB holds last_indexed_ledger=500. + // The restarted instance must begin fetching from 501, not from 0. + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: true }] }); + } + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + const mockQuery = (await import("../../db/connection.js")) + .query as jest.Mock; + + const queriedRanges: Array<{ from: number; to: number }> = []; + + ( + mockQuery as jest.Mock< + ( + sql: string, + params?: unknown[], + ) => Promise<{ rows: unknown[]; rowCount: number }> + > + ).mockImplementation(async (sql: string, _params?: unknown[]) => { + if (sql.includes("SELECT last_indexed_ledger")) { + return { rows: [{ last_indexed_ledger: 500 }], rowCount: 1 }; + } + if (sql.includes("UPDATE indexer_state")) { + return { rows: [], rowCount: 1 }; + } + return { rows: [], rowCount: 0 }; + }); + + // Patch rpc to record the ledger range actually fetched + const indexer = makeIndexer(); + ( + indexer as unknown as { + rpc: { getLatestLedger: unknown; getEvents: unknown }; + } + ).rpc = { + getLatestLedger: async () => ({ sequence: 600 }), + getEvents: async ({ + startLedger, + endLedger, + }: { + startLedger: number; + endLedger: number; + }) => { + queriedRanges.push({ from: startLedger, to: endLedger }); + return { events: [] }; + }, + }; + + await indexer.start(); + await indexer.stop(); + + // Must have fetched starting from 501, not 0 or 1 + expect(queriedRanges.length).toBeGreaterThan(0); + expect(queriedRanges[0]?.from).toBe(501); + }); + + it("only the instance whose insert wins dispatches webhooks and notifications", async () => { + // Two indexers ingest the same event concurrently. Instance A's insert + // succeeds (rowCount=1); instance B hits ON CONFLICT DO NOTHING (rowCount=0). + // Only A should dispatch webhooks and notifications. + const event = makeRawRepaidEvent("shared-event-001"); + + const mockClientA: MockClient = { + query: jest.fn().mockResolvedValue({ + rowCount: 1, + rows: [{ event_id: "shared-event-001" }], + }), + }; + const mockClientB: MockClient = { + query: jest.fn().mockResolvedValue({ rowCount: 0, rows: [] }), + }; + + const indexerA = makeIndexer(); + const indexerB = makeIndexer(); + + mockWithTransaction.mockImplementationOnce(async (fn: TxCallback) => + fn(mockClientA), + ); + await indexerA.ingestRawEvents([event]); + + mockWithTransaction.mockImplementationOnce(async (fn: TxCallback) => + fn(mockClientB), + ); + await indexerB.ingestRawEvents([event]); + + // Webhook dispatched exactly once — by the instance that inserted the row + expect(mockWebhookDispatch).toHaveBeenCalledTimes(1); + // Notification created exactly once + expect(mockNotificationCreate).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts new file mode 100644 index 0000000..6b669ed --- /dev/null +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -0,0 +1,33 @@ +import { jest, describe, it, expect } from "@jest/globals"; +import { WEBHOOK_RETRY_CONFIG, getRetryDelayMs } from "../webhookService.js"; + +describe("Webhook Retry Processor", () => { + it("respects the configured backoff delays", () => { + expect(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS).toEqual([ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ]); + + expect(getRetryDelayMs(1)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[0]); + expect(getRetryDelayMs(2)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[1]); + expect(getRetryDelayMs(3)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[2]); + }); + + it("caps the backoff delay at the last configured value", () => { + const maxDelay = + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[ + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1 + ]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe( + maxDelay, + ); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe( + maxDelay, + ); + }); + + it("configures exactly 4 max attempts", () => { + expect(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS).toBe(4); + }); +}); diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 301e594..2e9ccde 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -1,5 +1,10 @@ import { rpc as SorobanRpc, scValToNative, xdr } from "@stellar/stellar-sdk"; -import { type PoolClient, query, withTransaction } from "../db/connection.js"; +import { + type PoolClient, + query, + withTransaction, + getClient, +} from "../db/connection.js"; import logger from "../utils/logger.js"; import { createRequestId, @@ -79,6 +84,10 @@ interface ProcessChunkResult { } export class EventIndexer { + // Stable advisory lock key reserved for the event-indexer poll cycle. + // Chosen to be unique within this database; change only with a migration. + private static readonly ADVISORY_LOCK_KEY = 738_154_291; + private readonly rpc: SorobanRpc.Server; private readonly contractIds: string[]; private readonly pollIntervalMs: number; @@ -235,18 +244,51 @@ export class EventIndexer { private async pollOnce(): Promise { if (!this.running) return; - const lastIndexedLedger = await this.getLastIndexedLedger(); - const latestLedger = await this.getLatestLedgerSequence(); + // Acquire a session-level Postgres advisory lock so that only one instance + // advances the cursor at a time. pg_try_advisory_lock returns false + // immediately when another session already holds the lock, so this instance + // skips the cycle rather than blocking. + const lockClient = await getClient(); + try { + const lockResult = await lockClient.query<{ acquired: boolean }>( + "SELECT pg_try_advisory_lock($1) AS acquired", + [EventIndexer.ADVISORY_LOCK_KEY], + ); - if (latestLedger <= lastIndexedLedger) { - return; - } + if (!lockResult.rows[0]?.acquired) { + logger.debug( + "Indexer poll skipped — another instance holds the advisory lock", + ); + return; + } - const fromLedger = lastIndexedLedger + 1; - const toLedger = Math.min(fromLedger + this.batchSize - 1, latestLedger); + logger.debug("Indexer advisory lock acquired — starting poll cycle"); - const result = await this.processChunk(fromLedger, toLedger); - await this.updateLastIndexedLedger(result.lastProcessedLedger); + try { + const lastIndexedLedger = await this.getLastIndexedLedger(); + const latestLedger = await this.getLatestLedgerSequence(); + + if (latestLedger <= lastIndexedLedger) { + return; + } + + const fromLedger = lastIndexedLedger + 1; + const toLedger = Math.min( + fromLedger + this.batchSize - 1, + latestLedger, + ); + + const result = await this.processChunk(fromLedger, toLedger); + await this.updateLastIndexedLedger(result.lastProcessedLedger); + } finally { + // Always release the advisory lock on the same connection that acquired it. + await lockClient.query("SELECT pg_advisory_unlock($1)", [ + EventIndexer.ADVISORY_LOCK_KEY, + ]); + } + } finally { + lockClient.release(); + } } private async getLatestLedgerSequence(): Promise { diff --git a/src/services/sorobanService.ts b/src/services/sorobanService.ts index 22d30eb..2df9420 100644 --- a/src/services/sorobanService.ts +++ b/src/services/sorobanService.ts @@ -654,10 +654,13 @@ class SorobanService { latestLedger?: number; error?: string; }> { + let timeoutId: NodeJS.Timeout | undefined; try { const server = this.getRpcServer(); const timeoutPromise = new Promise<{ connected: boolean; error: string }>( - (_, reject) => setTimeout(() => reject(new Error("RPC timeout")), 5000), + (_, reject) => { + timeoutId = setTimeout(() => reject(new Error("RPC timeout")), 5000); + }, ); const ledgerPromise = server.getLatestLedger().then((res) => ({ @@ -665,15 +668,16 @@ class SorobanService { latestLedger: res.sequence, })); - return await Promise.race([ - ledgerPromise, - timeoutPromise as Promise, - ]); + return await Promise.race([ledgerPromise, timeoutPromise]); } catch (error) { return { connected: false, error: error instanceof Error ? error.message : String(error), }; + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } } } diff --git a/src/services/webhookRetryScheduler.ts b/src/services/webhookRetryScheduler.ts deleted file mode 100644 index cf62b26..0000000 --- a/src/services/webhookRetryScheduler.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { query } from "../db/connection.js"; -import logger from "../utils/logger.js"; -import { WebhookService, type WebhookEventType } from "./webhookService.js"; -import { cacheService } from "./cacheService.js"; - -const BACKOFF = [60, 300, 1800]; // seconds - -const LOCK_KEY = "webhook_retry_scheduler:running"; -const LOCK_TTL_SECONDS = 120; // 2 minutes - -let schedulerInterval: NodeJS.Timeout | null = null; - -async function markAsFailed(deliveryId: number) { - await query( - `UPDATE webhook_deliveries - SET next_retry_at = NULL, - last_error = $1, - updated_at = NOW() - WHERE id = $2`, - ["Permanently failed after max attempts reached", deliveryId], - ); - logger.error(`Webhook delivery ${deliveryId} marked as permanently failed.`); -} - -function shouldRetry(delivery: any, delay: number): boolean { - const lastAttempt = new Date(delivery.updated_at).getTime(); - const now = Date.now(); - return now >= lastAttempt + delay * 1000; -} - -async function sendWebhookAgain(delivery: any) { - logger.info( - `Retrying webhook delivery ${delivery.id} (attempt ${delivery.attempt_count + 1})`, - ); - - await WebhookService.retryWebhookDelivery( - delivery.id, - delivery.subscription_id, - delivery.callback_url, - delivery.secret || undefined, - delivery.event_id, - delivery.event_type as WebhookEventType, - delivery.payload, - delivery.attempt_count, - ); -} - -export async function retryFailedWebhooks() { - let lockAcquired = false; - try { - const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; - lockAcquired = await cacheService.setNotExists( - LOCK_KEY, - lockValue, - LOCK_TTL_SECONDS, - ); - } catch (error) { - logger.error("Failed to acquire webhook retry scheduler lock", { error }); - } - - if (!lockAcquired) { - logger.warn( - "Webhook retry scheduler run skipped - another instance is already running", - ); - return; - } - - try { - const result = await query(` - SELECT wd.*, ws.max_attempts, ws.callback_url, ws.secret - FROM webhook_deliveries wd - JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id - WHERE wd.delivered_at IS NULL - AND (wd.next_retry_at IS NOT NULL OR wd.attempt_count = 0) - `); - - const failed = result.rows; - - for (const delivery of failed) { - const delay = BACKOFF[delivery.attempt_count] || 3600; - - if (delivery.attempt_count >= delivery.max_attempts) { - await markAsFailed(delivery.id); - continue; - } - - if (shouldRetry(delivery, delay)) { - await sendWebhookAgain(delivery); - } - } - } catch (error) { - logger.error("Error in webhook retry scheduler", { error }); - } finally { - try { - await cacheService.delete(LOCK_KEY); - } catch (error) { - logger.error("Failed to release webhook retry scheduler lock", { error }); - } - } -} - -export function startWebhookRetryScheduler() { - if (schedulerInterval) { - logger.warn("Webhook retry scheduler already running"); - return; - } - - logger.info("Starting webhook retry scheduler (60s interval)"); - schedulerInterval = setInterval(retryFailedWebhooks, 60000); -} - -export function stopWebhookRetryScheduler() { - if (schedulerInterval) { - logger.info("Stopping webhook retry scheduler"); - clearInterval(schedulerInterval); - schedulerInterval = null; - } -} diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 964c14c..41d97b2 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import { query } from "../db/connection.js"; import logger from "../utils/logger.js"; +import { cacheService } from "./cacheService.js"; export const SUPPORTED_WEBHOOK_EVENT_TYPES = [ "LoanRequested", @@ -270,34 +271,55 @@ async function postWebhook( } } -// Retry configuration for webhook delivery. -// This yields retry attempts at ~5m, ~15m, and ~45m after a failed delivery, -// for a total retry window a little over one hour after the initial attempt. const RETRY_DELAYS_MS = [ 5 * 60 * 1000, 15 * 60 * 1000, 45 * 60 * 1000, ] as const; -const MAX_RETRY_ATTEMPTS = RETRY_DELAYS_MS.length + 1; +export const WEBHOOK_RETRY_CONFIG = { + RETRY_DELAYS_MS, + MAX_RETRY_ATTEMPTS: RETRY_DELAYS_MS.length + 1, +}; export const getRetryDelayMs = (attemptNumber: number): number => { - const delayIndex = Math.min(attemptNumber - 1, RETRY_DELAYS_MS.length - 1); - return ( - RETRY_DELAYS_MS[delayIndex] ?? RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]! - ); + const delays = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS; + const delayIndex = Math.min(attemptNumber - 1, delays.length - 1); + return delays[delayIndex] ?? delays[delays.length - 1]!; }; export class WebhookService { // Retry processor that polls for pending retries static async processRetries(): Promise { + const LOCK_KEY = "webhook_retry_scheduler:running"; + const LOCK_TTL_SECONDS = 120; // 2 minutes + + let lockAcquired = false; + try { + const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; + lockAcquired = await cacheService.setNotExists( + LOCK_KEY, + lockValue, + LOCK_TTL_SECONDS, + ); + } catch (error) { + logger.error("Failed to acquire webhook retry scheduler lock", { error }); + } + + if (!lockAcquired) { + logger.warn( + "Webhook retry processor run skipped - another instance is already running", + ); + return; + } + logger.info("Starting webhook retry processor"); try { const now = new Date(); const result = await query( - `SELECT id, subscription_id, callback_url, secret, event_id, event_type, - payload, attempt_count + `SELECT wd.id, wd.subscription_id, ws.callback_url, ws.secret, wd.event_id, wd.event_type, + wd.payload, wd.attempt_count FROM webhook_deliveries wd JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id WHERE wd.delivered_at IS NULL @@ -306,7 +328,7 @@ export class WebhookService { AND wd.attempt_count < $2 ORDER BY wd.next_retry_at ASC LIMIT 100`, - [now, MAX_RETRY_ATTEMPTS], + [now, WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS], ); if (result.rows.length === 0) { @@ -340,6 +362,14 @@ export class WebhookService { } } catch (error) { logger.error("Error in webhook retry processor", { error }); + } finally { + try { + await cacheService.delete(LOCK_KEY); + } catch (error) { + logger.error("Failed to release webhook retry scheduler lock", { + error, + }); + } } } @@ -397,7 +427,7 @@ export class WebhookService { } else { // Schedule next retry or mark as permanently failed const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; @@ -446,7 +476,7 @@ export class WebhookService { } catch (error) { const newAttemptCount = attemptCount + 1; const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; diff --git a/src/tests/auditLog.test.ts b/src/tests/auditLog.test.ts index 5a33b70..20e77d6 100644 --- a/src/tests/auditLog.test.ts +++ b/src/tests/auditLog.test.ts @@ -3,6 +3,7 @@ import { jest } from "@jest/globals"; // Use unstable_mockModule for robust ESM mocking of the connection module. jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), }, diff --git a/src/tests/distributedLock.test.ts b/src/tests/distributedLock.test.ts index 02c3d7e..4467399 100644 --- a/src/tests/distributedLock.test.ts +++ b/src/tests/distributedLock.test.ts @@ -34,8 +34,7 @@ jest.unstable_mockModule("../db/connection.js", () => ({ withTransaction: jest.fn(), })); -const { retryFailedWebhooks } = - await import("../services/webhookRetryScheduler.js"); +const { WebhookService } = await import("../services/webhookService.js"); const { scoreReconciliationService } = await import("../services/scoreReconciliationService.js"); const { runLoanDueCheck } = await import("../cron/loanCheckCron.js"); @@ -48,10 +47,10 @@ describe("distributed lock: schedulers skip when lock is held", () => { mockWarn.mockClear(); }); - describe("webhookRetryScheduler", () => { + describe("webhookRetryProcessor", () => { it("skips run when lock is not acquired", async () => { activeLocks.add("webhook_retry_scheduler:running"); - const result = await retryFailedWebhooks(); + const result = await WebhookService.processRetries(); expect(result).toBeUndefined(); expect(mockWarn).toHaveBeenCalledWith(expect.stringContaining("skipped")); }); @@ -90,8 +89,8 @@ describe("distributed lock: schedulers skip when lock is held", () => { activeLocks.clear(); // Trigger two concurrent runs - const promise1 = retryFailedWebhooks(); - const promise2 = retryFailedWebhooks(); + const promise1 = WebhookService.processRetries(); + const promise2 = WebhookService.processRetries(); await Promise.all([promise1, promise2]); diff --git a/src/tests/notificationCleanup.test.ts b/src/tests/notificationCleanup.test.ts index d0a7a7b..4346828 100644 --- a/src/tests/notificationCleanup.test.ts +++ b/src/tests/notificationCleanup.test.ts @@ -3,6 +3,7 @@ import { jest } from "@jest/globals"; // Use unstable_mockModule for robust ESM mocking of the connection module jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), },