Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 84 additions & 53 deletions src/services/idempotencySweeper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,61 @@ import {
sweepIdempotencyStoreRows,
} from './idempotencySweeper.js';

/** Build a mock pool where connect() returns a client that proxies advisory-lock
* queries, and pool.query() handles the row-count SELECT. */
function makeMockPool({
lockAcquired,
deleteRowCount,
rowCount,
deleteFn,
}: {
lockAcquired: boolean;
deleteRowCount?: number;
rowCount?: number;
deleteFn?: () => Promise<{ rowCount: number }>;
}) {
const client = {
query: jest.fn().mockImplementation(async (sql: string) => {
if (sql.includes('pg_try_advisory_lock')) {
return { rows: [{ acquired: lockAcquired }] };
}
if (sql.includes('DELETE FROM idempotency_store')) {
return deleteFn ? deleteFn() : { rowCount: deleteRowCount ?? 0 };
}
if (sql.includes('pg_advisory_unlock')) {
return { rows: [] };
}
return { rows: [] };
}),
release: jest.fn(),
};

const pool = {
connect: jest.fn().mockResolvedValue(client),
query: jest.fn().mockImplementation(async (sql: string) => {
if (sql.includes('SELECT COUNT')) {
return { rows: [{ row_count: String(rowCount ?? 0) }] };
}
return { rows: [] };
}),
};

return { pool, client };
}

