From 5a2cc3bce58a8212b41e8a4188408d52180a3e42 Mon Sep 17 00:00:00 2001 From: Akpolo Date: Fri, 19 Jun 2026 20:21:23 +0100 Subject: [PATCH] fix(backend): close idempotency race; create contract_events index via migrate - #1: idempotencyMiddleware now reserves the cache key with an atomic setNotExists BEFORE forwarding to the handler, so two concurrent requests sharing one Idempotency-Key can't both miss the cache check and submit the same money-moving transaction twice. The losing concurrent request gets a 409 with X-Idempotency-Cache: IN_PROGRESS; on 5xx the in-progress placeholder is dropped so retries aren't poisoned. Regression test fires two requests with the same key at once and asserts the handler runs exactly once. - #2: replace the orphan src/db/migrations SQL file (which node-pg-migrate never scanned) with a properly-formatted node-pg-migrate JS migration using disableTransaction = true so CREATE INDEX CONCURRENTLY survives. Per review: * renumbered the new migration from 1778000000000 to 1790000000000 so it sorts after the current head (1789000000001) instead of being back-dated and violating the monotonic convention; * retargeted the index from loan_events to contract_events because 1788000000019_unified-contract-events turned loan_events into a backward-compat view over contract_events, and CREATE INDEX CONCURRENTLY cannot index a view. The index name follows the contract_events_* convention that migration established. Single source of truth: the stray src/db/migrations directory has been removed. Closes #1 Closes #2 --- ...0_contract-events-type-created-at-index.js | 37 +++++ ...00000000000_add-loan-events-type-index.sql | 9 -- src/middleware/idempotency.ts | 131 +++++++++++---- src/tests/idempotency.test.ts | 152 ++++++++++++++---- 4 files changed, 253 insertions(+), 76 deletions(-) create mode 100644 migrations/1790000000000_contract-events-type-created-at-index.js delete mode 100644 src/db/migrations/1700000000000_add-loan-events-type-index.sql diff --git a/migrations/1790000000000_contract-events-type-created-at-index.js b/migrations/1790000000000_contract-events-type-created-at-index.js new file mode 100644 index 0000000..630ad13 --- /dev/null +++ b/migrations/1790000000000_contract-events-type-created-at-index.js @@ -0,0 +1,37 @@ +/** + * @type {import('node-pg-migrate').ColumnDefinitions | undefined} + */ +export const shorthands = undefined; + +// Issue #2: CREATE INDEX CONCURRENTLY cannot run inside a transaction, and +// node-pg-migrate wraps each migration in a transaction by default. Disabling +// the transaction wrapper lets the CONCURRENTLY clause survive while still +// going through the migrate:up tooling. +export const disableTransaction = true; + +/** + * @param pgm {import('node-pg-migrate').MigrationBuilder} + * @returns {Promise | void} + */ +export const up = (pgm) => { + // The orphaned src/db/migrations SQL targeted `loan_events`, but as of + // 1788000000019_unified-contract-events the loan_events relation is a + // backward-compatibility view over contract_events. CREATE INDEX + // CONCURRENTLY cannot index a view, so we target the underlying table + // directly — the index name follows the contract_events_* convention + // the rest of that migration established. + pgm.sql( + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_contract_events_type_created_at + ON contract_events (event_type, created_at);`, + ); +}; + +/** + * @param pgm {import('node-pg-migrate').MigrationBuilder} + * @returns {Promise | void} + */ +export const down = (pgm) => { + pgm.sql( + "DROP INDEX CONCURRENTLY IF EXISTS idx_contract_events_type_created_at;", + ); +}; diff --git a/src/db/migrations/1700000000000_add-loan-events-type-index.sql b/src/db/migrations/1700000000000_add-loan-events-type-index.sql deleted file mode 100644 index b595c61..0000000 --- a/src/db/migrations/1700000000000_add-loan-events-type-index.sql +++ /dev/null @@ -1,9 +0,0 @@ --- Migration: add index on loan_events (event_type, created_at) --- Speeds up the conditional-aggregation query in getPoolStats() - --- Up -CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_loan_events_type_created_at - ON loan_events (event_type, created_at); - --- Down --- DROP INDEX CONCURRENTLY IF EXISTS idx_loan_events_type_created_at; \ No newline at end of file diff --git a/src/middleware/idempotency.ts b/src/middleware/idempotency.ts index 86b314d..4eb7cf1 100644 --- a/src/middleware/idempotency.ts +++ b/src/middleware/idempotency.ts @@ -6,13 +6,29 @@ const IDEMPOTENCY_TTL = 24 * 60 * 60; // 24 hours in seconds interface CachedResponse { status: number; - body: any; + body: unknown; + inProgress?: boolean; } /** * Middleware to handle Idempotency-Key headers. - * If the key is present and a cached response exists, it returns the cached response. - * Otherwise, it intercepts the response, captures it, and stores it in Redis. + * + * Reserves the key with an atomic SET NX *before* the downstream handler + * runs, so two concurrent requests sharing the same key can't both miss the + * cache check and submit the same money-moving transaction twice. The race + * window in the original implementation — which only wrote the cache on the + * `res.on("finish")` callback — is closed by writing an `inProgress` + * placeholder up front and overwriting it with the real response when the + * handler completes. + * + * Behaviour: + * * No `Idempotency-Key` header → forward. + * * Key reserved successfully → handler runs, the response body is cached + * on finish with `X-Idempotency-Cache: STORED`. + * * Key already cached as a finished response → return the cached body + * with `X-Idempotency-Cache: HIT`. + * * Key already reserved but the first request is still in flight → 409 + * with `X-Idempotency-Cache: IN_PROGRESS`. */ export const idempotencyMiddleware = async ( req: Request, @@ -25,16 +41,62 @@ export const idempotencyMiddleware = async ( return next(); } + const cacheKey = `idemp:${key}`; + try { - const cacheKey = `idemp:${key}`; - const cached = await cacheService.get(cacheKey); + // ── Race-free key reservation ────────────────────────────────────────── + // Try to claim the key first. If we lose the race, fall back to reading + // whatever the first request stored (which is guaranteed to exist because + // the winner has written at least the in-progress placeholder). + const reserved = await cacheService.setNotExists( + cacheKey, + { status: 0, body: null, inProgress: true } satisfies CachedResponse, + IDEMPOTENCY_TTL, + ); + + if (!reserved) { + const cached = await cacheService.get(cacheKey); + + // The setNX call failed but the read came back empty — almost certainly + // a transient cache miss between the failed reserve and the read. + // Treat it as "in progress" so the client retries rather than letting + // the handler run uncoordinated. + if (!cached) { + logger.warn( + `Idempotency reservation lost the race but no cached value found; treating as in-progress`, + { key, url: req.originalUrl, method: req.method }, + ); + res + .status(409) + .set("X-Idempotency-Cache", "IN_PROGRESS") + .json({ + error: "request_in_progress", + message: + "Another request with this Idempotency-Key is still being processed.", + }); + return; + } + + if (cached.inProgress) { + logger.info(`Idempotency in-progress for key: ${key}`, { + url: req.originalUrl, + method: req.method, + }); + res + .status(409) + .set("X-Idempotency-Cache", "IN_PROGRESS") + .json({ + error: "request_in_progress", + message: + "Another request with this Idempotency-Key is still being processed.", + }); + return; + } - if (cached) { logger.info(`Idempotency hit for key: ${key}`, { url: req.originalUrl, method: req.method, }); - res .status(cached.status) .set("X-Idempotency-Cache", "HIT") @@ -42,21 +104,21 @@ export const idempotencyMiddleware = async ( return; } - // Capture the original methods to intercept the response body - const originalJson = res.json; - const originalSend = res.send; + // ── Intercept response body to overwrite the placeholder on finish ──── + const originalJson = res.json.bind(res); + const originalSend = res.send.bind(res); - let responseBody: any; + let responseBody: unknown; + let bodyCaptured = false; - // Override res.json - res.json = function (body: any) { + res.json = function (body: unknown) { responseBody = body; - return originalJson.call(this, body); + bodyCaptured = true; + return originalJson(body); }; - // Override res.send (as res.json eventually calls res.send) - res.send = function (body: any) { - if (!responseBody) { + res.send = function (body: unknown) { + if (!bodyCaptured) { if (typeof body === "string") { try { responseBody = JSON.parse(body); @@ -66,27 +128,30 @@ export const idempotencyMiddleware = async ( } else { responseBody = body; } + bodyCaptured = true; } - return originalSend.call(this, body); + return originalSend(body); }; - // Store the response in cache once the request is finished res.on("finish", async () => { - // Only cache 2xx and 4xx status codes. - // 5xx errors should usually be retried without returning a cached failure. - if (res.statusCode >= 200 && res.statusCode < 500 && responseBody) { - try { - await cacheService.set( - cacheKey, - { - status: res.statusCode, - body: responseBody, - }, - IDEMPOTENCY_TTL, - ); - } catch (error) { - logger.error(`Error caching idempotency key ${key}`, { error }); + try { + // Only cache 2xx and 4xx so retries on 5xx are not poisoned with a + // stale failure. On 5xx, drop the in-progress placeholder so future + // requests with the same key can try again. + if (res.statusCode >= 500) { + await cacheService.delete(cacheKey); + return; } + await cacheService.set( + cacheKey, + { + status: res.statusCode, + body: responseBody ?? null, + } satisfies CachedResponse, + IDEMPOTENCY_TTL, + ); + } catch (error) { + logger.error(`Error caching idempotency key ${key}`, { error }); } }); diff --git a/src/tests/idempotency.test.ts b/src/tests/idempotency.test.ts index 96b0d5e..edfd9ac 100644 --- a/src/tests/idempotency.test.ts +++ b/src/tests/idempotency.test.ts @@ -6,72 +6,156 @@ import { jest } from "@jest/globals"; // Helper to cast to jest.Mock const asMock = (fn: any) => fn as jest.Mock; +function makeReq(key?: string): Partial { + const headerMock = jest.fn() as any; + if (key !== undefined) headerMock.mockReturnValue(key); + else headerMock.mockReturnValue(undefined); + return { + header: headerMock, + method: "POST", + originalUrl: "/api/test", + }; +} + +function makeRes() { + return { + status: jest.fn().mockReturnThis(), + set: jest.fn().mockReturnThis(), + setHeader: jest.fn(), + json: jest.fn().mockReturnThis(), + send: jest.fn().mockReturnThis(), + on: jest.fn(), + statusCode: 200, + } as any; +} + describe("Idempotency Middleware", () => { - let req: Partial; - let res: any; // Using any for easier mocking of the intercepted methods let next: NextFunction; beforeEach(() => { - req = { - header: jest.fn() as any, - method: "POST", - originalUrl: "/api/test", - }; - res = { - status: jest.fn().mockReturnThis(), - set: jest.fn().mockReturnThis(), - json: jest.fn().mockReturnThis(), - send: jest.fn().mockReturnThis(), - on: jest.fn(), - statusCode: 200, - }; next = jest.fn(); - - // Mock cacheService explicitly for each test if needed - // In ESM with Jest, mocking can be tricky, so we rely on manual mocks of the singleton instance if possible - // or use jest.spyOn if the instance is exported. jest.spyOn(cacheService, "get").mockReset(); jest.spyOn(cacheService, "set").mockReset(); + jest.spyOn(cacheService, "setNotExists").mockReset(); + jest.spyOn(cacheService, "delete").mockReset(); }); afterEach(() => { jest.restoreAllMocks(); }); - it("should call next() if no Idempotency-Key is present", async () => { - asMock(req.header).mockReturnValue(undefined); - + it("calls next() if no Idempotency-Key is present", async () => { + const req = makeReq(); + const res = makeRes(); await idempotencyMiddleware(req as Request, res as Response, next); expect(next).toHaveBeenCalled(); - expect(cacheService.get).not.toHaveBeenCalled(); + expect(cacheService.setNotExists).not.toHaveBeenCalled(); }); - it("should return cached response if key exists", async () => { + it("returns the cached response when the key was previously stored", async () => { const key = "test-key"; - const cachedResponse = { status: 201, body: { success: true } }; - asMock(req.header).mockReturnValue(key); - (cacheService.get as jest.Mock<() => Promise>).mockResolvedValue( - cachedResponse, - ); + const cached = { status: 201, body: { success: true } }; + ( + cacheService.setNotExists as jest.Mock<() => Promise> + ).mockResolvedValue(false); + (cacheService.get as jest.Mock<() => Promise>).mockResolvedValue(cached); + + const req = makeReq(key); + const res = makeRes(); await idempotencyMiddleware(req as Request, res as Response, next); - expect(cacheService.get).toHaveBeenCalledWith(`idemp:${key}`); + expect(cacheService.setNotExists).toHaveBeenCalledWith( + `idemp:${key}`, + expect.any(Object), + expect.any(Number), + ); expect(res.status).toHaveBeenCalledWith(201); expect(res.set).toHaveBeenCalledWith("X-Idempotency-Cache", "HIT"); - expect(res.json).toHaveBeenCalledWith(cachedResponse.body); + expect(res.json).toHaveBeenCalledWith(cached.body); + expect(next).not.toHaveBeenCalled(); + }); + + it("returns 409 IN_PROGRESS when another request is still processing", async () => { + const key = "in-flight"; + + ( + cacheService.setNotExists as jest.Mock<() => Promise> + ).mockResolvedValue(false); + (cacheService.get as jest.Mock<() => Promise>).mockResolvedValue({ + status: 0, + body: null, + inProgress: true, + }); + + const req = makeReq(key); + const res = makeRes(); + await idempotencyMiddleware(req as Request, res as Response, next); + + expect(res.status).toHaveBeenCalledWith(409); + expect(res.set).toHaveBeenCalledWith("X-Idempotency-Cache", "IN_PROGRESS"); expect(next).not.toHaveBeenCalled(); }); - it("should proceed and intercept response on cache miss", async () => { + it("reserves the key and forwards to next() on the happy path", async () => { const key = "new-key"; - asMock(req.header).mockReturnValue(key); - (cacheService.get as jest.Mock<() => Promise>).mockResolvedValue(null); + ( + cacheService.setNotExists as jest.Mock<() => Promise> + ).mockResolvedValue(true); + const req = makeReq(key); + const res = makeRes(); await idempotencyMiddleware(req as Request, res as Response, next); + expect(cacheService.setNotExists).toHaveBeenCalledWith( + `idemp:${key}`, + expect.objectContaining({ inProgress: true }), + expect.any(Number), + ); expect(next).toHaveBeenCalled(); expect(res.on).toHaveBeenCalledWith("finish", expect.any(Function)); }); + + // Issue #1: regression for the concurrency race that allowed duplicate + // money-moving submissions through. + it("runs the handler exactly once when two requests fire concurrently with the same key", async () => { + const key = "concurrent-key"; + + // Simulate a real cache: the first setNotExists wins, the second loses. + let reserved = false; + let cached: any = null; + ( + cacheService.setNotExists as jest.Mock<() => Promise> + ).mockImplementation(async (_k: any, value: any) => { + if (reserved) return false; + reserved = true; + cached = value; + return true; + }); + (cacheService.get as jest.Mock<() => Promise>).mockImplementation( + async () => cached, + ); + + const handler = jest.fn(); + const wrap = async () => { + const req = makeReq(key); + const res = makeRes(); + const middlewareNext = (() => handler()) as unknown as NextFunction; + await idempotencyMiddleware(req as Request, res as Response, middlewareNext); + return res; + }; + + const [resA, resB] = await Promise.all([wrap(), wrap()]); + + expect(handler).toHaveBeenCalledTimes(1); + // The losing request must surface an explicit in-progress signal — not + // silently pass through to the handler. + const losingRes = resA.status.mock.calls.length > 0 ? resA : resB; + expect(losingRes.status).toHaveBeenCalledWith(409); + expect(losingRes.set).toHaveBeenCalledWith( + "X-Idempotency-Cache", + "IN_PROGRESS", + ); + }); });