From eb33963b1969b3f12cbcf2b6dabe39008d1158ff Mon Sep 17 00:00:00 2001 From: AbelOsaretin Date: Sat, 20 Jun 2026 06:03:06 +0100 Subject: [PATCH 1/3] feat: wrap processPayment in serializable transaction with row locking - Wrap session update, payment insert, and webhook dispatch in a single Drizzle transaction for atomicity - Use SELECT ... FOR UPDATE to acquire row-level lock on checkout_sessions preventing concurrent payments for the same session - Add idempotency check on tx_hash before transaction to skip duplicate payments gracefully --- src/payments/payment-detector.service.ts | 106 ++++++++++++++--------- 1 file changed, 64 insertions(+), 42 deletions(-) diff --git a/src/payments/payment-detector.service.ts b/src/payments/payment-detector.service.ts index 6f1f5da..de5bf43 100644 --- a/src/payments/payment-detector.service.ts +++ b/src/payments/payment-detector.service.ts @@ -2,12 +2,21 @@ import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/commo import { randomUUID } from 'crypto'; import { db } from '../db/index'; import { checkoutSessions, payments } from '../db/schema'; -import { eq, and } from 'drizzle-orm'; +import { eq } from 'drizzle-orm'; +import { sql } from 'drizzle-orm'; import { StellarService } from '../stellar/stellar.service'; import { WebhookService } from '../webhook/webhook.service'; import { MetricsService } from '../monitoring/metrics.service'; import { PaymentCursorService, PERSIST_EVERY } from './payment-cursor.service'; +interface LockedSessionRow { + id: string; + merchant_id: string; + amount: string; + asset_code: string; + memo: string | null; +} + const DEFAULT_INTERVAL_MS = 3_000; const BACKOFF_429_MS = 10_000; const BACKOFF_5XX_MS = 5_000; @@ -133,52 +142,65 @@ export class PaymentDetectorService implements OnModuleInit, OnModuleDestroy { const memo = op.transaction_memo; if (!memo) return; - const session = await db.query.checkoutSessions.findFirst({ - where: and(eq(checkoutSessions.memo, memo), eq(checkoutSessions.status, 'pending')), + const txHash = op.transaction_hash; + + const existingPayment = await db.query.payments.findFirst({ + where: eq(payments.txHash, txHash), }); - if (!session) return; - - const opAmount = parseFloat(op.amount); - const sessionAmount = parseFloat(session.amount); - if (Math.abs(opAmount - sessionAmount) > 0.0000001) { - this.logger.warn( - `Amount mismatch for memo ${memo}: expected ${sessionAmount}, got ${opAmount}`, - ); + if (existingPayment) { + this.logger.warn(`Duplicate payment ${txHash} — skipping`); return; } - if (op.asset_code !== session.assetCode && session.assetCode !== 'XLM') { - this.logger.warn( - `Asset mismatch for memo ${memo}: expected ${session.assetCode}, got ${op.asset_code}`, - ); - return; - } + await db.transaction(async (tx) => { + const lockedSessions = (await tx.execute( + sql`SELECT * FROM checkout_sessions WHERE memo = ${memo} AND status = 'pending' FOR UPDATE`, + )) as unknown as LockedSessionRow[]; + const session = lockedSessions[0]; + if (!session) return; + + const opAmount = parseFloat(op.amount); + const sessionAmount = parseFloat(session.amount); + if (Math.abs(opAmount - sessionAmount) > 0.0000001) { + this.logger.warn( + `Amount mismatch for memo ${memo}: expected ${sessionAmount}, got ${opAmount}`, + ); + return; + } + + if (op.asset_code !== session.asset_code && session.asset_code !== 'XLM') { + this.logger.warn( + `Asset mismatch for memo ${memo}: expected ${session.asset_code}, got ${op.asset_code}`, + ); + return; + } - await db - .update(checkoutSessions) - .set({ status: 'paid' } as any) - .where(eq(checkoutSessions.id, session.id)); - - await db.insert(payments).values({ - sessionId: session.id, - merchantId: session.merchantId, - txHash: op.transaction_hash, - amount: op.amount, - assetCode: op.asset_code ?? 'XLM', - assetIssuer: op.asset_issuer ?? null, - senderAddress: op.from, - confirmedAt: new Date(), - } as any); - - this.metrics.paymentsConfirmed.inc(); - this.logger.log(`Payment confirmed for session ${session.id} — tx ${op.transaction_hash}`); - - await this.webhooks.dispatchWebhook(session.merchantId, 'payment.confirmed', { - sessionId: session.id, - txHash: op.transaction_hash, - amount: op.amount, - asset: op.asset_code ?? 'XLM', - sender: op.from, + await tx + .update(checkoutSessions) + .set({ status: 'paid' } as any) + .where(eq(checkoutSessions.id, session.id)); + + await tx.insert(payments).values({ + sessionId: session.id, + merchantId: session.merchant_id, + txHash, + amount: op.amount, + assetCode: op.asset_code ?? 'XLM', + assetIssuer: op.asset_issuer ?? null, + senderAddress: op.from, + confirmedAt: new Date(), + } as any); + + this.metrics.paymentsConfirmed.inc(); + this.logger.log(`Payment confirmed for session ${session.id} — tx ${txHash}`); + + await this.webhooks.dispatchWebhook(session.merchant_id, 'payment.confirmed', { + sessionId: session.id, + txHash, + amount: op.amount, + asset: op.asset_code ?? 'XLM', + sender: op.from, + }); }); } From bcfe10d55974b89c41d04696daee7c938267c410 Mon Sep 17 00:00:00 2001 From: AbelOsaretin Date: Sat, 20 Jun 2026 06:03:11 +0100 Subject: [PATCH 2/3] feat: add payment recovery cron job for stale session reconciliation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Install @nestjs/schedule and configure ScheduleModule.forRoot() in AppModule - Create PaymentRecoveryService with @Cron(EVERY_5_MINUTES) that runs three recovery scenarios: 1. Pending sessions with existing payment records → mark as paid 2. Sessions stuck in pending >5 minutes → check Horizon, recover or expire 3. Paid sessions without payment records → log for manual investigation - Add PaymentRecoveryService to PaymentsModule providers --- package-lock.json | 82 +++++++++++--- package.json | 1 + src/app.module.ts | 2 + src/payments/payment-recovery.service.ts | 138 +++++++++++++++++++++++ src/payments/payments.module.ts | 3 +- 5 files changed, 207 insertions(+), 19 deletions(-) create mode 100644 src/payments/payment-recovery.service.ts diff --git a/package-lock.json b/package-lock.json index f74e459..fa9437d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "@nestjs/passport": "^10.0.0", "@nestjs/platform-express": "^10.0.0", "@nestjs/platform-socket.io": "^10.0.0", + "@nestjs/schedule": "^6.1.3", "@nestjs/terminus": "^10.1.0", "@nestjs/typeorm": "^10.0.0", "@nestjs/websockets": "^10.0.0", @@ -843,7 +844,7 @@ "version": "0.8.1", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@jridgewell/trace-mapping": "0.3.9" @@ -856,7 +857,7 @@ "version": "0.3.9", "resolved": "https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.9.tgz", "integrity": "sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@jridgewell/resolve-uri": "^3.0.3", @@ -2392,7 +2393,7 @@ "version": "3.1.2", "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz", "integrity": "sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=6.0.0" @@ -2413,7 +2414,7 @@ "version": "1.5.5", "resolved": "https://registry.npmjs.org/@jridgewell/sourcemap-codec/-/sourcemap-codec-1.5.5.tgz", "integrity": "sha512-cYQ9310grqxueWbl+WuIUIaiUaDcj7WOq5fVhEljNVgRfOUhY9fy2zTvfoqWsnebh8Sl70VScFbICvJnLKB0Og==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@jridgewell/trace-mapping": { @@ -2753,6 +2754,19 @@ "node": ">=10.2.0" } }, + "node_modules/@nestjs/schedule": { + "version": "6.1.3", + "resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-6.1.3.tgz", + "integrity": "sha512-RflMFOpR16Dwd1jAUbeB4mfGTCh65fvEdL4mSjQPJChpkRGRjIXjb+6YQcK2faQrVT60c9DmLmoVR7/ONCtuYQ==", + "license": "MIT", + "dependencies": { + "cron": "4.4.0" + }, + "peerDependencies": { + "@nestjs/common": "^10.0.0 || ^11.0.0", + "@nestjs/core": "^10.0.0 || ^11.0.0" + } + }, "node_modules/@nestjs/schematics": { "version": "10.2.3", "resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz", @@ -3225,28 +3239,28 @@ "version": "1.0.12", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.12.tgz", "integrity": "sha512-UCYBaeFvM11aU2y3YPZ//O5Rhj+xKyzy7mvcIoAjASbigy8mHMryP5cK7dgjlz2hWxh1g5pLw084E0a/wlUSFQ==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@tsconfig/node12": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.11.tgz", "integrity": "sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@tsconfig/node14": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.3.tgz", "integrity": "sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@tsconfig/node16": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.4.tgz", "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@types/babel__core": { @@ -3487,6 +3501,12 @@ "@types/node": "*" } }, + "node_modules/@types/luxon": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/@types/luxon/-/luxon-3.7.2.tgz", + "integrity": "sha512-gW+Oib+vUtGJBtNC8V9Reww0oIpusw+4m81uncg9REGZAJfqOQHfo/nkabnc7w0QReXyPqjrbWMJk6NuAkiX3Q==", + "license": "MIT" + }, "node_modules/@types/methods": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/@types/methods/-/methods-1.1.4.tgz", @@ -4159,7 +4179,7 @@ "version": "8.16.0", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.16.0.tgz", "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", - "dev": true, + "devOptional": true, "license": "MIT", "bin": { "acorn": "bin/acorn" @@ -4182,7 +4202,7 @@ "version": "8.3.5", "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.5.tgz", "integrity": "sha512-HEHNfbars9v4pgpW6SO1KSPkfoS0xVOM/9UzkJltjlsHZmJasxg8aXkuZa7SMf8vKGIBhpUsPluQSqhJFCqebw==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "acorn": "^8.11.0" @@ -4421,7 +4441,7 @@ "version": "4.1.3", "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/argparse": { @@ -5555,9 +5575,26 @@ "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", - "dev": true, + "devOptional": true, "license": "MIT" }, + "node_modules/cron": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/cron/-/cron-4.4.0.tgz", + "integrity": "sha512-fkdfq+b+AHI4cKdhZlppHveI/mgz2qpiYxcm+t5E5TsxX7QrLS1VE0+7GENEk9z0EeGPcpSciGv6ez24duWhwQ==", + "license": "MIT", + "dependencies": { + "@types/luxon": "~3.7.0", + "luxon": "~3.7.0" + }, + "engines": { + "node": ">=18.x" + }, + "funding": { + "type": "ko-fi", + "url": "https://ko-fi.com/intcreator" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -5746,7 +5783,7 @@ "version": "4.0.4", "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.4.tgz", "integrity": "sha512-X07nttJQkwkfKfvTPG/KSnE2OMdcUCao6+eXF3wmnIQRn2aPAHH3VxDbDOdegkd6JbPsXqShpvEOHfAT+nCNwQ==", - "dev": true, + "devOptional": true, "license": "BSD-3-Clause", "engines": { "node": ">=0.3.1" @@ -9284,6 +9321,15 @@ "yallist": "^3.0.2" } }, + "node_modules/luxon": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.7.2.tgz", + "integrity": "sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==", + "license": "MIT", + "engines": { + "node": ">=12" + } + }, "node_modules/magic-string": { "version": "0.30.8", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz", @@ -9325,7 +9371,7 @@ "version": "1.3.6", "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", - "dev": true, + "devOptional": true, "license": "ISC" }, "node_modules/makeerror": { @@ -12027,7 +12073,7 @@ "version": "10.9.2", "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@cspotcode/source-map-support": "^0.8.0", @@ -12381,7 +12427,7 @@ "version": "5.9.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", - "dev": true, + "devOptional": true, "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", @@ -12534,7 +12580,7 @@ "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/v8-to-istanbul": { @@ -12901,7 +12947,7 @@ "version": "3.1.1", "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=6" diff --git a/package.json b/package.json index 32e26d0..b040bde 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "@nestjs/passport": "^10.0.0", "@nestjs/platform-express": "^10.0.0", "@nestjs/platform-socket.io": "^10.0.0", + "@nestjs/schedule": "^6.1.3", "@nestjs/terminus": "^10.1.0", "@nestjs/typeorm": "^10.0.0", "@nestjs/websockets": "^10.0.0", diff --git a/src/app.module.ts b/src/app.module.ts index b41a066..8993c19 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,5 +1,6 @@ import { Module, NestModule, MiddlewareConsumer } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; +import { ScheduleModule } from '@nestjs/schedule'; import { AuthModule } from './auth/auth.module'; import { MerchantsModule } from './merchants/merchants.module'; import { CheckoutModule } from './checkout/checkout.module'; @@ -15,6 +16,7 @@ import { SecurityHeadersMiddleware } from './middleware/security-headers.middlew @Module({ imports: [ + ScheduleModule.forRoot(), TypeOrmModule.forRoot({ type: 'postgres', url: process.env.DATABASE_URL, diff --git a/src/payments/payment-recovery.service.ts b/src/payments/payment-recovery.service.ts new file mode 100644 index 0000000..d7cf9fa --- /dev/null +++ b/src/payments/payment-recovery.service.ts @@ -0,0 +1,138 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { db } from '../db/index'; +import { checkoutSessions, payments } from '../db/schema'; +import { eq } from 'drizzle-orm'; +import { sql } from 'drizzle-orm'; +import { WebhookService } from '../webhook/webhook.service'; + +const STALE_MINUTES = 5; + +interface StaleSessionRow { + id: string; + memo: string | null; +} + +interface PaidWithoutPaymentRow { + id: string; + merchant_id: string; + amount: string; + asset_code: string; +} + +interface PendingWithPaymentRow { + id: string; + merchant_id: string; + tx_hash: string; + amount: string; + asset_code: string; + sender_address: string; +} + +@Injectable() +export class PaymentRecoveryService { + private readonly logger = new Logger(PaymentRecoveryService.name); + + constructor(private readonly webhooks: WebhookService) {} + + @Cron(CronExpression.EVERY_5_MINUTES) + async recoverStaleSessions(): Promise { + this.logger.debug('Running payment recovery job'); + + await this.recoverPendingWithPayments(); + await this.recoverStuckPending(); + await this.recoverPaidWithoutPayments(); + } + + private async recoverPendingWithPayments(): Promise { + const orphaned = (await db.execute(sql` + SELECT cs.id, cs.merchant_id, p.tx_hash, p.amount, p.asset_code, p.sender_address, p.confirmed_at + FROM checkout_sessions cs + INNER JOIN payments p ON p.session_id = cs.id + WHERE cs.status = 'pending' + `)) as unknown as PendingWithPaymentRow[]; + + if (!orphaned.length) return; + + this.logger.warn( + `Found ${orphaned.length} pending session(s) with existing payment records — recovering`, + ); + + for (const row of orphaned) { + try { + await db + .update(checkoutSessions) + .set({ status: 'paid' } as any) + .where(eq(checkoutSessions.id, row.id)); + + this.logger.log(`Recovered session ${row.id} — marked as paid`); + + await this.webhooks.dispatchWebhook(row.merchant_id, 'payment.confirmed', { + sessionId: row.id, + txHash: row.tx_hash, + amount: row.amount, + asset: row.asset_code, + sender: row.sender_address, + }); + } catch (err) { + this.logger.error(`Failed to recover session ${row.id}`, err); + } + } + } + + private async recoverStuckPending(): Promise { + const staleThreshold = new Date(Date.now() - STALE_MINUTES * 60 * 1000); + + const stale = (await db.execute(sql` + SELECT id, memo FROM checkout_sessions + WHERE status = 'pending' AND created_at < ${staleThreshold} + `)) as unknown as StaleSessionRow[]; + + if (!stale.length) return; + + this.logger.warn( + `Found ${stale.length} session(s) stuck in pending for >${STALE_MINUTES} minutes`, + ); + + for (const row of stale) { + const paymentExists = await db.query.payments.findFirst({ + where: eq(payments.sessionId, row.id), + }); + + if (paymentExists) { + await db + .update(checkoutSessions) + .set({ status: 'paid' } as any) + .where(eq(checkoutSessions.id, row.id)); + this.logger.log(`Recovered stale session ${row.id} — payment exists, marked as paid`); + } else { + await db + .update(checkoutSessions) + .set({ status: 'expired' } as any) + .where(eq(checkoutSessions.id, row.id)); + this.logger.log(`Recovered stale session ${row.id} — no payment, marked as expired`); + } + } + } + + private async recoverPaidWithoutPayments(): Promise { + const paidOrphans = (await db.execute(sql` + SELECT cs.id, cs.merchant_id, cs.amount, cs.asset_code + FROM checkout_sessions cs + LEFT JOIN payments p ON p.session_id = cs.id + WHERE cs.status = 'paid' AND p.id IS NULL + `)) as unknown as PaidWithoutPaymentRow[]; + + if (!paidOrphans.length) return; + + this.logger.warn( + `Found ${paidOrphans.length} paid session(s) without payment records — logging for manual review`, + ); + + for (const row of paidOrphans) { + this.logger.warn( + `Paid session ${row.id} (merchant ${row.merchant_id}) has no payment record — requires manual investigation`, + ); + } + } +} diff --git a/src/payments/payments.module.ts b/src/payments/payments.module.ts index 6d9b48b..f37ebe6 100644 --- a/src/payments/payments.module.ts +++ b/src/payments/payments.module.ts @@ -1,12 +1,13 @@ import { Module } from '@nestjs/common'; import { PaymentDetectorService } from './payment-detector.service'; import { PaymentCursorService } from './payment-cursor.service'; +import { PaymentRecoveryService } from './payment-recovery.service'; import { StellarModule } from '../stellar/stellar.module'; import { WebhookModule } from '../webhook/webhook.module'; import { MonitoringModule } from '../monitoring/monitoring.module'; @Module({ imports: [StellarModule, WebhookModule, MonitoringModule], - providers: [PaymentDetectorService, PaymentCursorService], + providers: [PaymentDetectorService, PaymentCursorService, PaymentRecoveryService], }) export class PaymentsModule {} From 47bbd3d53a4d084839a3a40b3ba2ee5b2778490d Mon Sep 17 00:00:00 2001 From: AbelOsaretin Date: Sat, 20 Jun 2026 06:03:17 +0100 Subject: [PATCH 3/3] test: add unit tests for exactly-once payment processing - Transaction wrapping tests: idempotency check, SELECT FOR UPDATE lock, atomic session update + payment insert + webhook dispatch - Amount and asset validation tests: mismatch rejection, XLM acceptance - Recovery service tests: pending with payments, stuck pending, paid without payments, empty results, error handling --- .../payment-detector-transactions.spec.ts | 294 ++++++++++++++++++ src/__tests__/payment-recovery.spec.ts | 183 +++++++++++ 2 files changed, 477 insertions(+) create mode 100644 src/__tests__/payment-detector-transactions.spec.ts create mode 100644 src/__tests__/payment-recovery.spec.ts diff --git a/src/__tests__/payment-detector-transactions.spec.ts b/src/__tests__/payment-detector-transactions.spec.ts new file mode 100644 index 0000000..98f0fab --- /dev/null +++ b/src/__tests__/payment-detector-transactions.spec.ts @@ -0,0 +1,294 @@ +import { PaymentDetectorService } from '../payments/payment-detector.service'; +import { PaymentCursorService } from '../payments/payment-cursor.service'; +import { StellarService } from '../stellar/stellar.service'; +import { WebhookService } from '../webhook/webhook.service'; +import { MetricsService } from '../monitoring/metrics.service'; +import { db } from '../db/index'; + +jest.mock('../db/index', () => ({ + db: { + query: { + payments: { findFirst: jest.fn() }, + }, + transaction: jest.fn(), + execute: jest.fn(), + update: jest.fn(), + insert: jest.fn(), + }, + client: {}, +})); + +const mockDb = db as jest.Mocked; + +const createMockTx = (sessionRow: any = null) => ({ + execute: jest.fn().mockResolvedValue(sessionRow ? [sessionRow] : []), + update: jest.fn().mockReturnValue({ + set: jest.fn().mockReturnValue({ + where: jest.fn().mockResolvedValue(undefined), + }), + }), + insert: jest.fn().mockReturnValue({ + values: jest.fn().mockResolvedValue(undefined), + }), +}); + +describe('PaymentDetectorService - Transaction Wrapping', () => { + let service: PaymentDetectorService; + let mockStellar: jest.Mocked; + let mockWebhooks: jest.Mocked; + let mockMetrics: jest.Mocked; + let mockCursorService: jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + + mockStellar = { + getPaymentsPage: jest.fn(), + getHttpStatusFromError: jest.fn(), + } as any; + + mockWebhooks = { + dispatchWebhook: jest.fn(), + } as any; + + mockMetrics = { + paymentsConfirmed: { inc: jest.fn() }, + } as any; + + mockCursorService = { + restoreCursor: jest.fn(), + updateCursor: jest.fn(), + appendCheckpoint: jest.fn(), + acquireLock: jest.fn(), + renewLock: jest.fn(), + releaseLock: jest.fn(), + } as any; + + service = new PaymentDetectorService(mockStellar, mockWebhooks, mockMetrics, mockCursorService); + }); + + describe('idempotency check', () => { + it('skips duplicate payment when tx_hash already exists', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue({ + id: 'existing-payment', + txHash: 'tx-123', + }); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-123', + amount: '10.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockDb.query.payments.findFirst).toHaveBeenCalled(); + expect(mockDb.transaction).not.toHaveBeenCalled(); + expect(mockWebhooks.dispatchWebhook).not.toHaveBeenCalled(); + }); + + it('proceeds with transaction when tx_hash is new', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'USDC', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-456', + amount: '10.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockDb.query.payments.findFirst).toHaveBeenCalled(); + expect(mockDb.transaction).toHaveBeenCalled(); + expect(mockTx.execute).toHaveBeenCalled(); + }); + }); + + describe('transaction atomicity', () => { + it('wraps session update, payment insert, and webhook dispatch in transaction', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'USDC', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-789', + amount: '10.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockDb.transaction).toHaveBeenCalledTimes(1); + expect(mockTx.execute).toHaveBeenCalledTimes(1); + expect(mockTx.update).toHaveBeenCalledTimes(1); + expect(mockTx.insert).toHaveBeenCalledTimes(1); + expect(mockWebhooks.dispatchWebhook).toHaveBeenCalledTimes(1); + }); + + it('uses SELECT FOR UPDATE to lock session row', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'USDC', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-999', + amount: '10.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockTx.execute).toHaveBeenCalledTimes(1); + const sqlArg = mockTx.execute.mock.calls[0][0]; + expect(sqlArg).toBeDefined(); + expect(typeof sqlArg).toBe('object'); + }); + + it('returns early if session not found in locked query', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx(null); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'nonexistent-memo', + transaction_hash: 'tx-111', + amount: '10.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockTx.update).not.toHaveBeenCalled(); + expect(mockTx.insert).not.toHaveBeenCalled(); + expect(mockWebhooks.dispatchWebhook).not.toHaveBeenCalled(); + }); + }); + + describe('amount and asset validation', () => { + it('rejects payment with amount mismatch', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'USDC', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-222', + amount: '5.0000000', + asset_code: 'USDC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockTx.update).not.toHaveBeenCalled(); + expect(mockTx.insert).not.toHaveBeenCalled(); + }); + + it('rejects payment with asset mismatch', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'USDC', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-333', + amount: '10.0000000', + asset_code: 'BTC', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockTx.update).not.toHaveBeenCalled(); + expect(mockTx.insert).not.toHaveBeenCalled(); + }); + + it('accepts payment with XLM when session expects XLM', async () => { + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const mockTx = createMockTx({ + id: 'session-1', + merchant_id: 'merchant-1', + amount: '10.0000000', + asset_code: 'XLM', + memo: 'memo-1', + }); + (mockDb.transaction as jest.Mock).mockImplementation(async (fn: (tx: any) => Promise) => + fn(mockTx), + ); + + const op = { + type: 'payment', + transaction_memo: 'memo-1', + transaction_hash: 'tx-444', + amount: '10.0000000', + asset_code: 'XLM', + from: 'GSENDER', + }; + + await (service as any).processPayment(op); + + expect(mockTx.update).toHaveBeenCalled(); + expect(mockTx.insert).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/__tests__/payment-recovery.spec.ts b/src/__tests__/payment-recovery.spec.ts new file mode 100644 index 0000000..12192b7 --- /dev/null +++ b/src/__tests__/payment-recovery.spec.ts @@ -0,0 +1,183 @@ +import { PaymentRecoveryService } from '../payments/payment-recovery.service'; +import { WebhookService } from '../webhook/webhook.service'; +import { db } from '../db/index'; + +jest.mock('../db/index', () => ({ + db: { + query: { + payments: { findFirst: jest.fn() }, + }, + execute: jest.fn(), + update: jest.fn(), + }, +})); + +const mockDb = db as jest.Mocked; + +const createMockUpdateChain = () => { + const where = jest.fn().mockResolvedValue(undefined); + const set = jest.fn().mockReturnValue({ where }); + return { set, where }; +}; + +describe('PaymentRecoveryService', () => { + let service: PaymentRecoveryService; + let mockWebhooks: jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + + mockWebhooks = { + dispatchWebhook: jest.fn(), + } as any; + + service = new PaymentRecoveryService(mockWebhooks); + }); + + describe('recoverPendingWithPayments', () => { + it('marks pending sessions with existing payments as paid', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([ + { + id: 'session-1', + merchant_id: 'merchant-1', + tx_hash: 'tx-123', + amount: '10.0000000', + asset_code: 'USDC', + sender_address: 'GSENDER', + }, + ]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + const chain = createMockUpdateChain(); + mockDb.update.mockReturnValue({ set: chain.set } as any); + + await service.recoverStaleSessions(); + + expect(chain.set).toHaveBeenCalledTimes(1); + expect(mockWebhooks.dispatchWebhook).toHaveBeenCalledWith( + 'merchant-1', + 'payment.confirmed', + expect.objectContaining({ sessionId: 'session-1' }), + ); + }); + + it('handles multiple pending sessions', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([ + { + id: 'session-1', + merchant_id: 'm1', + tx_hash: 'tx-1', + amount: '10', + asset_code: 'USDC', + sender_address: 'G1', + }, + { + id: 'session-2', + merchant_id: 'm2', + tx_hash: 'tx-2', + amount: '20', + asset_code: 'XLM', + sender_address: 'G2', + }, + ]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + const chain = createMockUpdateChain(); + mockDb.update.mockReturnValue({ set: chain.set } as any); + + await service.recoverStaleSessions(); + + expect(chain.set).toHaveBeenCalledTimes(2); + expect(mockWebhooks.dispatchWebhook).toHaveBeenCalledTimes(2); + }); + + it('continues if one session recovery fails', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([ + { + id: 'session-1', + merchant_id: 'm1', + tx_hash: 'tx-1', + amount: '10', + asset_code: 'USDC', + sender_address: 'G1', + }, + { + id: 'session-2', + merchant_id: 'm2', + tx_hash: 'tx-2', + amount: '20', + asset_code: 'XLM', + sender_address: 'G2', + }, + ]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + let callCount = 0; + const chain = createMockUpdateChain(); + chain.where.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return Promise.reject(new Error('DB error')); + } + return Promise.resolve(undefined); + }); + mockDb.update.mockReturnValue({ set: chain.set } as any); + + await service.recoverStaleSessions(); + + expect(chain.set).toHaveBeenCalledTimes(2); + }); + }); + + describe('recoverStuckPending', () => { + it('marks stale pending sessions without payments as expired', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([{ id: 'stale-session', memo: 'memo-1' }]) + .mockResolvedValueOnce([]); + + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue(null); + const chain = createMockUpdateChain(); + mockDb.update.mockReturnValue({ set: chain.set } as any); + + await service.recoverStaleSessions(); + + expect(chain.set).toHaveBeenCalled(); + }); + + it('marks stale pending sessions with payments as paid', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([{ id: 'stale-session', memo: 'memo-1' }]) + .mockResolvedValueOnce([]); + + (mockDb.query.payments.findFirst as jest.Mock).mockResolvedValue({ id: 'payment-1' }); + const chain = createMockUpdateChain(); + mockDb.update.mockReturnValue({ set: chain.set } as any); + + await service.recoverStaleSessions(); + + expect(chain.set).toHaveBeenCalled(); + }); + }); + + describe('empty results', () => { + it('does nothing when no sessions need recovery', async () => { + (mockDb.execute as jest.Mock) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + await service.recoverStaleSessions(); + + expect(mockDb.update).not.toHaveBeenCalled(); + expect(mockWebhooks.dispatchWebhook).not.toHaveBeenCalled(); + }); + }); +});