describe('idempotency sweeper', () => {
afterEach(() => {
jest.useRealTimers();
resetAllMetrics();
});

it('acquires the advisory lock, deletes expired rows, and updates the gauge', async () => {
const mockPool = {
query: jest
.fn()
.mockResolvedValueOnce({ rows: [{ acquired: true }] })
.mockResolvedValueOnce({ rowCount: 2 })
.mockResolvedValueOnce({ rows: [{ row_count: '5' }] })
.mockResolvedValueOnce({}),
} as any;
const { pool } = makeMockPool({ lockAcquired: true, deleteRowCount: 2, rowCount: 5 });

const rowCount = await sweepIdempotencyStoreRows(mockPool);
const rowCount = await sweepIdempotencyStoreRows(pool as any);

expect(rowCount).toBe(5);
expect(mockPool.query).toHaveBeenNthCalledWith(
1,
'SELECT pg_try_advisory_lock($1) AS acquired',
[0x4a5b6c7d],
);
expect(mockPool.query).toHaveBeenNthCalledWith(
2,
'DELETE FROM idempotency_store WHERE expires_at < NOW()::timestamp',
);
expect(mockPool.query).toHaveBeenNthCalledWith(
3,
'SELECT COUNT(*)::bigint AS row_count FROM idempotency_store',
);
expect(pool.connect).toHaveBeenCalledTimes(1);

const metrics = await register.getMetricsAsJSON();
const gauge = metrics.find((m: any) => m.name === 'idempotency_store_rows');
Expand All @@ -44,59 +67,67 @@ describe('idempotency sweeper', () => {
});

it('skips delete when lock is held by another instance and still updates the gauge', async () => {
const mockPool = {
query: jest
.fn()
.mockResolvedValueOnce({ rows: [{ acquired: false }] })
.mockResolvedValueOnce({ rows: [{ row_count: '3' }] }),
} as any;
const { pool, client } = makeMockPool({ lockAcquired: false, rowCount: 3 });

const rowCount = await sweepIdempotencyStoreRows(mockPool);
const rowCount = await sweepIdempotencyStoreRows(pool as any);

expect(rowCount).toBe(3);
expect(mockPool.query).toHaveBeenNthCalledWith(
1,
'SELECT pg_try_advisory_lock($1) AS acquired',
[0x4a5b6c7d],
);
expect(mockPool.query).toHaveBeenNthCalledWith(
2,
'SELECT COUNT(*)::bigint AS row_count FROM idempotency_store',
expect(client.query).not.toHaveBeenCalledWith(
expect.stringContaining('DELETE'),
expect.anything(),
);
});

it('respects shutdown and waits for the current sweep to complete', async () => {
jest.useFakeTimers();
let resolveDelete!: () => void;
const deleteStarted = new Promise<void>((r) => { resolveDelete = r; });
let deleteResolve!: () => void;
const deletePermit = new Promise<void>((r) => { deleteResolve = r; });
let sweepComplete = false;

let firstQueryReleased = false;
const mockPool = {
query: jest.fn().mockImplementation(async (text: string) => {
if (text.includes('pg_try_advisory_lock')) {
const client = {
query: jest.fn().mockImplementation(async (sql: string) => {
if (sql.includes('pg_try_advisory_lock')) {
return { rows: [{ acquired: true }] };
}
if (text.includes('DELETE FROM idempotency_store')) {
await new Promise((resolve) => setTimeout(resolve, 10));
firstQueryReleased = true;
if (sql.includes('pg_advisory_unlock')) {
return { rows: [] };
}
return { rows: [] };
}),
release: jest.fn(),
};

const pool = {
connect: jest.fn().mockResolvedValue(client),
query: jest.fn().mockImplementation(async (sql: string) => {
if (sql.includes('DELETE FROM idempotency_store')) {
resolveDelete(); // signal we're inside the slow query
await deletePermit; // block until test unblocks us
sweepComplete = true;
return { rowCount: 1 };
}
if (text.includes('SELECT COUNT')) {
if (sql.includes('SELECT COUNT')) {
return { rows: [{ row_count: '1' }] };
}
return { rows: [] };
}),
} as any;
};

const job = createIdempotencySweeperJob(mockPool, { intervalMs: 1000 });
const job = createIdempotencySweeperJob(pool as any, { intervalMs: 1000 });
job.start();

await Promise.resolve();
expect(mockPool.query).toHaveBeenCalledWith('SELECT pg_try_advisory_lock($1) AS acquired', [0x4a5b6c7d]);
// Wait until the tick is actually blocked inside the DELETE query
await deleteStarted;

// beginShutdown must not kill in-flight work
job.beginShutdown();

// Unblock the DELETE, then wait for the job to drain
deleteResolve();
await job.awaitIdle();

expect(firstQueryReleased).toBe(true);
jest.runOnlyPendingTimers();
expect(mockPool.query).toHaveBeenCalledTimes(3);
expect(sweepComplete).toBe(true);
expect(pool.connect).toHaveBeenCalledTimes(1);
});
});
59 changes: 18 additions & 41 deletions src/services/idempotencySweeper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Pool } from 'pg';
import { setIdempotencyStoreRows } from '../metrics.js';
import { withAdvisoryLock } from '../workers/lockHelper.js';

const IDEMPOTENCY_SWEEPER_ADVISORY_LOCK_KEY = 0x4a5b6c7d;

Expand All @@ -19,53 +20,29 @@ export async function sweepIdempotencyStoreRows(
pool: Pool,
logger: Pick<typeof console, 'error' | 'info'> = console,
): Promise<number> {
let lockAcquired = false;
let deletedRows = 0;

try {
const lockResult = await pool.query<{ acquired: boolean }>(
'SELECT pg_try_advisory_lock($1) AS acquired',
[IDEMPOTENCY_SWEEPER_ADVISORY_LOCK_KEY],
await withAdvisoryLock(pool, IDEMPOTENCY_SWEEPER_ADVISORY_LOCK_KEY, logger, async () => {
const deleteResult = await pool.query(
'DELETE FROM idempotency_store WHERE expires_at < NOW()::timestamp',
);

if (lockResult.rows[0]?.acquired) {
lockAcquired = true;
const deleteResult = await pool.query(
'DELETE FROM idempotency_store WHERE expires_at < NOW()::timestamp',
);
deletedRows = deleteResult.rowCount ?? 0;
logger.info(
`[idempotencySweeper] Removed ${deletedRows} expired idempotency rows.`,
);
} else {
logger.info(
'[idempotencySweeper] Another instance owns the sweeper lock; skipping cleanup for this run.',
);
}

const countResult = await pool.query<{ row_count: string }>(
'SELECT COUNT(*)::bigint AS row_count FROM idempotency_store',
);
const rowCount = Number(countResult.rows[0]?.row_count ?? 0);

setIdempotencyStoreRows(rowCount);
deletedRows = deleteResult.rowCount ?? 0;
logger.info(
`[idempotencySweeper] idempotency_store_rows=${rowCount} (deleted ${deletedRows}).`,
`[idempotencySweeper] Removed ${deletedRows} expired idempotency rows.`,
);
});

return rowCount;
} catch (error) {
logger.error('[idempotencySweeper] Sweep failed:', error);
throw error;
} finally {
if (lockAcquired) {
try {
await pool.query('SELECT pg_advisory_unlock($1)', [IDEMPOTENCY_SWEEPER_ADVISORY_LOCK_KEY]);
} catch (unlockError) {
logger.error('[idempotencySweeper] Failed to release advisory lock:', unlockError);
}
}
}
const countResult = await pool.query<{ row_count: string }>(
'SELECT COUNT(*)::bigint AS row_count FROM idempotency_store',
);
const rowCount = Number(countResult.rows[0]?.row_count ?? 0);

setIdempotencyStoreRows(rowCount);
logger.info(
`[idempotencySweeper] idempotency_store_rows=${rowCount} (deleted ${deletedRows}).`,
);

return rowCount;
}

export function createIdempotencySweeperJob(
Expand Down
130 changes: 130 additions & 0 deletions src/workers/lockHelper.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import type { Pool, PoolClient } from 'pg';
import { withAdvisoryLock } from './lockHelper.js';

const makeClient = (acquired: boolean, queryError?: Error, unlockError?: Error): PoolClient => {
const client = {
query: jest.fn().mockImplementation(async (sql: string) => {
if (sql.includes('pg_try_advisory_lock')) {
if (queryError) throw queryError;
return { rows: [{ acquired }] };
}
if (sql.includes('pg_advisory_unlock')) {
if (unlockError) throw unlockError;
return { rows: [] };
}
return { rows: [] };
}),
release: jest.fn(),
} as unknown as PoolClient;
return client;
};

const makePool = (client: PoolClient): Pool =>
({ connect: jest.fn().mockResolvedValue(client) }) as unknown as Pool;

const makeLogger = () => ({
info: jest.fn(),
error: jest.fn(),
});

describe('withAdvisoryLock', () => {
it('acquires lock, runs fn, releases lock, and returns true', async () => {
const client = makeClient(true);
const pool = makePool(client);
const logger = makeLogger();
const fn = jest.fn().mockResolvedValue(undefined);

const result = await withAdvisoryLock(pool, 0x1234, logger, fn);

expect(result).toBe(true);
expect(fn).toHaveBeenCalledTimes(1);
expect(client.query).toHaveBeenCalledWith('SELECT pg_try_advisory_lock($1) AS acquired', [0x1234]);
expect(client.query).toHaveBeenCalledWith('SELECT pg_advisory_unlock($1)', [0x1234]);
expect(client.release).toHaveBeenCalledTimes(1);
expect(logger.info).toHaveBeenCalledWith(expect.stringContaining('acquired'), expect.anything());
expect(logger.info).toHaveBeenCalledWith(expect.stringContaining('released'), expect.anything());
});

it('skips fn and returns false when lock is held by another replica', async () => {
const client = makeClient(false);
const pool = makePool(client);
const logger = makeLogger();
const fn = jest.fn();

const result = await withAdvisoryLock(pool, 0x1234, logger, fn);

expect(result).toBe(false);
expect(fn).not.toHaveBeenCalled();
// unlock must NOT be called — we never held the lock
expect(client.query).not.toHaveBeenCalledWith('SELECT pg_advisory_unlock($1)', [0x1234]);
expect(client.release).toHaveBeenCalledTimes(1);
expect(logger.info).toHaveBeenCalledWith(expect.stringContaining('skipping run'), expect.anything());
});

it('releases lock and rethrows when fn throws', async () => {
const client = makeClient(true);
const pool = makePool(client);
const logger = makeLogger();
const boom = new Error('fn failure');
const fn = jest.fn().mockRejectedValue(boom);

await expect(withAdvisoryLock(pool, 0x1234, logger, fn)).rejects.toThrow('fn failure');

expect(client.query).toHaveBeenCalledWith('SELECT pg_advisory_unlock($1)', [0x1234]);
expect(client.release).toHaveBeenCalledTimes(1);
});

it('logs error and still releases client when unlock query fails', async () => {
const unlockError = new Error('unlock failed');
const client = makeClient(true, undefined, unlockError);
const pool = makePool(client);
const logger = makeLogger();
const fn = jest.fn().mockResolvedValue(undefined);

// Should not throw — unlock failure is swallowed after logging
const result = await withAdvisoryLock(pool, 0x1234, logger, fn);

expect(result).toBe(true);
expect(client.release).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining('Failed to release advisory lock'),
expect.objectContaining({ unlockError }),
);
});

it('throws TypeError for a non-integer lock key', async () => {
const pool = { connect: jest.fn() } as unknown as Pool;
const logger = makeLogger();

await expect(withAdvisoryLock(pool, 1.5, logger, jest.fn())).rejects.toThrow(TypeError);
await expect(withAdvisoryLock(pool, NaN, logger, jest.fn())).rejects.toThrow(TypeError);
expect(pool.connect).not.toHaveBeenCalled();
});

it('releases client even when pool.connect succeeds but lock query throws', async () => {
const queryError = new Error('db gone');
const client = makeClient(false, queryError);
const pool = makePool(client);
const logger = makeLogger();

await expect(withAdvisoryLock(pool, 0x1234, logger, jest.fn())).rejects.toThrow('db gone');

expect(client.release).toHaveBeenCalledTimes(1);
});

it('treats missing rows[0] as lock-not-acquired', async () => {
const client = {
query: jest.fn().mockResolvedValueOnce({ rows: [] }),
release: jest.fn(),
} as unknown as PoolClient;
const pool = makePool(client);
const logger = makeLogger();
const fn = jest.fn();

const result = await withAdvisoryLock(pool, 0x1234, logger, fn);

expect(result).toBe(false);
expect(fn).not.toHaveBeenCalled();
expect(client.release).toHaveBeenCalledTimes(1);
});
});
Loading