Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/backend/src/app/api/payments/checkout/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NextRequest, NextResponse } from 'next/server';
import { z } from 'zod';
import { withAuth } from '@/lib/api/with-auth';
import { paymentService } from '@/services/payment.service';
import { paymentService, CheckoutLockError } from '@/services/payment.service';
import { getValidPriceIds } from '@/lib/stripe/pricing';

const checkoutSchema = z.object({
Expand Down Expand Up @@ -42,6 +42,12 @@ export const POST = withAuth(async (req: NextRequest, { user }) => {
);
return NextResponse.json(session);
} catch (error: any) {
if (error instanceof CheckoutLockError) {
return NextResponse.json(
{ error: 'Checkout already in progress. Please retry shortly.' },
{ status: 409, headers: { 'Retry-After': '10' } }
);
}
console.error('Error creating checkout session:', error);
const isClientError = error.message === 'User email not found';
return NextResponse.json(
Expand Down
45 changes: 45 additions & 0 deletions apps/backend/src/lib/supabase/supabase-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { createClient } from './server';

/**
* Acquires a Supabase advisory lock using pg_try_advisory_lock with a polling
* loop until the lock is acquired or the timeout elapses.
*
* Returns true if the lock was acquired, false if the timeout expired.
*/
export async function acquireAdvisoryLock(key: string, timeoutMs: number): Promise<boolean> {
const supabase = createClient();
const lockId = hashKey(key);
const deadline = Date.now() + timeoutMs;

while (Date.now() < deadline) {
const { data, error } = await supabase.rpc('pg_try_advisory_lock', { lock_id: lockId });
if (error) throw new Error(`Advisory lock error: ${error.message}`);
if (data === true) return true;
// Brief sleep before retry to avoid tight loop
await sleep(100);
}
return false;
}

/**
* Releases a previously acquired advisory lock.
*/
export async function releaseAdvisoryLock(key: string): Promise<void> {
const supabase = createClient();
const lockId = hashKey(key);
await supabase.rpc('pg_advisory_unlock', { lock_id: lockId });
}

/** Deterministically map a string key to a 32-bit integer lock ID. */
function hashKey(key: string): number {
let hash = 5381;
for (let i = 0; i < key.length; i++) {
hash = ((hash << 5) + hash) ^ key.charCodeAt(i);
}
// Convert to unsigned 32-bit so Postgres bigint won't get negative values
return hash >>> 0;
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
37 changes: 36 additions & 1 deletion apps/backend/src/services/payment.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,27 @@ import { stripe } from '@/lib/stripe/client';
import { getTierFromPriceId } from '@/lib/stripe/pricing';
import { getTaxConfiguration, buildCheckoutTaxParams, buildTaxExemptUpdate, type TaxExemptStatus } from '@/lib/stripe/tax';
import { createClient } from '@/lib/supabase/server';
import { acquireAdvisoryLock, releaseAdvisoryLock } from '@/lib/supabase/supabase-lock';
import { paymentIdempotencyService } from './payment-idempotency.service';
import { invoiceDeliveryService } from './invoice-delivery.service';

import type {
CheckoutSession,
SubscriptionStatus,
StripeEvent,
} from '@craft/types';

const CHECKOUT_LOCK_TIMEOUT_MS = 10_000;

/** Thrown when the checkout advisory lock cannot be acquired (concurrent request in progress). */
export class CheckoutLockError extends Error {
readonly retryAfterMs = CHECKOUT_LOCK_TIMEOUT_MS;
constructor() {
super('Checkout already in progress for this user');
this.name = 'CheckoutLockError';
}
}

/**
* PaymentService
*
Expand All @@ -32,13 +45,35 @@ import type {
export class PaymentService {
/**
* Create a Stripe checkout session for subscription with idempotency guarantees.
* Retried calls with the same user/priceId will return the same session ID.
* A distributed advisory lock (payment_checkout_{userId}) prevents concurrent
* duplicate sessions from the same user. Returns the acquired session.
*
* Throws CheckoutLockError when the lock cannot be acquired within 10 seconds.
*/
async createCheckoutSession(
userId: string,
priceId: string,
successUrl?: string,
cancelUrl?: string
): Promise<CheckoutSession> {
const lockKey = `payment_checkout_${userId}`;
const acquired = await acquireAdvisoryLock(lockKey, CHECKOUT_LOCK_TIMEOUT_MS);
if (!acquired) {
throw new CheckoutLockError();
}

try {
return await this._createCheckoutSessionInner(userId, priceId, successUrl, cancelUrl);
} finally {
await releaseAdvisoryLock(lockKey);
}
}

private async _createCheckoutSessionInner(
userId: string,
priceId: string,
successUrl?: string,
cancelUrl?: string
): Promise<CheckoutSession> {
const supabase = createClient();

Expand Down
98 changes: 98 additions & 0 deletions apps/backend/tests/payments/checkout-distributed-lock.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import * as lockModule from '@/lib/supabase/supabase-lock';
import { PaymentService, CheckoutLockError } from '@/services/payment.service';

vi.mock('@/lib/supabase/server', () => ({
createClient: () => ({
from: () => ({ select: () => ({ eq: () => ({ single: async () => ({ data: { stripe_customer_id: 'cus_test' } }) }) }) }),
auth: { getUser: async () => ({ data: { user: { email: 'u@example.com' } } }) },
}),
}));

vi.mock('@/lib/stripe/client', () => ({
stripe: {
checkout: {
sessions: {
create: vi.fn(async () => ({ id: 'cs_test_123', url: 'https://stripe.com/cs_test' })),
},
},
},
}));

vi.mock('@/services/payment-idempotency.service', () => ({
paymentIdempotencyService: {
generateKey: vi.fn(async () => 'ikey'),
storeResponse: vi.fn(async () => {}),
},
}));

vi.mock('@/lib/stripe/pricing', () => ({
getTierFromPriceId: () => 'pro',
getValidPriceIds: () => ['price_pro'],
}));

vi.mock('@/lib/stripe/tax', () => ({
getTaxConfiguration: () => ({}),
buildCheckoutTaxParams: () => ({}),
}));

vi.mock('@/services/invoice-delivery.service', () => ({
invoiceDeliveryService: {},
}));

const acquireSpy = vi.spyOn(lockModule, 'acquireAdvisoryLock');
const releaseSpy = vi.spyOn(lockModule, 'releaseAdvisoryLock');

describe('PaymentService – distributed lock', () => {
const svc = new PaymentService();

beforeEach(() => {
vi.clearAllMocks();
});

it('acquires lock, calls inner logic, then releases on success', async () => {
acquireSpy.mockResolvedValue(true);
releaseSpy.mockResolvedValue(undefined);

const result = await svc.createCheckoutSession('user-1', 'price_pro');

expect(acquireSpy).toHaveBeenCalledWith('payment_checkout_user-1', 10_000);
expect(result.sessionId).toBe('cs_test_123');
expect(releaseSpy).toHaveBeenCalledWith('payment_checkout_user-1');
});

it('throws CheckoutLockError and does not release when lock not acquired', async () => {
acquireSpy.mockResolvedValue(false);

await expect(svc.createCheckoutSession('user-1', 'price_pro')).rejects.toBeInstanceOf(CheckoutLockError);
expect(releaseSpy).not.toHaveBeenCalled();
});

it('releases lock even when inner logic throws', async () => {
acquireSpy.mockResolvedValue(true);
releaseSpy.mockResolvedValue(undefined);
// Force inner to throw by making stripe fail
const { stripe } = await import('@/lib/stripe/client');
vi.mocked(stripe.checkout.sessions.create).mockRejectedValueOnce(new Error('stripe down'));

await expect(svc.createCheckoutSession('user-1', 'price_pro')).rejects.toThrow('stripe down');
expect(releaseSpy).toHaveBeenCalledWith('payment_checkout_user-1');
});

it('concurrent second request gets 409 when lock is held', async () => {
let unlocked = false;
acquireSpy.mockImplementation(async () => {
// First call acquires, second fails (simulates held lock)
if (!unlocked) { unlocked = true; return true; }
return false;
});
releaseSpy.mockResolvedValue(undefined);

// First request in progress (not awaited yet)
const first = svc.createCheckoutSession('user-2', 'price_pro');
const second = svc.createCheckoutSession('user-2', 'price_pro');

await first;
await expect(second).rejects.toBeInstanceOf(CheckoutLockError);
});
});
Loading