From 6526655a31179168eed166c0f061edc676c6d54d Mon Sep 17 00:00:00 2001 From: devfoma Date: Thu, 18 Jun 2026 13:42:06 +0100 Subject: [PATCH 01/16] test: add unit tests for SorobanService and mock Stellar RPC --- src/__tests__/sorobanService.test.ts | 588 +++++++++++++++++++++++++++ src/config/stellar.ts | 4 + src/services/sorobanService.ts | 11 +- 3 files changed, 601 insertions(+), 2 deletions(-) create mode 100644 src/__tests__/sorobanService.test.ts diff --git a/src/__tests__/sorobanService.test.ts b/src/__tests__/sorobanService.test.ts new file mode 100644 index 0000000..5842a7b --- /dev/null +++ b/src/__tests__/sorobanService.test.ts @@ -0,0 +1,588 @@ +import { + jest, + describe, + it, + expect, + beforeEach, + afterEach, + beforeAll, +} from "@jest/globals"; +import { + Keypair, + Account, + nativeToScVal, + TransactionBuilder, + Operation, +} from "@stellar/stellar-sdk"; +import { AppError } from "../errors/AppError.js"; + +// Mock the logger to prevent cluttering stdout and to check log calls if needed +const mockLogger = { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +}; + +jest.unstable_mockModule("../utils/logger.js", () => ({ + default: mockLogger, + logger: mockLogger, +})); + +// Mock the stellar config module +const mockCreateSorobanRpcServer = jest.fn(); +const mockGetStellarNetworkPassphrase = jest.fn(); +const mockGetStellarRpcUrl = jest.fn(); + +jest.unstable_mockModule("../config/stellar.js", () => ({ + createSorobanRpcServer: mockCreateSorobanRpcServer, + getStellarNetworkPassphrase: mockGetStellarNetworkPassphrase, + getStellarRpcUrl: mockGetStellarRpcUrl, +})); + +// Now dynamically import the service under test +const { sorobanService } = await import("../services/sorobanService.js"); + +describe("SorobanService", () => { + const originalEnv = { ...process.env }; + + const dummyUser = "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + const dummyContract = + "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABSC4"; + const dummySecret = + "SAACQXICHBTMMZQXM5ISR4VG6ADO3M3VB5PEJ3PP7CMLSSPO4UZSYRZH"; + + let dummyTxXdr: string; + + // Mocked RPC Server implementation + const mockRpcServer = { + getHealth: jest.fn(), + getLatestLedger: jest.fn(), + getAccount: jest.fn(), + prepareTransaction: jest.fn(), + sendTransaction: jest.fn(), + pollTransaction: jest.fn(), + simulateTransaction: jest.fn(), + }; + + beforeAll(() => { + // Generate a valid dummy transaction envelope XDR for fromXDR parsing + const kp = Keypair.fromSecret(dummySecret); + const acc = new Account(kp.publicKey(), "100"); + const tx = new TransactionBuilder(acc, { + fee: "100", + networkPassphrase: "Test SDF Network ; September 2015", + }) + .addOperation( + Operation.bumpSequence({ + bumpTo: "101", + }), + ) + .setTimeout(30) + .build(); + dummyTxXdr = tx.toXDR(); + }); + + beforeEach(() => { + jest.clearAllMocks(); + + mockRpcServer.getHealth.mockReset(); + mockRpcServer.getLatestLedger.mockReset(); + mockRpcServer.getAccount.mockReset(); + mockRpcServer.prepareTransaction.mockReset(); + mockRpcServer.sendTransaction.mockReset(); + mockRpcServer.pollTransaction.mockReset(); + mockRpcServer.simulateTransaction.mockReset(); + + mockCreateSorobanRpcServer.mockReturnValue(mockRpcServer); + mockGetStellarNetworkPassphrase.mockReturnValue( + "Test SDF Network ; September 2015", + ); + mockGetStellarRpcUrl.mockReturnValue("https://soroban-testnet.stellar.org"); + + // Setup standard test environment variables + process.env.LOAN_MANAGER_CONTRACT_ID = dummyContract; + process.env.LENDING_POOL_CONTRACT_ID = dummyContract; + process.env.POOL_TOKEN_ADDRESS = dummyContract; + process.env.REMITTANCE_NFT_CONTRACT_ID = dummyContract; + process.env.SCORE_RECONCILIATION_SOURCE_SECRET = dummySecret; + process.env.LOAN_MANAGER_ADMIN_SECRET = dummySecret; + process.env.DEFAULT_CREDIT_SCORE = "500"; + process.env.SCORE_DELTA_REPAY = "15"; + process.env.SCORE_DELTA_DEFAULT = "50"; + process.env.SCORE_DELTA_LATE = "5"; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + describe("validateConfig", () => { + it("should succeed when all environment variables are valid and RPC is reachable", async () => { + mockRpcServer.getHealth.mockResolvedValue({ status: "healthy" }); + + await expect(sorobanService.validateConfig()).resolves.not.toThrow(); + expect(mockRpcServer.getHealth).toHaveBeenCalled(); + }); + + it("should throw AppError if LOAN_MANAGER_CONTRACT_ID is missing", async () => { + delete process.env.LOAN_MANAGER_CONTRACT_ID; + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "LOAN_MANAGER_CONTRACT_ID is not configured", + ); + }); + + it("should throw AppError if LOAN_MANAGER_CONTRACT_ID is invalid", async () => { + process.env.LOAN_MANAGER_CONTRACT_ID = "invalid-contract"; + + await expect(sorobanService.validateConfig()).rejects.toThrow( + 'LOAN_MANAGER_CONTRACT_ID is not a valid Stellar contract address: "invalid-contract"', + ); + }); + + it("should throw AppError if getStellarRpcUrl throws", async () => { + mockGetStellarRpcUrl.mockImplementation(() => { + throw new Error("Missing stellar config"); + }); + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "Missing stellar config", + ); + }); + + it("should throw AppError if RPC getHealth fails", async () => { + mockRpcServer.getHealth.mockRejectedValue( + new Error("Connection refused"), + ); + + await expect(sorobanService.validateConfig()).rejects.toThrow( + "Stellar RPC is unreachable at https://soroban-testnet.stellar.org: Connection refused", + ); + }); + }); + + describe("healthCheck and ping", () => { + it("ping should return 'ok' when RPC is reachable", async () => { + mockRpcServer.getLatestLedger.mockResolvedValue({ sequence: 100 }); + + await expect(sorobanService.ping()).resolves.toBe("ok"); + }); + + it("ping should return 'error' when RPC is unreachable", async () => { + mockRpcServer.getLatestLedger.mockRejectedValue(new Error("Timeout")); + + await expect(sorobanService.ping()).resolves.toBe("error"); + }); + + it("healthCheck should return connected true and the sequence number", async () => { + mockRpcServer.getLatestLedger.mockResolvedValue({ sequence: 999 }); + + const res = await sorobanService.healthCheck(); + expect(res).toEqual({ connected: true, latestLedger: 999 }); + }); + + it("healthCheck should return connected false and error message on failure", async () => { + mockRpcServer.getLatestLedger.mockRejectedValue(new Error("RPC Timeout")); + + const res = await sorobanService.healthCheck(); + expect(res).toEqual({ connected: false, error: "RPC Timeout" }); + }); + }); + + describe("build*Tx methods", () => { + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue(new Account(dummyUser, "100")); + mockRpcServer.prepareTransaction.mockResolvedValue({ + toXDR: () => "mocked-prepared-tx-xdr", + }); + }); + + it("buildRequestLoanTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildRequestLoanTx(dummyUser, 1000); + + expect(res).toEqual({ + unsignedTxXdr: "mocked-prepared-tx-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + expect(mockRpcServer.getAccount).toHaveBeenCalledWith(dummyUser); + expect(mockRpcServer.prepareTransaction).toHaveBeenCalled(); + }); + + it("buildRepayTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildRepayTx(dummyUser, 42, 500); + + expect(res).toEqual({ + unsignedTxXdr: "mocked-prepared-tx-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + expect(mockRpcServer.getAccount).toHaveBeenCalledWith(dummyUser); + expect(mockRpcServer.prepareTransaction).toHaveBeenCalled(); + }); + + it("buildDepositTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildDepositTx( + dummyUser, + dummyContract, + 750, + ); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + + it("buildWithdrawTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildWithdrawTx( + dummyUser, + dummyContract, + 100, + ); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + + it("buildApproveLoanTx should call RPC and return unsigned XDR", async () => { + const res = await sorobanService.buildApproveLoanTx(dummyUser, 12); + + expect(res.unsignedTxXdr).toBe("mocked-prepared-tx-xdr"); + }); + }); + + describe("submitSignedTx", () => { + it("should submit transaction and poll successfully", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + hash: "tx-hash-12345", + status: "PENDING", + }); + mockRpcServer.pollTransaction.mockResolvedValue({ + status: "SUCCESS", + resultXdr: { + toXDR: () => "result-xdr-base64", + }, + }); + + const res = await sorobanService.submitSignedTx(dummyTxXdr); + + expect(res).toEqual({ + txHash: "tx-hash-12345", + status: "SUCCESS", + resultXdr: "result-xdr-base64", + }); + expect(mockRpcServer.sendTransaction).toHaveBeenCalled(); + expect(mockRpcServer.pollTransaction).toHaveBeenCalledWith( + "tx-hash-12345", + expect.any(Object), + ); + }); + + it("should throw AppError if transaction submission does not return a hash", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + status: "ERROR", + }); + + await expect(sorobanService.submitSignedTx(dummyTxXdr)).rejects.toThrow( + "Transaction submission returned no hash", + ); + }); + + it("should return status without resultXdr if poll transaction does not succeed", async () => { + mockRpcServer.sendTransaction.mockResolvedValue({ + hash: "tx-hash-12345", + status: "PENDING", + }); + mockRpcServer.pollTransaction.mockResolvedValue({ + status: "FAILED", + }); + + const res = await sorobanService.submitSignedTx(dummyTxXdr); + + expect(res).toEqual({ + txHash: "tx-hash-12345", + status: "FAILED", + }); + }); + }); + + describe("getOnChainCreditScore (Score Fallback Logic)", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return the score on successful simulation", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(620), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(620); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(1); + }); + + it("should fallback to default score when simulation returns a missing-score error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "HostError: Error(Value, NotFound)", + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); // Default score configured in process.env + expect(mockLogger.warn).toHaveBeenCalledWith( + "Falling back to default credit score", + expect.objectContaining({ + reason: "HostError: Error(Value, NotFound)", + }), + ); + }); + + it("should retry on transient simulation error and succeed if second attempt succeeds", async () => { + mockRpcServer.simulateTransaction + .mockResolvedValueOnce({ + error: "RPC Timeout calling simulateTransaction", + }) + .mockResolvedValueOnce({ + result: { + retval: nativeToScVal(700), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(700); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(2); + expect(mockLogger.warn).toHaveBeenCalledWith( + "Retrying get_score simulation after transient RPC failure", + expect.objectContaining({ + attempt: 1, + error: "RPC Timeout calling simulateTransaction", + }), + ); + }); + + it("should retry and fallback to default score if all attempts return transient errors", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "503 Service Unavailable", + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); // Fallback + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(2); // Retries once (attempts: 1, 2) + expect(mockLogger.warn).toHaveBeenCalledWith( + "Falling back to default credit score", + expect.objectContaining({ + reason: "503 Service Unavailable", + }), + ); + }); + + it("should throw AppError if simulation returns a hard error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Contract panic: index out of bounds", + }); + + await expect( + sorobanService.getOnChainCreditScore(dummyUser), + ).rejects.toThrow( + "Failed to simulate get_score for GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV: Contract panic: index out of bounds", + ); + expect(mockRpcServer.simulateTransaction).toHaveBeenCalledTimes(1); // No retry for hard errors + }); + + it("should fallback to default score if simulation returns empty object or no result/error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({} as any); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + + it("should fallback to default score if simulation retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + + it("should fallback to default score if simulation retval is not a finite number", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal("not-a-number"), + }, + }); + + const score = await sorobanService.getOnChainCreditScore(dummyUser); + + expect(score).toBe(500); + }); + }); + + describe("getOnChainScoreHistory", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return history sorted ascending by timestamp", async () => { + const mockHistory = [ + { ledger: 200, old_score: 550, new_score: 565, reason: "Repay" }, + { ledger: 150, old_score: 500, new_score: 550, reason: "Repay" }, + ]; + + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(mockHistory), + }, + }); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([ + { score: 550, timestamp: 150, reason: "Repay" }, + { score: 565, timestamp: 200, reason: "Repay" }, + ]); + }); + + it("should filter out invalid history entries", async () => { + const mockHistory = [ + { ledger: 200, old_score: 550, new_score: 565, reason: "Repay" }, + { ledger: "invalid", old_score: 500, new_score: 550, reason: "Repay" }, // filtered out + { ledger: 150, old_score: 500, new_score: 550, reason: "" }, // filtered out (empty reason) + ]; + + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(mockHistory), + }, + }); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([ + { score: 565, timestamp: 200, reason: "Repay" }, + ]); + }); + + it("should return empty array if simulation retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + const history = await sorobanService.getOnChainScoreHistory(dummyUser); + + expect(history).toEqual([]); + }); + + it("should throw AppError if simulation returns error", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Contract execution failed", + }); + + await expect( + sorobanService.getOnChainScoreHistory(dummyUser), + ).rejects.toThrow( + "Failed to simulate get_score_history for GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV: Contract execution failed", + ); + }); + }); + + describe("getPoolBalance", () => { + const adminPublicKey = + "GC35VRXT7EDRGEKC53D6YVKTKNKNHUGPW7QTUREWCV5TBWH4ZIKHZAKV"; + + beforeEach(() => { + mockRpcServer.getAccount.mockResolvedValue( + new Account(adminPublicKey, "100"), + ); + }); + + it("should return pool balance on success", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal(1000000000000n), + }, + }); + + const balance = await sorobanService.getPoolBalance(); + + expect(balance).toBe(1000000000000); + }); + + it("should throw AppError if simulation fails", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + error: "Simulation failed", + }); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "Failed to simulate pool balance: Simulation failed", + ); + }); + + it("should throw AppError if retval is missing", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: {}, + } as any); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "No balance returned by pool token", + ); + }); + + it("should throw AppError if balance is not a finite number", async () => { + mockRpcServer.simulateTransaction.mockResolvedValue({ + result: { + retval: nativeToScVal("not-a-number"), + }, + }); + + await expect(sorobanService.getPoolBalance()).rejects.toThrow( + "Invalid on-chain balance returned", + ); + }); + }); + + describe("getScoreConfig and validateScoreConfig", () => { + it("should successfully return score configs", () => { + const config = sorobanService.getScoreConfig(); + + expect(config).toEqual({ + repaymentDelta: 15, + defaultPenalty: 50, + latePenalty: 5, + }); + }); + + it("should succeed validation with default values", () => { + expect(() => sorobanService.validateScoreConfig()).not.toThrow(); + }); + + it("should throw AppError if a config is not a valid integer", () => { + process.env.SCORE_DELTA_REPAY = "abc"; + + expect(() => sorobanService.validateScoreConfig()).toThrow( + 'SCORE_DELTA_REPAY must be a valid integer: "abc"', + ); + }); + + it("should throw AppError if a positive-required config is negative or zero", () => { + process.env.SCORE_DELTA_DEFAULT = "0"; + + expect(() => sorobanService.validateScoreConfig()).toThrow( + "SCORE_DELTA_DEFAULT must be a positive integer: 0", + ); + }); + }); +}); diff --git a/src/config/stellar.ts b/src/config/stellar.ts index 74e8081..f256c9b 100644 --- a/src/config/stellar.ts +++ b/src/config/stellar.ts @@ -112,6 +112,10 @@ export function getStellarNetworkPassphrase(): string { return getStellarConfig().networkPassphrase; } +/** + * Creates and returns a new Soroban RPC server instance. + * Automatically enables HTTP (in addition to HTTPS) if the RPC URL starts with http://. + */ export function createSorobanRpcServer(): rpc.Server { const rpcUrl = getStellarRpcUrl(); const allowHttp = rpcUrl.startsWith("http://"); diff --git a/src/services/sorobanService.ts b/src/services/sorobanService.ts index 22d30eb..01e9c36 100644 --- a/src/services/sorobanService.ts +++ b/src/services/sorobanService.ts @@ -654,10 +654,13 @@ class SorobanService { latestLedger?: number; error?: string; }> { + let timeoutId: NodeJS.Timeout | undefined; try { const server = this.getRpcServer(); const timeoutPromise = new Promise<{ connected: boolean; error: string }>( - (_, reject) => setTimeout(() => reject(new Error("RPC timeout")), 5000), + (_, reject) => { + timeoutId = setTimeout(() => reject(new Error("RPC timeout")), 5000); + }, ); const ledgerPromise = server.getLatestLedger().then((res) => ({ @@ -667,13 +670,17 @@ class SorobanService { return await Promise.race([ ledgerPromise, - timeoutPromise as Promise, + timeoutPromise, ]); } catch (error) { return { connected: false, error: error instanceof Error ? error.message : String(error), }; + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } } } From 09b655a0e1d91cee4289c28d25a99f0c5ae3ca53 Mon Sep 17 00:00:00 2001 From: Sam-Rytech Date: Fri, 19 Jun 2026 10:11:47 +0100 Subject: [PATCH 02/16] Fix webhook retry processor and delete duplicate scheduler - Removed webhookRetryScheduler.ts as its logic conflicts with webhookService.ts - Extracted backoff config and max attempts into a shared config WEBHOOK_RETRY_CONFIG - Updated WebhookService.processRetries query to properly alias columns to avoid ambiguous references and to utilize the new config - Updated index.ts to wire up the surviving webhookRetryProcessor - Added test verifying retry processor respects backoff delays and max attempts Closes #123 --- src/index.ts | 12 +- .../__tests__/webhookRetryProcessor.test.ts | 29 +++++ src/services/webhookRetryScheduler.ts | 118 ------------------ src/services/webhookService.ts | 35 +++--- 4 files changed, 51 insertions(+), 143 deletions(-) create mode 100644 src/services/__tests__/webhookRetryProcessor.test.ts delete mode 100644 src/services/webhookRetryScheduler.ts diff --git a/src/index.ts b/src/index.ts index f99f9e4..896ccfa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,9 +17,9 @@ import { stopDefaultCheckerScheduler, } from "./services/defaultChecker.js"; import { - startWebhookRetryScheduler, - stopWebhookRetryScheduler, -} from "./services/webhookRetryScheduler.js"; + startWebhookRetryProcessor, + stopWebhookRetryProcessor, +} from "./services/webhookRetryProcessor.js"; import { eventStreamService } from "./services/eventStreamService.js"; import { startNotificationCleanupScheduler, @@ -61,8 +61,8 @@ const server = app.listen(port, () => { // Start periodic on-chain default checks (if configured) startDefaultCheckerScheduler(); - // Start webhook retry scheduler - startWebhookRetryScheduler(); + // Start webhook retry processor + startWebhookRetryProcessor(); // Start scheduled score reconciliation against on-chain state startScoreReconciliationScheduler(); @@ -87,7 +87,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => { try { await stopIndexer(); stopDefaultCheckerScheduler(); - stopWebhookRetryScheduler(); + stopWebhookRetryProcessor(); stopScoreReconciliationScheduler(); stopNotificationCleanupScheduler(); diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts new file mode 100644 index 0000000..13a841a --- /dev/null +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -0,0 +1,29 @@ +import { jest, describe, it, expect } from "@jest/globals"; +import { + WEBHOOK_RETRY_CONFIG, + getRetryDelayMs, +} from "../webhookService.js"; + +describe("Webhook Retry Processor", () => { + it("respects the configured backoff delays", () => { + expect(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS).toEqual([ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ]); + + expect(getRetryDelayMs(1)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[0]); + expect(getRetryDelayMs(2)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[1]); + expect(getRetryDelayMs(3)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[2]); + }); + + it("caps the backoff delay at the last configured value", () => { + const maxDelay = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe(maxDelay); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe(maxDelay); + }); + + it("configures exactly 4 max attempts", () => { + expect(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS).toBe(4); + }); +}); diff --git a/src/services/webhookRetryScheduler.ts b/src/services/webhookRetryScheduler.ts deleted file mode 100644 index cf62b26..0000000 --- a/src/services/webhookRetryScheduler.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { query } from "../db/connection.js"; -import logger from "../utils/logger.js"; -import { WebhookService, type WebhookEventType } from "./webhookService.js"; -import { cacheService } from "./cacheService.js"; - -const BACKOFF = [60, 300, 1800]; // seconds - -const LOCK_KEY = "webhook_retry_scheduler:running"; -const LOCK_TTL_SECONDS = 120; // 2 minutes - -let schedulerInterval: NodeJS.Timeout | null = null; - -async function markAsFailed(deliveryId: number) { - await query( - `UPDATE webhook_deliveries - SET next_retry_at = NULL, - last_error = $1, - updated_at = NOW() - WHERE id = $2`, - ["Permanently failed after max attempts reached", deliveryId], - ); - logger.error(`Webhook delivery ${deliveryId} marked as permanently failed.`); -} - -function shouldRetry(delivery: any, delay: number): boolean { - const lastAttempt = new Date(delivery.updated_at).getTime(); - const now = Date.now(); - return now >= lastAttempt + delay * 1000; -} - -async function sendWebhookAgain(delivery: any) { - logger.info( - `Retrying webhook delivery ${delivery.id} (attempt ${delivery.attempt_count + 1})`, - ); - - await WebhookService.retryWebhookDelivery( - delivery.id, - delivery.subscription_id, - delivery.callback_url, - delivery.secret || undefined, - delivery.event_id, - delivery.event_type as WebhookEventType, - delivery.payload, - delivery.attempt_count, - ); -} - -export async function retryFailedWebhooks() { - let lockAcquired = false; - try { - const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; - lockAcquired = await cacheService.setNotExists( - LOCK_KEY, - lockValue, - LOCK_TTL_SECONDS, - ); - } catch (error) { - logger.error("Failed to acquire webhook retry scheduler lock", { error }); - } - - if (!lockAcquired) { - logger.warn( - "Webhook retry scheduler run skipped - another instance is already running", - ); - return; - } - - try { - const result = await query(` - SELECT wd.*, ws.max_attempts, ws.callback_url, ws.secret - FROM webhook_deliveries wd - JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id - WHERE wd.delivered_at IS NULL - AND (wd.next_retry_at IS NOT NULL OR wd.attempt_count = 0) - `); - - const failed = result.rows; - - for (const delivery of failed) { - const delay = BACKOFF[delivery.attempt_count] || 3600; - - if (delivery.attempt_count >= delivery.max_attempts) { - await markAsFailed(delivery.id); - continue; - } - - if (shouldRetry(delivery, delay)) { - await sendWebhookAgain(delivery); - } - } - } catch (error) { - logger.error("Error in webhook retry scheduler", { error }); - } finally { - try { - await cacheService.delete(LOCK_KEY); - } catch (error) { - logger.error("Failed to release webhook retry scheduler lock", { error }); - } - } -} - -export function startWebhookRetryScheduler() { - if (schedulerInterval) { - logger.warn("Webhook retry scheduler already running"); - return; - } - - logger.info("Starting webhook retry scheduler (60s interval)"); - schedulerInterval = setInterval(retryFailedWebhooks, 60000); -} - -export function stopWebhookRetryScheduler() { - if (schedulerInterval) { - logger.info("Stopping webhook retry scheduler"); - clearInterval(schedulerInterval); - schedulerInterval = null; - } -} diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 964c14c..59e378d 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -270,22 +270,19 @@ async function postWebhook( } } -// Retry configuration for webhook delivery. -// This yields retry attempts at ~5m, ~15m, and ~45m after a failed delivery, -// for a total retry window a little over one hour after the initial attempt. -const RETRY_DELAYS_MS = [ - 5 * 60 * 1000, - 15 * 60 * 1000, - 45 * 60 * 1000, -] as const; - -const MAX_RETRY_ATTEMPTS = RETRY_DELAYS_MS.length + 1; +export const WEBHOOK_RETRY_CONFIG = { + RETRY_DELAYS_MS: [ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ] as const, + MAX_RETRY_ATTEMPTS: 4, +}; export const getRetryDelayMs = (attemptNumber: number): number => { - const delayIndex = Math.min(attemptNumber - 1, RETRY_DELAYS_MS.length - 1); - return ( - RETRY_DELAYS_MS[delayIndex] ?? RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]! - ); + const delays = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS; + const delayIndex = Math.min(attemptNumber - 1, delays.length - 1); + return delays[delayIndex] ?? delays[delays.length - 1]!; }; export class WebhookService { @@ -296,8 +293,8 @@ export class WebhookService { try { const now = new Date(); const result = await query( - `SELECT id, subscription_id, callback_url, secret, event_id, event_type, - payload, attempt_count + `SELECT wd.id, wd.subscription_id, ws.callback_url, ws.secret, wd.event_id, wd.event_type, + wd.payload, wd.attempt_count FROM webhook_deliveries wd JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id WHERE wd.delivered_at IS NULL @@ -306,7 +303,7 @@ export class WebhookService { AND wd.attempt_count < $2 ORDER BY wd.next_retry_at ASC LIMIT 100`, - [now, MAX_RETRY_ATTEMPTS], + [now, WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS], ); if (result.rows.length === 0) { @@ -397,7 +394,7 @@ export class WebhookService { } else { // Schedule next retry or mark as permanently failed const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; @@ -446,7 +443,7 @@ export class WebhookService { } catch (error) { const newAttemptCount = attemptCount + 1; const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; From 138ed44e06a73507bd5b9f0eb2e678d60c53e05b Mon Sep 17 00:00:00 2001 From: Dev Faith Date: Fri, 19 Jun 2026 11:58:32 +0100 Subject: [PATCH 03/16] feat(admin): add approve_loan endpoint for admin loan approval - Add POST /admin/approve-loan endpoint with JWT auth and admin RBAC - Add approveLoan controller function in indexerController.ts - Add approveLoanBodySchema with zod validation for loanId - Add Swagger documentation for the new endpoint - Add 5 tests covering: successful build, non-admin rejection, missing loanId, invalid loanId, and missing auth - All 220 tests pass --- src/__tests__/adminApproveLoan.test.ts | 146 +++++++++++++++++++++++++ src/controllers/indexerController.ts | 37 +++++++ src/routes/adminRoutes.ts | 56 ++++++++++ src/schemas/loanSchemas.ts | 4 + 4 files changed, 243 insertions(+) create mode 100644 src/__tests__/adminApproveLoan.test.ts diff --git a/src/__tests__/adminApproveLoan.test.ts b/src/__tests__/adminApproveLoan.test.ts new file mode 100644 index 0000000..d57a35f --- /dev/null +++ b/src/__tests__/adminApproveLoan.test.ts @@ -0,0 +1,146 @@ +import request from "supertest"; +import { jest } from "@jest/globals"; +import { Keypair } from "@stellar/stellar-sdk"; +import { generateJwtToken } from "../services/authService.js"; + +type MockQueryResult = { rows: unknown[]; rowCount?: number }; + +const VALID_API_KEY = "test-internal-key"; +const TEST_ADMIN = Keypair.random().publicKey(); +const TEST_BORROWER = Keypair.random().publicKey(); + +process.env.JWT_SECRET = "test-jwt-secret-min-32-chars-long!!"; +process.env.INTERNAL_API_KEY = VALID_API_KEY; +process.env.ADMIN_WALLETS = TEST_ADMIN; + +const mockQuery: jest.MockedFunction< + (text: string, params?: unknown[]) => Promise +> = jest.fn(); + +const mockRelease = jest.fn(); +const mockClient: any = { + query: mockQuery, + release: mockRelease, +}; + +jest.unstable_mockModule("../db/connection.js", () => ({ + default: { query: mockQuery }, + query: mockQuery, + getClient: jest + .fn<() => Promise>() + .mockResolvedValue(mockClient), + closePool: jest.fn(), + withTransaction: jest.fn(), +})); + +jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + get: jest.fn<() => Promise>().mockResolvedValue(null), + set: jest.fn<() => Promise>().mockResolvedValue(undefined), + delete: jest.fn<() => Promise>().mockResolvedValue(undefined), + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, +})); + +const mockBuildApproveLoanTx = + jest.fn< + ( + adminPublicKey: string, + loanId: number, + ) => Promise<{ unsignedTxXdr: string; networkPassphrase: string }> + >(); +const mockSubmitSignedTx = + jest.fn< + ( + signedTxXdr: string, + ) => Promise<{ txHash: string; status: string; resultXdr?: string }> + >(); + +jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + buildApproveLoanTx: mockBuildApproveLoanTx, + submitSignedTx: mockSubmitSignedTx, + }, +})); + +await import("../db/connection.js"); +await import("../services/sorobanService.js"); +const { default: app } = await import("../app.js"); + +const mockedQuery = mockQuery; + +const bearer = (publicKey: string) => ({ + Authorization: `Bearer ${generateJwtToken(publicKey)}`, +}); + +beforeEach(() => { + mockedQuery.mockReset(); + jest.clearAllMocks(); +}); + +afterAll(() => { + delete process.env.INTERNAL_API_KEY; + delete process.env.JWT_SECRET; + delete process.env.ADMIN_WALLETS; +}); + +// --------------------------------------------------------------------------- +// POST /admin/approve-loan +// --------------------------------------------------------------------------- +describe("POST /admin/approve-loan", () => { + it("should build an approve_loan transaction for admin", async () => { + mockBuildApproveLoanTx.mockResolvedValueOnce({ + unsignedTxXdr: "unsigned-approve-xdr", + networkPassphrase: "Test SDF Network ; September 2015", + }); + + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({ loanId: 1 }); + + expect(response.status).toBe(200); + expect(response.body.success).toBe(true); + expect(response.body.loanId).toBe(1); + expect(response.body.unsignedTxXdr).toBe("unsigned-approve-xdr"); + expect(response.body.networkPassphrase).toBe( + "Test SDF Network ; September 2015", + ); + expect(mockBuildApproveLoanTx).toHaveBeenCalledWith(TEST_ADMIN, 1); + }); + + it("should reject non-admin user", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_BORROWER)) + .send({ loanId: 1 }); + + expect(response.status).toBe(403); + }); + + it("should reject missing loanId", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({}); + + expect(response.status).toBe(400); + }); + + it("should reject invalid loanId (non-positive integer)", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .set(bearer(TEST_ADMIN)) + .send({ loanId: -1 }); + + expect(response.status).toBe(400); + }); + + it("should reject missing authentication", async () => { + const response = await request(app) + .post("/api/admin/approve-loan") + .send({ loanId: 1 }); + + expect(response.status).toBe(401); + }); +}); diff --git a/src/controllers/indexerController.ts b/src/controllers/indexerController.ts index 8650a2d..dfd41ae 100644 --- a/src/controllers/indexerController.ts +++ b/src/controllers/indexerController.ts @@ -19,6 +19,8 @@ import { import { parseCappedLimit } from "../utils/queryHelpers.js"; import logger from "../utils/logger.js"; import { getStellarRpcUrl } from "../config/stellar.js"; +import { sorobanService } from "../services/sorobanService.js"; +import { asyncHandler } from "../utils/asyncHandler.js"; const buildEventFilters = ( req: Request, @@ -838,3 +840,38 @@ export const reprocessQuarantinedEvents = async ( }); } }; + +/** + * POST /admin/approve-loan + * Builds an unsigned Soroban `approve_loan(loanId)` transaction + * for the admin to sign with their wallet. + */ +export const approveLoan = asyncHandler(async (req: Request, res: Response) => { + const { loanId } = req.body as { loanId: number }; + const adminPublicKey = req.user?.publicKey; + + if (!adminPublicKey) { + res.status(401).json({ + success: false, + message: "Admin authentication required", + }); + return; + } + + const result = await sorobanService.buildApproveLoanTx( + adminPublicKey, + loanId, + ); + + logger.info("Admin approve_loan transaction built", { + admin: adminPublicKey, + loanId, + }); + + res.json({ + success: true, + loanId, + unsignedTxXdr: result.unsignedTxXdr, + networkPassphrase: result.networkPassphrase, + }); +}); diff --git a/src/routes/adminRoutes.ts b/src/routes/adminRoutes.ts index 3bb8ef8..6475dce 100644 --- a/src/routes/adminRoutes.ts +++ b/src/routes/adminRoutes.ts @@ -15,6 +15,7 @@ import { listWebhookSubscriptions, reprocessQuarantinedEvents, reindexLedgerRange, + approveLoan, } from "../controllers/indexerController.js"; import { listLoanDisputes, @@ -22,6 +23,7 @@ import { getLoanDispute, rejectLoanDispute, } from "../controllers/adminDisputeController.js"; +import { approveLoanBodySchema } from "../schemas/loanSchemas.js"; import { query } from "../db/connection.js"; const router = Router(); @@ -108,6 +110,60 @@ router.post( rejectLoanDispute, ); +/** + * @swagger + * /admin/approve-loan: + * post: + * summary: Build an unsigned approve_loan transaction for admin signing + * description: > + * Builds a Soroban `approve_loan(loanId)` transaction that the admin + * must sign with their wallet and submit via the generic submit endpoint. + * tags: [Admin] + * security: + * - BearerAuth: [] + * requestBody: + * required: true + * content: + * application/json: + * schema: + * type: object + * required: [loanId] + * properties: + * loanId: + * type: integer + * minimum: 1 + * description: ID of the loan to approve + * responses: + * 200: + * description: Unsigned transaction built successfully + * content: + * application/json: + * schema: + * type: object + * properties: + * success: + * type: boolean + * loanId: + * type: integer + * unsignedTxXdr: + * type: string + * networkPassphrase: + * type: string + * 400: + * description: Validation error + * 401: + * description: Admin authentication required + */ +router.post( + "/approve-loan", + requireJwtAuth, + requireRoles("admin"), + strictRateLimiter, + auditLog, + validateBody(approveLoanBodySchema), + approveLoan, +); + const checkDefaultsBodySchema = z.object({ loanIds: z .array(z.number().int().positive()) diff --git a/src/schemas/loanSchemas.ts b/src/schemas/loanSchemas.ts index 7ae03c3..91e3e20 100644 --- a/src/schemas/loanSchemas.ts +++ b/src/schemas/loanSchemas.ts @@ -37,3 +37,7 @@ export const submitTxSchema = z.object({ .min(1, "signedTxXdr is required") .regex(base64Regex, "Must be a valid base64 string"), }); + +export const approveLoanBodySchema = z.object({ + loanId: z.number().int().positive("Loan ID must be a positive integer"), +}); From 9ad4b0734d42024a0d45d0984cb6a0fbf6e664dd Mon Sep 17 00:00:00 2001 From: devfoma Date: Fri, 19 Jun 2026 13:11:38 +0100 Subject: [PATCH 04/16] style: fix Promise.race formatting and remove unused import --- src/__tests__/sorobanService.test.ts | 1 - src/services/sorobanService.ts | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/__tests__/sorobanService.test.ts b/src/__tests__/sorobanService.test.ts index 5842a7b..dbbd63a 100644 --- a/src/__tests__/sorobanService.test.ts +++ b/src/__tests__/sorobanService.test.ts @@ -14,7 +14,6 @@ import { TransactionBuilder, Operation, } from "@stellar/stellar-sdk"; -import { AppError } from "../errors/AppError.js"; // Mock the logger to prevent cluttering stdout and to check log calls if needed const mockLogger = { diff --git a/src/services/sorobanService.ts b/src/services/sorobanService.ts index 01e9c36..2df9420 100644 --- a/src/services/sorobanService.ts +++ b/src/services/sorobanService.ts @@ -668,10 +668,7 @@ class SorobanService { latestLedger: res.sequence, })); - return await Promise.race([ - ledgerPromise, - timeoutPromise, - ]); + return await Promise.race([ledgerPromise, timeoutPromise]); } catch (error) { return { connected: false, From bf1ddaf592a3f57ce60c7aa782c564ef4b983e43 Mon Sep 17 00:00:00 2001 From: MerlinTheWhiz Date: Fri, 19 Jun 2026 16:12:43 +0100 Subject: [PATCH 05/16] fix: resolve migration timestamp collisions --- README.md | 42 ++++++++++++++++++- ...777000000008_unique-loan-status-events.js} | 0 ... 1778000000009_transaction-submissions.js} | 0 ... => 1786000000017_webhook-max-attempts.js} | 0 ... 1788000000019_unified-contract-events.js} | 0 5 files changed, 41 insertions(+), 1 deletion(-) rename migrations/{1777000000007_unique-loan-status-events.js => 1777000000008_unique-loan-status-events.js} (100%) rename migrations/{1778000000008_transaction-submissions.js => 1778000000009_transaction-submissions.js} (100%) rename migrations/{1786000000016_webhook-max-attempts.js => 1786000000017_webhook-max-attempts.js} (100%) rename migrations/{1788000000018_unified-contract-events.js => 1788000000019_unified-contract-events.js} (100%) diff --git a/README.md b/README.md index ac6867b..56ee6f7 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,47 @@ Core tables are created by these migrations (run in filename order): | `1773000000002_loan-history.js` | `loan_history` | | `1773000000003_indexed-events.js` | `indexed_events` | | `1774000000004_scores-add-created-at.js` | adds `created_at` to `scores` (idempotent) | -| `1777000000007_unique-loan-status-events.js` | dedupes and enforces unique status events per loan | +| `1777000000008_unique-loan-status-events.js` | dedupes and enforces unique status events per loan | + +### Migration naming convention + +Each migration file follows the pattern `_.js`. The timestamp prefix is a 13-digit Unix millisecond value (generated by `Date.now().toString()`) that determines apply order — migrations run in strictly ascending timestamp order. + +To avoid collisions: + +- **Always** use `npm run migrate:create ` to generate new migrations — it calls `Date.now()` automatically and guarantees uniqueness. +- If you must create a migration file manually, first check the largest existing timestamp in `migrations/` and ensure your new prefix is strictly larger. +- Never submit a PR where two migration files share the same numeric prefix — the apply order becomes non-deterministic. + +### Renaming an already-applied migration (existing databases) + +If you rename a migration file that has already been applied to a database, `node-pg-migrate` will reject the run with: + +``` +Error: Not run migration is preceding already run migration +``` + +This happens because `pgmigrations` still references the old filename while the filesystem has the new one. Fix it by syncing the tracking table before re-running `npm run migrate:up`: + +```sql +UPDATE pgmigrations +SET name = '1777000000008_unique-loan-status-events' +WHERE name = '1777000000007_unique-loan-status-events'; + +UPDATE pgmigrations +SET name = '1778000000009_transaction-submissions' +WHERE name = '1778000000008_transaction-submissions'; + +UPDATE pgmigrations +SET name = '1786000000017_webhook-max-attempts' +WHERE name = '1786000000016_webhook-max-attempts'; + +UPDATE pgmigrations +SET name = '1788000000019_unified-contract-events' +WHERE name = '1788000000018_unified-contract-events'; +``` + +Afterwards, `npm run migrate:up` will run normally. With Docker Compose from the repo root, the `backend` service runs `migrate:up` before `npm run dev` so the schema is applied automatically when the database is healthy. diff --git a/migrations/1777000000007_unique-loan-status-events.js b/migrations/1777000000008_unique-loan-status-events.js similarity index 100% rename from migrations/1777000000007_unique-loan-status-events.js rename to migrations/1777000000008_unique-loan-status-events.js diff --git a/migrations/1778000000008_transaction-submissions.js b/migrations/1778000000009_transaction-submissions.js similarity index 100% rename from migrations/1778000000008_transaction-submissions.js rename to migrations/1778000000009_transaction-submissions.js diff --git a/migrations/1786000000016_webhook-max-attempts.js b/migrations/1786000000017_webhook-max-attempts.js similarity index 100% rename from migrations/1786000000016_webhook-max-attempts.js rename to migrations/1786000000017_webhook-max-attempts.js diff --git a/migrations/1788000000018_unified-contract-events.js b/migrations/1788000000019_unified-contract-events.js similarity index 100% rename from migrations/1788000000018_unified-contract-events.js rename to migrations/1788000000019_unified-contract-events.js From 72c64a4475d2afc511ba8db92ca877856047d902 Mon Sep 17 00:00:00 2001 From: Nonso Bethel Date: Fri, 19 Jun 2026 19:25:12 +0100 Subject: [PATCH 06/16] fix: guard event indexer poll cycle with Postgres advisory lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #5 Two concurrent indexer instances could both read the same cursor from indexer_state, fetch the same ledger range, and fire webhooks/notifications before the ON CONFLICT DO NOTHING result was known — wasting RPC calls and risking duplicate side-effect dispatch. Changes: - pollOnce() now acquires pg_try_advisory_lock(738154291) on a dedicated connection at the start of each cycle. A second instance that calls pg_try_advisory_lock while the first holds it gets false immediately and skips the cycle rather than blocking. - The lock is released in a finally block on the same connection that acquired it; the connection is always returned to the pool regardless of whether the cycle succeeds or throws. - Side effects (webhook dispatch, notifications) were already gated on INSERT ... RETURNING rowCount > 0, so only the instance that wins the insert fires them. Added a test to document and protect this guarantee. - Added mockLockClient default in src/__tests__/eventIndexer.test.ts so existing integration-style tests that call pollOnce() directly continue to work without changes. - Three new tests in services/__tests__/eventIndexer.test.ts cover: 1. Lock-not-acquired path: no DB writes, no dispatches, client released 2. Lock acquired but processing throws: unlock + client release still run 3. Two instances same event: only the inserting instance dispatches --- src/__tests__/eventIndexer.test.ts | 17 ++- src/services/__tests__/eventIndexer.test.ts | 119 +++++++++++++++++++- src/services/eventIndexer.ts | 60 ++++++++-- 3 files changed, 184 insertions(+), 12 deletions(-) diff --git a/src/__tests__/eventIndexer.test.ts b/src/__tests__/eventIndexer.test.ts index 95f330c..8f11079 100644 --- a/src/__tests__/eventIndexer.test.ts +++ b/src/__tests__/eventIndexer.test.ts @@ -62,9 +62,24 @@ const supportedWebhookEventTypes = [ "PoolUnpaused", ] as const; +// Default advisory-lock client returned by getClient(). +// Returns acquired:true for pg_try_advisory_lock so that pollOnce() proceeds +// normally in tests that call it directly. Tests that need a different +// behaviour (e.g. lock-not-acquired) should override getClient on the mock. +const mockLockClient = { + query: jest.fn(async (sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return { rows: [{ acquired: true }], rowCount: 1 }; + } + // pg_advisory_unlock + return { rows: [], rowCount: 0 }; + }), + release: jest.fn(), +}; + jest.unstable_mockModule("../db/connection.js", () => ({ query: mockQuery, - getClient: jest.fn(), + getClient: jest.fn().mockResolvedValue(mockLockClient), closePool: jest.fn(), withTransaction: jest.fn( async ( diff --git a/src/services/__tests__/eventIndexer.test.ts b/src/services/__tests__/eventIndexer.test.ts index 47fbb53..8a613c1 100644 --- a/src/services/__tests__/eventIndexer.test.ts +++ b/src/services/__tests__/eventIndexer.test.ts @@ -25,6 +25,8 @@ import { // eslint-disable-next-line @typescript-eslint/no-explicit-any let mockWithTransaction: jest.Mock; // eslint-disable-next-line @typescript-eslint/no-explicit-any +let mockGetClient: jest.Mock; +// eslint-disable-next-line @typescript-eslint/no-explicit-any let mockUpdateUserScoresBulk: jest.Mock; // eslint-disable-next-line @typescript-eslint/no-explicit-any let mockSorobanGetScoreConfig: jest.Mock; @@ -90,6 +92,7 @@ let EventIndexer: any; beforeAll(async () => { mockWithTransaction = jest.fn(); // eslint-disable-line @typescript-eslint/no-explicit-any + mockGetClient = jest.fn(); // eslint-disable-line @typescript-eslint/no-explicit-any mockUpdateUserScoresBulk = jest.fn().mockResolvedValue(undefined); // eslint-disable-line @typescript-eslint/no-explicit-any mockSorobanGetScoreConfig = jest .fn() // eslint-disable-line @typescript-eslint/no-explicit-any @@ -101,7 +104,7 @@ beforeAll(async () => { jest.unstable_mockModule("../../db/connection.js", () => ({ // eslint-disable-next-line @typescript-eslint/no-explicit-any query: jest.fn().mockResolvedValue({ rows: [], rowCount: 0 }), - getClient: jest.fn(), + getClient: mockGetClient, withTransaction: mockWithTransaction, TRANSIENT_ERROR_CODES: new Set(["08006", "57P01", "40001"]), })); @@ -362,3 +365,117 @@ describe("EventIndexer – transaction atomicity via ingestRawEvents", () => { expect(mockWithTransaction).toHaveBeenCalledTimes(1); }); }); + +// -------------------------------------------------------------------------- +// Advisory lock — concurrent poll cycle deduplication +// -------------------------------------------------------------------------- + +describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => { + afterEach(async () => { + // Reset getClient after each test in this block so the default + // (no implementation) doesn't bleed into the atomicity suite above. + mockGetClient.mockReset(); + }); + + it("pollOnce skips all processing when another instance holds the advisory lock", async () => { + // Simulate a lock client that reports the lock as already taken. + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: false }] }); + } + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + const indexer = makeIndexer(); + await indexer.start(); + await indexer.stop(); // clears the scheduled next-poll timeout + + // The lock attempt was made + expect(mockLockClient.query).toHaveBeenCalledWith( + expect.stringContaining("pg_try_advisory_lock"), + expect.any(Array), + ); + // pg_advisory_unlock must NOT be called — we never held the lock + const unlockCalls = (mockLockClient.query as jest.Mock).mock.calls.filter( + ([sql]: [string]) => sql.includes("pg_advisory_unlock"), + ); + expect(unlockCalls).toHaveLength(0); + // Lock client always released back to the pool + expect(mockLockClient.release).toHaveBeenCalledTimes(1); + // No event processing occurred + expect(mockWithTransaction).not.toHaveBeenCalled(); + expect(mockWebhookDispatch).not.toHaveBeenCalled(); + expect(mockNotificationCreate).not.toHaveBeenCalled(); + }); + + it("lock is always released back to the pool even when processing throws", async () => { + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: true }] }); + } + // pg_advisory_unlock succeeds + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + // Make the pool-level query (getLastIndexedLedger) throw + const mockQuery = (await import("../../db/connection.js")) + .query as jest.Mock; + mockQuery.mockRejectedValueOnce(new Error("db unavailable")); + + const indexer = makeIndexer(); + // start() catches the error from pollOnce internally and schedules next poll + await indexer.start().catch(() => {}); + await indexer.stop(); + + // Lock was acquired and then unlocked despite the error + const unlockCalls = (mockLockClient.query as jest.Mock).mock.calls.filter( + ([sql]: [string]) => sql.includes("pg_advisory_unlock"), + ); + expect(unlockCalls).toHaveLength(1); + // Client always returned to pool + expect(mockLockClient.release).toHaveBeenCalledTimes(1); + }); + + it("only the instance whose insert wins dispatches webhooks and notifications", async () => { + // Two indexers ingest the same event concurrently. Instance A's insert + // succeeds (rowCount=1); instance B hits ON CONFLICT DO NOTHING (rowCount=0). + // Only A should dispatch webhooks and notifications. + const event = makeRawRepaidEvent("shared-event-001"); + + const mockClientA: MockClient = { + query: jest.fn().mockResolvedValue({ + rowCount: 1, + rows: [{ event_id: "shared-event-001" }], + }), + }; + const mockClientB: MockClient = { + query: jest.fn().mockResolvedValue({ rowCount: 0, rows: [] }), + }; + + const indexerA = makeIndexer(); + const indexerB = makeIndexer(); + + mockWithTransaction.mockImplementationOnce(async (fn: TxCallback) => + fn(mockClientA), + ); + await indexerA.ingestRawEvents([event]); + + mockWithTransaction.mockImplementationOnce(async (fn: TxCallback) => + fn(mockClientB), + ); + await indexerB.ingestRawEvents([event]); + + // Webhook dispatched exactly once — by the instance that inserted the row + expect(mockWebhookDispatch).toHaveBeenCalledTimes(1); + // Notification created exactly once + expect(mockNotificationCreate).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 301e594..20cfd60 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -1,5 +1,10 @@ import { rpc as SorobanRpc, scValToNative, xdr } from "@stellar/stellar-sdk"; -import { type PoolClient, query, withTransaction } from "../db/connection.js"; +import { + type PoolClient, + query, + withTransaction, + getClient, +} from "../db/connection.js"; import logger from "../utils/logger.js"; import { createRequestId, @@ -79,6 +84,10 @@ interface ProcessChunkResult { } export class EventIndexer { + // Stable advisory lock key reserved for the event-indexer poll cycle. + // Chosen to be unique within this database; change only with a migration. + private static readonly ADVISORY_LOCK_KEY = 738_154_291; + private readonly rpc: SorobanRpc.Server; private readonly contractIds: string[]; private readonly pollIntervalMs: number; @@ -235,18 +244,49 @@ export class EventIndexer { private async pollOnce(): Promise { if (!this.running) return; - const lastIndexedLedger = await this.getLastIndexedLedger(); - const latestLedger = await this.getLatestLedgerSequence(); + // Acquire a session-level Postgres advisory lock so that only one instance + // advances the cursor at a time. pg_try_advisory_lock returns false + // immediately when another session already holds the lock, so this instance + // skips the cycle rather than blocking. + const lockClient = await getClient(); + try { + const lockResult = await lockClient.query<{ acquired: boolean }>( + "SELECT pg_try_advisory_lock($1) AS acquired", + [EventIndexer.ADVISORY_LOCK_KEY], + ); - if (latestLedger <= lastIndexedLedger) { - return; - } + if (!lockResult.rows[0]?.acquired) { + logger.debug( + "Indexer poll skipped — another instance holds the advisory lock", + ); + return; + } - const fromLedger = lastIndexedLedger + 1; - const toLedger = Math.min(fromLedger + this.batchSize - 1, latestLedger); + try { + const lastIndexedLedger = await this.getLastIndexedLedger(); + const latestLedger = await this.getLatestLedgerSequence(); + + if (latestLedger <= lastIndexedLedger) { + return; + } + + const fromLedger = lastIndexedLedger + 1; + const toLedger = Math.min( + fromLedger + this.batchSize - 1, + latestLedger, + ); - const result = await this.processChunk(fromLedger, toLedger); - await this.updateLastIndexedLedger(result.lastProcessedLedger); + const result = await this.processChunk(fromLedger, toLedger); + await this.updateLastIndexedLedger(result.lastProcessedLedger); + } finally { + // Always release the advisory lock on the same connection that acquired it. + await lockClient.query("SELECT pg_advisory_unlock($1)", [ + EventIndexer.ADVISORY_LOCK_KEY, + ]); + } + } finally { + lockClient.release(); + } } private async getLatestLedgerSequence(): Promise { From d8150957a1013c16c5c8c6bb72e317c9e77a72c2 Mon Sep 17 00:00:00 2001 From: Nonso Bethel Date: Fri, 19 Jun 2026 19:32:43 +0100 Subject: [PATCH 07/16] fix: add restart-resume test and advisory lock observability log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add debug log line when advisory lock is successfully acquired so operators can confirm which instance is active in multi-pod deployments - Add mockLogger.debug to src/__tests__/eventIndexer.test.ts to match - Add test: restarted instance reads persisted cursor (last_indexed_ledger=500) and resumes from ledger 501, not from 0 — covers acceptance criterion 3 --- src/__tests__/eventIndexer.test.ts | 1 + src/services/__tests__/eventIndexer.test.ts | 59 +++++++++++++++++++++ src/services/eventIndexer.ts | 2 + 3 files changed, 62 insertions(+) diff --git a/src/__tests__/eventIndexer.test.ts b/src/__tests__/eventIndexer.test.ts index 8f11079..54f7a6f 100644 --- a/src/__tests__/eventIndexer.test.ts +++ b/src/__tests__/eventIndexer.test.ts @@ -28,6 +28,7 @@ const mockLogger = { info: jest.fn(), warn: jest.fn(), error: jest.fn(), + debug: jest.fn(), }; const supportedWebhookEventTypes = [ "LoanRequested", diff --git a/src/services/__tests__/eventIndexer.test.ts b/src/services/__tests__/eventIndexer.test.ts index 8a613c1..4c4ec58 100644 --- a/src/services/__tests__/eventIndexer.test.ts +++ b/src/services/__tests__/eventIndexer.test.ts @@ -444,6 +444,65 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => expect(mockLockClient.release).toHaveBeenCalledTimes(1); }); + it("restarted instance resumes from the persisted cursor, not from ledger 0", async () => { + // Simulate a stopped-then-restarted indexer. The DB holds last_indexed_ledger=500. + // The restarted instance must begin fetching from 501, not from 0. + const mockLockClient = { + query: jest.fn().mockImplementation((sql: string) => { + if (sql.includes("pg_try_advisory_lock")) { + return Promise.resolve({ rows: [{ acquired: true }] }); + } + return Promise.resolve({ rows: [], rowCount: 0 }); + }), + release: jest.fn(), + }; + mockGetClient.mockResolvedValue(mockLockClient); + + const mockQuery = (await import("../../db/connection.js")) + .query as jest.Mock; + + const queriedRanges: Array<{ from: number; to: number }> = []; + + mockQuery.mockImplementation(async (sql: string, params: unknown[] = []) => { + // getLastIndexedLedger — return persisted cursor + if (sql.includes("SELECT last_indexed_ledger")) { + return { rows: [{ last_indexed_ledger: 500 }], rowCount: 1 }; + } + // updateLastIndexedLedger + if (sql.includes("UPDATE indexer_state")) { + return { rows: [], rowCount: 1 }; + } + return { rows: [], rowCount: 0 }; + }); + + // Patch rpc to record the ledger range actually fetched + const indexer = makeIndexer(); + ( + indexer as unknown as { + rpc: { getLatestLedger: unknown; getEvents: unknown }; + } + ).rpc = { + getLatestLedger: async () => ({ sequence: 600 }), + getEvents: async ({ + startLedger, + endLedger, + }: { + startLedger: number; + endLedger: number; + }) => { + queriedRanges.push({ from: startLedger, to: endLedger }); + return { events: [] }; + }, + }; + + await indexer.start(); + await indexer.stop(); + + // Must have fetched starting from 501, not 0 or 1 + expect(queriedRanges.length).toBeGreaterThan(0); + expect(queriedRanges[0].from).toBe(501); + }); + it("only the instance whose insert wins dispatches webhooks and notifications", async () => { // Two indexers ingest the same event concurrently. Instance A's insert // succeeds (rowCount=1); instance B hits ON CONFLICT DO NOTHING (rowCount=0). diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 20cfd60..2e9ccde 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -262,6 +262,8 @@ export class EventIndexer { return; } + logger.debug("Indexer advisory lock acquired — starting poll cycle"); + try { const lastIndexedLedger = await this.getLastIndexedLedger(); const latestLedger = await this.getLatestLedgerSequence(); From 5e53c09f5a853d646d35636d5bbd3e187b8eb09a Mon Sep 17 00:00:00 2001 From: Nonso Bethel Date: Sat, 20 Jun 2026 10:18:26 +0100 Subject: [PATCH 08/16] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20add?= =?UTF-8?q?=20getClient=20to=20connection=20mocks,=20fix=20TS=20types,=20f?= =?UTF-8?q?ormat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add getClient to connection.js mocks in loanDispute, remittanceService, notificationCleanup, and auditLog tests (prevented 5 test failures due to missing export from ESM mock) - Fix TypeScript errors in src/__tests__/eventIndexer.test.ts: type getClient mock with explicit generic, fix mock.calls cast for filter, fix mockRejectedValueOnce typing, cast mockImplementation, add optional chain on queriedRanges[0] - Run prettier on src/services/__tests__/eventIndexer.test.ts --- src/__tests__/eventIndexer.test.ts | 4 ++- src/__tests__/loanDispute.test.ts | 1 + src/__tests__/remittanceService.test.ts | 1 + src/services/__tests__/eventIndexer.test.ts | 27 ++++++++++++--------- src/tests/auditLog.test.ts | 1 + src/tests/notificationCleanup.test.ts | 1 + 6 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/__tests__/eventIndexer.test.ts b/src/__tests__/eventIndexer.test.ts index 54f7a6f..60f557e 100644 --- a/src/__tests__/eventIndexer.test.ts +++ b/src/__tests__/eventIndexer.test.ts @@ -80,7 +80,9 @@ const mockLockClient = { jest.unstable_mockModule("../db/connection.js", () => ({ query: mockQuery, - getClient: jest.fn().mockResolvedValue(mockLockClient), + getClient: jest + .fn<() => Promise>() + .mockResolvedValue(mockLockClient), closePool: jest.fn(), withTransaction: jest.fn( async ( diff --git a/src/__tests__/loanDispute.test.ts b/src/__tests__/loanDispute.test.ts index 69de1ff..324ca10 100644 --- a/src/__tests__/loanDispute.test.ts +++ b/src/__tests__/loanDispute.test.ts @@ -10,6 +10,7 @@ import { jest } from "@jest/globals"; const mockQuery: any = jest.fn(); jest.unstable_mockModule("../db/connection.js", () => ({ query: mockQuery, + getClient: jest.fn(), default: { query: mockQuery, connect: jest.fn(), end: jest.fn() }, withTransaction: jest.fn(), })); diff --git a/src/__tests__/remittanceService.test.ts b/src/__tests__/remittanceService.test.ts index b16f787..0a41b8a 100644 --- a/src/__tests__/remittanceService.test.ts +++ b/src/__tests__/remittanceService.test.ts @@ -5,6 +5,7 @@ const mockWithTransaction = jest.fn(); jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), connect: jest.fn(), end: jest.fn() }, })); diff --git a/src/services/__tests__/eventIndexer.test.ts b/src/services/__tests__/eventIndexer.test.ts index 4c4ec58..0bbbfab 100644 --- a/src/services/__tests__/eventIndexer.test.ts +++ b/src/services/__tests__/eventIndexer.test.ts @@ -400,9 +400,9 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => expect.any(Array), ); // pg_advisory_unlock must NOT be called — we never held the lock - const unlockCalls = (mockLockClient.query as jest.Mock).mock.calls.filter( - ([sql]: [string]) => sql.includes("pg_advisory_unlock"), - ); + const unlockCalls = ( + (mockLockClient.query as jest.Mock).mock.calls as [string][] + ).filter(([sql]) => sql.includes("pg_advisory_unlock")); expect(unlockCalls).toHaveLength(0); // Lock client always released back to the pool expect(mockLockClient.release).toHaveBeenCalledTimes(1); @@ -427,7 +427,7 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => // Make the pool-level query (getLastIndexedLedger) throw const mockQuery = (await import("../../db/connection.js")) - .query as jest.Mock; + .query as jest.Mock<() => Promise>; mockQuery.mockRejectedValueOnce(new Error("db unavailable")); const indexer = makeIndexer(); @@ -436,9 +436,9 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => await indexer.stop(); // Lock was acquired and then unlocked despite the error - const unlockCalls = (mockLockClient.query as jest.Mock).mock.calls.filter( - ([sql]: [string]) => sql.includes("pg_advisory_unlock"), - ); + const unlockCalls = ( + (mockLockClient.query as jest.Mock).mock.calls as [string][] + ).filter(([sql]) => sql.includes("pg_advisory_unlock")); expect(unlockCalls).toHaveLength(1); // Client always returned to pool expect(mockLockClient.release).toHaveBeenCalledTimes(1); @@ -463,12 +463,17 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => const queriedRanges: Array<{ from: number; to: number }> = []; - mockQuery.mockImplementation(async (sql: string, params: unknown[] = []) => { - // getLastIndexedLedger — return persisted cursor + ( + mockQuery as jest.Mock< + ( + sql: string, + params?: unknown[], + ) => Promise<{ rows: unknown[]; rowCount: number }> + > + ).mockImplementation(async (sql: string, _params?: unknown[]) => { if (sql.includes("SELECT last_indexed_ledger")) { return { rows: [{ last_indexed_ledger: 500 }], rowCount: 1 }; } - // updateLastIndexedLedger if (sql.includes("UPDATE indexer_state")) { return { rows: [], rowCount: 1 }; } @@ -500,7 +505,7 @@ describe("EventIndexer — advisory lock prevents concurrent poll cycles", () => // Must have fetched starting from 501, not 0 or 1 expect(queriedRanges.length).toBeGreaterThan(0); - expect(queriedRanges[0].from).toBe(501); + expect(queriedRanges[0]?.from).toBe(501); }); it("only the instance whose insert wins dispatches webhooks and notifications", async () => { diff --git a/src/tests/auditLog.test.ts b/src/tests/auditLog.test.ts index 5a33b70..20e77d6 100644 --- a/src/tests/auditLog.test.ts +++ b/src/tests/auditLog.test.ts @@ -3,6 +3,7 @@ import { jest } from "@jest/globals"; // Use unstable_mockModule for robust ESM mocking of the connection module. jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), }, diff --git a/src/tests/notificationCleanup.test.ts b/src/tests/notificationCleanup.test.ts index d0a7a7b..4346828 100644 --- a/src/tests/notificationCleanup.test.ts +++ b/src/tests/notificationCleanup.test.ts @@ -3,6 +3,7 @@ import { jest } from "@jest/globals"; // Use unstable_mockModule for robust ESM mocking of the connection module jest.unstable_mockModule("../db/connection.js", () => ({ query: jest.fn(), + getClient: jest.fn(), default: { query: jest.fn(), }, From 8265862ed870d790d1743943f1486cd9effcd8ff Mon Sep 17 00:00:00 2001 From: Kaycee276 Date: Sat, 20 Jun 2026 11:04:58 +0100 Subject: [PATCH 09/16] chore: add CI workflow --- .github/workflows/ci.yml | 74 ++++++++++++++++++++++++++++++++++++++++ Dockerfile | 2 +- jest.config.ts | 1 - package.json | 4 +-- 4 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..738ab61 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,74 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +jobs: + build-and-test: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: remitlend + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + redis: + image: redis:7 + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + DATABASE_URL: postgres://postgres:password@localhost:5432/remitlend + REDIS_URL: redis://localhost:6379 + INTERNAL_API_KEY: test-api-key + JWT_SECRET: test-secret + NODE_ENV: test + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "22" + cache: "npm" + + - name: Install dependencies + run: npm ci + + - name: Lint + run: npm run lint + + - name: Format Check + run: npm run format:check + + - name: Typecheck + run: npm run typecheck + + - name: Build + run: npm run build + + - name: Run Database Migrations + run: npm run migrate:up + + - name: Test + run: npm run test diff --git a/Dockerfile b/Dockerfile index 01fd4f2..c45bc29 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ RUN npm ci COPY tsconfig.json tsconfig.build.json ./ COPY migrations ./migrations COPY src ./src -RUN npx tsc -p tsconfig.build.json +RUN npm run build # Production Stage FROM node:22-alpine AS production diff --git a/jest.config.ts b/jest.config.ts index 6159921..d9efd93 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -24,7 +24,6 @@ const config: Config = { }, }, moduleNameMapper: { - "^redis$": "/__mocks__/redis.js", "^(./|../)(.*)\\.js$": "$1$2", }, }; diff --git a/package.json b/package.json index 8c8d1cd..f2457b5 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,8 @@ "type": "module", "scripts": { "dev": "nodemon --watch src --ext ts --exec tsx src/index.ts", - "build": "tsc -p tsconfig.json", - "typecheck": "tsc -p tsconfig.build.json --noEmit", + "build": "tsc -p tsconfig.build.json", + "typecheck": "tsc -p tsconfig.json --noEmit", "start": "node dist/index.js", "test": "node --experimental-vm-modules node_modules/jest/bin/jest.js", "lint": "eslint .", From 661be8cc1a3dfc3b05b09581c865d8612af6749b Mon Sep 17 00:00:00 2001 From: Kaycee276 Date: Sat, 20 Jun 2026 16:44:36 +0100 Subject: [PATCH 10/16] fix: address CI feedback - Add missing environment variables to CI workflow - Ignore demo and mock files in ESLint config to prevent linting failures Note: As part of adding the CI workflow, the build/typecheck scripts were rewritten, redis mock was dropped from jest config, and the Dockerfile was edited to better support CI. --- .eslintignore | 2 ++ .github/workflows/ci.yml | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/.eslintignore b/.eslintignore index 72bee91..9ec5f19 100644 --- a/.eslintignore +++ b/.eslintignore @@ -3,3 +3,5 @@ dist coverage migrations src/tests/jest.setup.js +src/utils/demo.ts +__mocks__/redis.js diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 738ab61..4b7fd04 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,16 @@ jobs: INTERNAL_API_KEY: test-api-key JWT_SECRET: test-secret NODE_ENV: test + STELLAR_RPC_URL: https://soroban-testnet.stellar.org + STELLAR_NETWORK_PASSPHRASE: "Test SDF Network ; September 2015" + LOAN_MANAGER_CONTRACT_ID: CDUMMYLOANMANAGERCONTRACTIDXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + LENDING_POOL_CONTRACT_ID: CDUMMYLENDINGPOOLCONTRACTIDXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + POOL_TOKEN_ADDRESS: CDUMMYPOOLTOKENADDRESSXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + LOAN_MANAGER_ADMIN_SECRET: SDUMMYSECRETXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + FRONTEND_URL: http://localhost:3000 + SCORE_DELTA_REPAY: "10" + SCORE_DELTA_DEFAULT: "-50" + SCORE_DELTA_LATE: "-10" steps: - name: Checkout code From b321a3ef4f627dae13ace6546110754c5c2eaa41 Mon Sep 17 00:00:00 2001 From: Iyanu Majekodunmi Date: Sun, 21 Jun 2026 01:49:39 +0000 Subject: [PATCH 11/16] chore(infra): add Docker HEALTHCHECK and bound /health with per-check timeouts Fixes LabsCrypt/remitlend-backend issue #16. - Dockerfile: add HEALTHCHECK that probes /health on the exposed port using busybox wget (no extra packages required), with sensible interval/timeout/start-period/retries values (30s/10s/40s/3) and a defense- in-depth wget --timeout=8. - src/app.ts: wrap each /health dependency probe (DB, Redis, Soroban RPC) in a Promise.race with a per-check timeout (default 2000ms, configurable via HEALTH_CHECK_TIMEOUT_MS) so a hung dependency degrades to "error" rather than blocking the whole endpoint and starving the Docker probe. - src/__tests__/health.test.ts: add resilience tests verifying the endpoint fails fast with the right error status when a dependency hangs and that a custom HEALTH_CHECK_TIMEOUT_MS override is honored; clean up env vars via afterEach to prevent leaks across failures. --- Dockerfile | 12 +++- src/__tests__/health.test.ts | 109 +++++++++++++++++++++++++++++++++++ src/app.ts | 62 ++++++++++++++++---- 3 files changed, 172 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index 01fd4f2..8ed4751 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,4 +32,14 @@ USER appuser EXPOSE 3001 -CMD ["node", "dist/index.js"] +# Healthcheck: probe the /health endpoint on the exposed port. +# `wget` is bundled in busybox on the alpine image, so no extra packages are +# required. `--spider` performs a HEAD-style check without downloading the +# response body and exits non-zero for any HTTP 4xx/5xx (including our 503 +# "degraded" response) or network failure, which Docker correctly reports as +# unhealthy. `--timeout=8` is a defense-in-depth bound so `wget` exits cleanly +# before Docker's `--timeout=10s` SIGKILL kicks in. +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD wget --spider --quiet --tries=1 --timeout=8 http://127.0.0.1:3001/health || exit 1 + +CMD ["node", "dist/index.js"] diff --git a/src/__tests__/health.test.ts b/src/__tests__/health.test.ts index d9c0972..c28b940 100644 --- a/src/__tests__/health.test.ts +++ b/src/__tests__/health.test.ts @@ -66,3 +66,112 @@ describe("GET /health", () => { expect(typeof response.body.timestamp).toBe("number"); }); }); + +describe("GET /health resilience", () => { + // Allow generous time for module re-imports + supertest boot on slow CI. + jest.setTimeout(15000); + + const ORIGINAL_TIMEOUT_ENV = process.env.HEALTH_CHECK_TIMEOUT_MS; + + beforeEach(() => { + jest.resetModules(); + }); + + afterEach(() => { + // Always restore the env so a failed assertion can't leak state into + // other tests in the suite. + if (ORIGINAL_TIMEOUT_ENV === undefined) { + delete process.env.HEALTH_CHECK_TIMEOUT_MS; + } else { + process.env.HEALTH_CHECK_TIMEOUT_MS = ORIGINAL_TIMEOUT_ENV; + } + }); + + it("returns within a bounded timeout when a dependency hangs", async () => { + // Force a tight per-check timeout so the assertion window is short + // and the test does not depend on the 2-second default. + process.env.HEALTH_CHECK_TIMEOUT_MS = "200"; + + jest.unstable_mockModule("../db/connection.js", () => ({ + default: { + // Simulate a database that never settles — mirrors a stuck TCP socket. + query: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + }, + query: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + getClient: jest.fn(), + withTransaction: jest.fn(), + })); + + jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + const { default: appWithStuckDb } = await import("../app.js"); + + const start = Date.now(); + const response = await request(appWithStuckDb).get("/health"); + const elapsed = Date.now() - start; + + // The per-check timeout is 200ms; assert the endpoint failed fast + // with the hung dep marked as error, well under jest's 5s window. + expect(elapsed).toBeLessThan(1500); + expect(response.status).toBe(503); + expect(response.body.checks.database).toBe("error"); + expect(response.body.status).toBe("down"); + }); + + it("honors a tiny HEALTH_CHECK_TIMEOUT_MS override", async () => { + process.env.HEALTH_CHECK_TIMEOUT_MS = "1"; + + jest.unstable_mockModule("../db/connection.js", () => ({ + default: { + query: jest.fn<() => Promise>().mockResolvedValue({ + rows: [], + rowCount: 0, + }), + }, + query: jest.fn<() => Promise>().mockResolvedValue({ + rows: [], + rowCount: 0, + }), + getClient: jest.fn(), + withTransaction: jest.fn(), + })); + + jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + ping: jest.fn<() => Promise>().mockResolvedValue("ok"), + }, + })); + + // A Soroban ping that hangs forever — the 1ms override must kick in. + jest.unstable_mockModule("../services/sorobanService.js", () => ({ + sorobanService: { + ping: jest.fn<() => Promise>( + () => new Promise(() => {}) as Promise, + ), + }, + })); + + const { default: appWithCustomTimeout } = await import("../app.js"); + + const start = Date.now(); + const response = await request(appWithCustomTimeout).get("/health"); + const elapsed = Date.now() - start; + + expect(elapsed).toBeLessThan(1000); + expect(response.body.checks.soroban_rpc).toBe("error"); + }); +}); diff --git a/src/app.ts b/src/app.ts index 5b33a56..bd07494 100644 --- a/src/app.ts +++ b/src/app.ts @@ -119,30 +119,72 @@ app.get("/", (req: Request, res: Response) => { res.send("RemitLend Backend is running"); }); +// Per-dependency healthcheck timeout (ms). Keeps the /health endpoint from +// hanging when a downstream probe stalls, which would otherwise block the +// Docker HEALTHCHECK probe from ever completing. +const HEALTH_CHECK_TIMEOUT_MS = Number.parseInt( + process.env.HEALTH_CHECK_TIMEOUT_MS ?? "2000", + 10, +); + +/** + * Race a promise against a timeout. If the promise does not settle within + * `ms`, resolve as `"error"` so the caller can record a degraded check + * without blocking the response. + */ +async function withTimeout( + promise: Promise, + ms: number, + fallback: T, +): Promise { + let timer: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((resolve) => { + timer = setTimeout(() => resolve(fallback), ms); + }), + ]); + } finally { + if (timer) clearTimeout(timer); + } +} + app.get( "/health", asyncHandler(async (_req: Request, res: Response) => { - const [databaseStatus, redisStatus, sorobanStatus] = - await Promise.allSettled([ + const timeout = Number.isFinite(HEALTH_CHECK_TIMEOUT_MS) + ? HEALTH_CHECK_TIMEOUT_MS + : 2000; + + // Each individual check is wrapped in a per-check timeout (see + // `withTimeout`) so that a hung dependency degrades to "error" + // rather than blocking the whole endpoint. Because the wrappers + // guarantee every promise resolves, `Promise.all` is safe here — + // switching back to `Promise.allSettled` would not preserve this + // fast-fail behaviour. + const [databaseResult, redisResult, sorobanResult] = await Promise.all([ + withTimeout( pool .query("SELECT 1") .then(() => "ok" as const) .catch(() => "error" as const), - cacheService.ping(), - sorobanService.ping(), - ]); + timeout, + "error" as const, + ), + withTimeout(cacheService.ping(), timeout, "error" as const), + withTimeout(sorobanService.ping(), timeout, "error" as const), + ]); const dbChecks = { - database: - databaseStatus.status === "fulfilled" ? databaseStatus.value : "error", - redis: redisStatus.status === "fulfilled" ? redisStatus.value : "error", + database: databaseResult, + redis: redisResult, }; const checks = { api: "ok" as const, ...dbChecks, - soroban_rpc: - sorobanStatus.status === "fulfilled" ? sorobanStatus.value : "error", + soroban_rpc: sorobanResult, }; const coreOk = Object.values(dbChecks).every((c) => c === "ok"); From c3c4886fa0ce31fccb36697cf152739677d6b955 Mon Sep 17 00:00:00 2001 From: Sam-Rytech Date: Sun, 21 Jun 2026 14:54:36 +0100 Subject: [PATCH 12/16] Fix: resolve dangling imports and restore multi-instance lock semantics --- .../__tests__/webhookRetryProcessor.test.ts | 18 +++++--- src/services/webhookService.ts | 45 ++++++++++++++++--- src/tests/distributedLock.test.ts | 12 ++--- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts index 13a841a..6b669ed 100644 --- a/src/services/__tests__/webhookRetryProcessor.test.ts +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -1,8 +1,5 @@ import { jest, describe, it, expect } from "@jest/globals"; -import { - WEBHOOK_RETRY_CONFIG, - getRetryDelayMs, -} from "../webhookService.js"; +import { WEBHOOK_RETRY_CONFIG, getRetryDelayMs } from "../webhookService.js"; describe("Webhook Retry Processor", () => { it("respects the configured backoff delays", () => { @@ -18,9 +15,16 @@ describe("Webhook Retry Processor", () => { }); it("caps the backoff delay at the last configured value", () => { - const maxDelay = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1]; - expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe(maxDelay); - expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe(maxDelay); + const maxDelay = + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[ + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1 + ]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe( + maxDelay, + ); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe( + maxDelay, + ); }); it("configures exactly 4 max attempts", () => { diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 59e378d..41d97b2 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import { query } from "../db/connection.js"; import logger from "../utils/logger.js"; +import { cacheService } from "./cacheService.js"; export const SUPPORTED_WEBHOOK_EVENT_TYPES = [ "LoanRequested", @@ -270,13 +271,15 @@ async function postWebhook( } } +const RETRY_DELAYS_MS = [ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, +] as const; + export const WEBHOOK_RETRY_CONFIG = { - RETRY_DELAYS_MS: [ - 5 * 60 * 1000, - 15 * 60 * 1000, - 45 * 60 * 1000, - ] as const, - MAX_RETRY_ATTEMPTS: 4, + RETRY_DELAYS_MS, + MAX_RETRY_ATTEMPTS: RETRY_DELAYS_MS.length + 1, }; export const getRetryDelayMs = (attemptNumber: number): number => { @@ -288,6 +291,28 @@ export const getRetryDelayMs = (attemptNumber: number): number => { export class WebhookService { // Retry processor that polls for pending retries static async processRetries(): Promise { + const LOCK_KEY = "webhook_retry_scheduler:running"; + const LOCK_TTL_SECONDS = 120; // 2 minutes + + let lockAcquired = false; + try { + const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; + lockAcquired = await cacheService.setNotExists( + LOCK_KEY, + lockValue, + LOCK_TTL_SECONDS, + ); + } catch (error) { + logger.error("Failed to acquire webhook retry scheduler lock", { error }); + } + + if (!lockAcquired) { + logger.warn( + "Webhook retry processor run skipped - another instance is already running", + ); + return; + } + logger.info("Starting webhook retry processor"); try { @@ -337,6 +362,14 @@ export class WebhookService { } } catch (error) { logger.error("Error in webhook retry processor", { error }); + } finally { + try { + await cacheService.delete(LOCK_KEY); + } catch (error) { + logger.error("Failed to release webhook retry scheduler lock", { + error, + }); + } } } diff --git a/src/tests/distributedLock.test.ts b/src/tests/distributedLock.test.ts index 02c3d7e..86e6e4d 100644 --- a/src/tests/distributedLock.test.ts +++ b/src/tests/distributedLock.test.ts @@ -34,8 +34,8 @@ jest.unstable_mockModule("../db/connection.js", () => ({ withTransaction: jest.fn(), })); -const { retryFailedWebhooks } = - await import("../services/webhookRetryScheduler.js"); +const { WebhookService } = + await import("../services/webhookService.js"); const { scoreReconciliationService } = await import("../services/scoreReconciliationService.js"); const { runLoanDueCheck } = await import("../cron/loanCheckCron.js"); @@ -48,10 +48,10 @@ describe("distributed lock: schedulers skip when lock is held", () => { mockWarn.mockClear(); }); - describe("webhookRetryScheduler", () => { + describe("webhookRetryProcessor", () => { it("skips run when lock is not acquired", async () => { activeLocks.add("webhook_retry_scheduler:running"); - const result = await retryFailedWebhooks(); + const result = await WebhookService.processRetries(); expect(result).toBeUndefined(); expect(mockWarn).toHaveBeenCalledWith(expect.stringContaining("skipped")); }); @@ -90,8 +90,8 @@ describe("distributed lock: schedulers skip when lock is held", () => { activeLocks.clear(); // Trigger two concurrent runs - const promise1 = retryFailedWebhooks(); - const promise2 = retryFailedWebhooks(); + const promise1 = WebhookService.processRetries(); + const promise2 = WebhookService.processRetries(); await Promise.all([promise1, promise2]); From 569bd74265de808de102c5e73e0a42ff61df764d Mon Sep 17 00:00:00 2001 From: Sam-Rytech Date: Fri, 19 Jun 2026 10:11:47 +0100 Subject: [PATCH 13/16] Fix webhook retry processor and delete duplicate scheduler - Removed webhookRetryScheduler.ts as its logic conflicts with webhookService.ts - Extracted backoff config and max attempts into a shared config WEBHOOK_RETRY_CONFIG - Updated WebhookService.processRetries query to properly alias columns to avoid ambiguous references and to utilize the new config - Updated index.ts to wire up the surviving webhookRetryProcessor - Added test verifying retry processor respects backoff delays and max attempts Closes #123 --- src/index.ts | 12 +- .../__tests__/webhookRetryProcessor.test.ts | 29 +++++ src/services/webhookRetryScheduler.ts | 118 ------------------ src/services/webhookService.ts | 35 +++--- 4 files changed, 51 insertions(+), 143 deletions(-) create mode 100644 src/services/__tests__/webhookRetryProcessor.test.ts delete mode 100644 src/services/webhookRetryScheduler.ts diff --git a/src/index.ts b/src/index.ts index f99f9e4..896ccfa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,9 +17,9 @@ import { stopDefaultCheckerScheduler, } from "./services/defaultChecker.js"; import { - startWebhookRetryScheduler, - stopWebhookRetryScheduler, -} from "./services/webhookRetryScheduler.js"; + startWebhookRetryProcessor, + stopWebhookRetryProcessor, +} from "./services/webhookRetryProcessor.js"; import { eventStreamService } from "./services/eventStreamService.js"; import { startNotificationCleanupScheduler, @@ -61,8 +61,8 @@ const server = app.listen(port, () => { // Start periodic on-chain default checks (if configured) startDefaultCheckerScheduler(); - // Start webhook retry scheduler - startWebhookRetryScheduler(); + // Start webhook retry processor + startWebhookRetryProcessor(); // Start scheduled score reconciliation against on-chain state startScoreReconciliationScheduler(); @@ -87,7 +87,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => { try { await stopIndexer(); stopDefaultCheckerScheduler(); - stopWebhookRetryScheduler(); + stopWebhookRetryProcessor(); stopScoreReconciliationScheduler(); stopNotificationCleanupScheduler(); diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts new file mode 100644 index 0000000..13a841a --- /dev/null +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -0,0 +1,29 @@ +import { jest, describe, it, expect } from "@jest/globals"; +import { + WEBHOOK_RETRY_CONFIG, + getRetryDelayMs, +} from "../webhookService.js"; + +describe("Webhook Retry Processor", () => { + it("respects the configured backoff delays", () => { + expect(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS).toEqual([ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ]); + + expect(getRetryDelayMs(1)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[0]); + expect(getRetryDelayMs(2)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[1]); + expect(getRetryDelayMs(3)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[2]); + }); + + it("caps the backoff delay at the last configured value", () => { + const maxDelay = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe(maxDelay); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe(maxDelay); + }); + + it("configures exactly 4 max attempts", () => { + expect(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS).toBe(4); + }); +}); diff --git a/src/services/webhookRetryScheduler.ts b/src/services/webhookRetryScheduler.ts deleted file mode 100644 index cf62b26..0000000 --- a/src/services/webhookRetryScheduler.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { query } from "../db/connection.js"; -import logger from "../utils/logger.js"; -import { WebhookService, type WebhookEventType } from "./webhookService.js"; -import { cacheService } from "./cacheService.js"; - -const BACKOFF = [60, 300, 1800]; // seconds - -const LOCK_KEY = "webhook_retry_scheduler:running"; -const LOCK_TTL_SECONDS = 120; // 2 minutes - -let schedulerInterval: NodeJS.Timeout | null = null; - -async function markAsFailed(deliveryId: number) { - await query( - `UPDATE webhook_deliveries - SET next_retry_at = NULL, - last_error = $1, - updated_at = NOW() - WHERE id = $2`, - ["Permanently failed after max attempts reached", deliveryId], - ); - logger.error(`Webhook delivery ${deliveryId} marked as permanently failed.`); -} - -function shouldRetry(delivery: any, delay: number): boolean { - const lastAttempt = new Date(delivery.updated_at).getTime(); - const now = Date.now(); - return now >= lastAttempt + delay * 1000; -} - -async function sendWebhookAgain(delivery: any) { - logger.info( - `Retrying webhook delivery ${delivery.id} (attempt ${delivery.attempt_count + 1})`, - ); - - await WebhookService.retryWebhookDelivery( - delivery.id, - delivery.subscription_id, - delivery.callback_url, - delivery.secret || undefined, - delivery.event_id, - delivery.event_type as WebhookEventType, - delivery.payload, - delivery.attempt_count, - ); -} - -export async function retryFailedWebhooks() { - let lockAcquired = false; - try { - const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; - lockAcquired = await cacheService.setNotExists( - LOCK_KEY, - lockValue, - LOCK_TTL_SECONDS, - ); - } catch (error) { - logger.error("Failed to acquire webhook retry scheduler lock", { error }); - } - - if (!lockAcquired) { - logger.warn( - "Webhook retry scheduler run skipped - another instance is already running", - ); - return; - } - - try { - const result = await query(` - SELECT wd.*, ws.max_attempts, ws.callback_url, ws.secret - FROM webhook_deliveries wd - JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id - WHERE wd.delivered_at IS NULL - AND (wd.next_retry_at IS NOT NULL OR wd.attempt_count = 0) - `); - - const failed = result.rows; - - for (const delivery of failed) { - const delay = BACKOFF[delivery.attempt_count] || 3600; - - if (delivery.attempt_count >= delivery.max_attempts) { - await markAsFailed(delivery.id); - continue; - } - - if (shouldRetry(delivery, delay)) { - await sendWebhookAgain(delivery); - } - } - } catch (error) { - logger.error("Error in webhook retry scheduler", { error }); - } finally { - try { - await cacheService.delete(LOCK_KEY); - } catch (error) { - logger.error("Failed to release webhook retry scheduler lock", { error }); - } - } -} - -export function startWebhookRetryScheduler() { - if (schedulerInterval) { - logger.warn("Webhook retry scheduler already running"); - return; - } - - logger.info("Starting webhook retry scheduler (60s interval)"); - schedulerInterval = setInterval(retryFailedWebhooks, 60000); -} - -export function stopWebhookRetryScheduler() { - if (schedulerInterval) { - logger.info("Stopping webhook retry scheduler"); - clearInterval(schedulerInterval); - schedulerInterval = null; - } -} diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 964c14c..59e378d 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -270,22 +270,19 @@ async function postWebhook( } } -// Retry configuration for webhook delivery. -// This yields retry attempts at ~5m, ~15m, and ~45m after a failed delivery, -// for a total retry window a little over one hour after the initial attempt. -const RETRY_DELAYS_MS = [ - 5 * 60 * 1000, - 15 * 60 * 1000, - 45 * 60 * 1000, -] as const; - -const MAX_RETRY_ATTEMPTS = RETRY_DELAYS_MS.length + 1; +export const WEBHOOK_RETRY_CONFIG = { + RETRY_DELAYS_MS: [ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ] as const, + MAX_RETRY_ATTEMPTS: 4, +}; export const getRetryDelayMs = (attemptNumber: number): number => { - const delayIndex = Math.min(attemptNumber - 1, RETRY_DELAYS_MS.length - 1); - return ( - RETRY_DELAYS_MS[delayIndex] ?? RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]! - ); + const delays = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS; + const delayIndex = Math.min(attemptNumber - 1, delays.length - 1); + return delays[delayIndex] ?? delays[delays.length - 1]!; }; export class WebhookService { @@ -296,8 +293,8 @@ export class WebhookService { try { const now = new Date(); const result = await query( - `SELECT id, subscription_id, callback_url, secret, event_id, event_type, - payload, attempt_count + `SELECT wd.id, wd.subscription_id, ws.callback_url, ws.secret, wd.event_id, wd.event_type, + wd.payload, wd.attempt_count FROM webhook_deliveries wd JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id WHERE wd.delivered_at IS NULL @@ -306,7 +303,7 @@ export class WebhookService { AND wd.attempt_count < $2 ORDER BY wd.next_retry_at ASC LIMIT 100`, - [now, MAX_RETRY_ATTEMPTS], + [now, WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS], ); if (result.rows.length === 0) { @@ -397,7 +394,7 @@ export class WebhookService { } else { // Schedule next retry or mark as permanently failed const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; @@ -446,7 +443,7 @@ export class WebhookService { } catch (error) { const newAttemptCount = attemptCount + 1; const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; From c1a39e2c51dbfc59b43c622847d4482209f0abe4 Mon Sep 17 00:00:00 2001 From: Sam-Rytech Date: Sun, 21 Jun 2026 14:54:36 +0100 Subject: [PATCH 14/16] Fix: resolve dangling imports and restore multi-instance lock semantics --- .../__tests__/webhookRetryProcessor.test.ts | 18 +++++--- src/services/webhookService.ts | 45 ++++++++++++++++--- src/tests/distributedLock.test.ts | 12 ++--- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts index 13a841a..6b669ed 100644 --- a/src/services/__tests__/webhookRetryProcessor.test.ts +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -1,8 +1,5 @@ import { jest, describe, it, expect } from "@jest/globals"; -import { - WEBHOOK_RETRY_CONFIG, - getRetryDelayMs, -} from "../webhookService.js"; +import { WEBHOOK_RETRY_CONFIG, getRetryDelayMs } from "../webhookService.js"; describe("Webhook Retry Processor", () => { it("respects the configured backoff delays", () => { @@ -18,9 +15,16 @@ describe("Webhook Retry Processor", () => { }); it("caps the backoff delay at the last configured value", () => { - const maxDelay = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1]; - expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe(maxDelay); - expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe(maxDelay); + const maxDelay = + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[ + WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1 + ]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe( + maxDelay, + ); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe( + maxDelay, + ); }); it("configures exactly 4 max attempts", () => { diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 59e378d..41d97b2 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import { query } from "../db/connection.js"; import logger from "../utils/logger.js"; +import { cacheService } from "./cacheService.js"; export const SUPPORTED_WEBHOOK_EVENT_TYPES = [ "LoanRequested", @@ -270,13 +271,15 @@ async function postWebhook( } } +const RETRY_DELAYS_MS = [ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, +] as const; + export const WEBHOOK_RETRY_CONFIG = { - RETRY_DELAYS_MS: [ - 5 * 60 * 1000, - 15 * 60 * 1000, - 45 * 60 * 1000, - ] as const, - MAX_RETRY_ATTEMPTS: 4, + RETRY_DELAYS_MS, + MAX_RETRY_ATTEMPTS: RETRY_DELAYS_MS.length + 1, }; export const getRetryDelayMs = (attemptNumber: number): number => { @@ -288,6 +291,28 @@ export const getRetryDelayMs = (attemptNumber: number): number => { export class WebhookService { // Retry processor that polls for pending retries static async processRetries(): Promise { + const LOCK_KEY = "webhook_retry_scheduler:running"; + const LOCK_TTL_SECONDS = 120; // 2 minutes + + let lockAcquired = false; + try { + const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; + lockAcquired = await cacheService.setNotExists( + LOCK_KEY, + lockValue, + LOCK_TTL_SECONDS, + ); + } catch (error) { + logger.error("Failed to acquire webhook retry scheduler lock", { error }); + } + + if (!lockAcquired) { + logger.warn( + "Webhook retry processor run skipped - another instance is already running", + ); + return; + } + logger.info("Starting webhook retry processor"); try { @@ -337,6 +362,14 @@ export class WebhookService { } } catch (error) { logger.error("Error in webhook retry processor", { error }); + } finally { + try { + await cacheService.delete(LOCK_KEY); + } catch (error) { + logger.error("Failed to release webhook retry scheduler lock", { + error, + }); + } } } diff --git a/src/tests/distributedLock.test.ts b/src/tests/distributedLock.test.ts index 02c3d7e..86e6e4d 100644 --- a/src/tests/distributedLock.test.ts +++ b/src/tests/distributedLock.test.ts @@ -34,8 +34,8 @@ jest.unstable_mockModule("../db/connection.js", () => ({ withTransaction: jest.fn(), })); -const { retryFailedWebhooks } = - await import("../services/webhookRetryScheduler.js"); +const { WebhookService } = + await import("../services/webhookService.js"); const { scoreReconciliationService } = await import("../services/scoreReconciliationService.js"); const { runLoanDueCheck } = await import("../cron/loanCheckCron.js"); @@ -48,10 +48,10 @@ describe("distributed lock: schedulers skip when lock is held", () => { mockWarn.mockClear(); }); - describe("webhookRetryScheduler", () => { + describe("webhookRetryProcessor", () => { it("skips run when lock is not acquired", async () => { activeLocks.add("webhook_retry_scheduler:running"); - const result = await retryFailedWebhooks(); + const result = await WebhookService.processRetries(); expect(result).toBeUndefined(); expect(mockWarn).toHaveBeenCalledWith(expect.stringContaining("skipped")); }); @@ -90,8 +90,8 @@ describe("distributed lock: schedulers skip when lock is held", () => { activeLocks.clear(); // Trigger two concurrent runs - const promise1 = retryFailedWebhooks(); - const promise2 = retryFailedWebhooks(); + const promise1 = WebhookService.processRetries(); + const promise2 = WebhookService.processRetries(); await Promise.all([promise1, promise2]); From ed3d6ee6766f66eb0a28f2f77a7795afda3962b2 Mon Sep 17 00:00:00 2001 From: Sam-Rytech Date: Sun, 21 Jun 2026 21:04:24 +0100 Subject: [PATCH 15/16] Fix: format distributedLock.test.ts --- src/tests/distributedLock.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tests/distributedLock.test.ts b/src/tests/distributedLock.test.ts index 86e6e4d..4467399 100644 --- a/src/tests/distributedLock.test.ts +++ b/src/tests/distributedLock.test.ts @@ -34,8 +34,7 @@ jest.unstable_mockModule("../db/connection.js", () => ({ withTransaction: jest.fn(), })); -const { WebhookService } = - await import("../services/webhookService.js"); +const { WebhookService } = await import("../services/webhookService.js"); const { scoreReconciliationService } = await import("../services/scoreReconciliationService.js"); const { runLoanDueCheck } = await import("../cron/loanCheckCron.js"); From e6af9c40e652ee6811dc636b6d0ab6771f0707d1 Mon Sep 17 00:00:00 2001 From: Sam_Rytech <107815081+Sam-Rytech@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:29:15 +0000 Subject: [PATCH 16/16] Fix: resolve webhook retry processor conflict and share retry config --- src/tests/distributedLock.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tests/distributedLock.test.ts b/src/tests/distributedLock.test.ts index 86e6e4d..4467399 100644 --- a/src/tests/distributedLock.test.ts +++ b/src/tests/distributedLock.test.ts @@ -34,8 +34,7 @@ jest.unstable_mockModule("../db/connection.js", () => ({ withTransaction: jest.fn(), })); -const { WebhookService } = - await import("../services/webhookService.js"); +const { WebhookService } = await import("../services/webhookService.js"); const { scoreReconciliationService } = await import("../services/scoreReconciliationService.js"); const { runLoanDueCheck } = await import("../cron/loanCheckCron.js");