Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions backend/src/lib/websocket-relay-queries.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* websocket-relay-queries.js
*
* Optimized SQL query helpers for the WebSocket relay event store.
*
* Index recommendations (run once against your database):
*
* -- Covering index for paginated relay event reads, ordered by creation time
* CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_relay_events_created_at
* ON relay_events (created_at DESC);
*
* -- Composite index for queue dequeue queries (status + created_at)
* CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_relay_events_status_created
* ON relay_events (status, created_at ASC)
* WHERE status = 'pending';
*
* -- Index for payment-scoped event lookups
* CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_relay_events_payment_id
* ON relay_events (payment_id, created_at DESC);
*
* These partial/covering indexes keep relay reads fast even when the table
* grows into the millions of rows.
*/

/**
* Fetch the most recent relay events in descending creation order.
*
* Uses a paginated query so callers never pull unbounded result sets.
* The covering index on (created_at DESC) ensures an index-only scan
* on databases that support it.
*
* @param {object} db - pg Pool or pg Client with a .query() method
* @param {number} limit - Maximum rows to return (default 100, max 1000)
* @param {number} offset - Row offset for pagination (default 0)
* @returns {Promise<object[]>}
*/
async function getRecentEvents(db, limit = 100, offset = 0) {
const parsedLimit = Number(limit);
const safeLimit = Math.min(Math.max(1, Number.isFinite(parsedLimit) ? parsedLimit : 100), 1000);
const parsedOffset = Number(offset);
const safeOffset = Math.max(0, Number.isFinite(parsedOffset) ? parsedOffset : 0);

// SET statement_timeout guards against runaway read queries
const { rows } = await db.query(
`
SET LOCAL statement_timeout = '5s';
SELECT
id,
payment_id,
event_type,
payload,
status,
created_at
FROM relay_events
ORDER BY created_at DESC
LIMIT $1
OFFSET $2
`,
[safeLimit, safeOffset],
);

return rows;
}

/**
* Batch-insert multiple relay events in a single round-trip.
*
* Building one multi-row VALUES clause is significantly faster than
* issuing N sequential INSERT statements, especially when N > 10,
* because it eliminates per-statement network and parse overhead.
*
* @param {object} db - pg Pool or pg Client
* @param {object[]} events - Array of event objects with:
* { payment_id, event_type, payload, status }
* @returns {Promise<object[]>} Inserted rows
*/
async function batchInsertEvents(db, events) {
if (!Array.isArray(events) || events.length === 0) {
return [];
}

const values = [];
const placeholders = events.map((event, i) => {
const base = i * 4;
values.push(
event.payment_id,
event.event_type,
event.payload != null ? JSON.stringify(event.payload) : null,
event.status || "pending",
);
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, NOW())`;
});

const sql = `
SET LOCAL statement_timeout = '5s';
INSERT INTO relay_events (payment_id, event_type, payload, status, created_at)
VALUES ${placeholders.join(", ")}
RETURNING id, payment_id, event_type, status, created_at
`;

const { rows } = await db.query(sql, values);
return rows;
}

/**
* Dequeue the next pending relay event using SELECT … FOR UPDATE SKIP LOCKED.
*
* SKIP LOCKED lets multiple consumers pull from the queue concurrently
* without waiting on row locks held by other workers, which eliminates
* the "thundering herd" lock contention common in naive queue designs.
* The SET LOCAL statement_timeout prevents a worker from holding a lock
* indefinitely when the database is slow.
*
* @param {object} db - pg Pool or pg Client
* @returns {Promise<object|null>} The dequeued event row, or null if the queue is empty
*/
async function dequeueNextEvent(db) {
const { rows } = await db.query(`
SET LOCAL statement_timeout = '5s';
WITH next_event AS (
SELECT id
FROM relay_events
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE relay_events
SET status = 'processing'
FROM next_event
WHERE relay_events.id = next_event.id
RETURNING relay_events.id, relay_events.payment_id, relay_events.event_type, relay_events.payload, relay_events.created_at
`);

return rows[0] || null;
}

export { getRecentEvents, batchInsertEvents, dequeueNextEvent };
170 changes: 170 additions & 0 deletions backend/src/lib/websocket-relay-queries.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import {
getRecentEvents,
batchInsertEvents,
dequeueNextEvent,
} from "./websocket-relay-queries.js";

function makeDb(rows = []) {
return {
query: vi.fn().mockResolvedValue({ rows }),
};
}

describe("getRecentEvents", () => {
let db;

beforeEach(() => {
db = makeDb([
{ id: "1", payment_id: "p1", event_type: "payment.confirmed", status: "pending", created_at: new Date() },
]);
});

it("calls db.query with correct limit and offset", async () => {
await getRecentEvents(db, 10, 20);
expect(db.query).toHaveBeenCalledTimes(1);
const [sql, params] = db.query.mock.calls[0];
expect(params).toEqual([10, 20]);
expect(sql).toContain("ORDER BY created_at DESC");
expect(sql).toContain("LIMIT $1");
expect(sql).toContain("OFFSET $2");
});

it("uses default limit 100 and offset 0 when not provided", async () => {
await getRecentEvents(db);
const [, params] = db.query.mock.calls[0];
expect(params).toEqual([100, 0]);
});

it("clamps limit to 1000 max", async () => {
await getRecentEvents(db, 99999, 0);
const [, params] = db.query.mock.calls[0];
expect(params[0]).toBe(1000);
});

it("clamps limit to minimum of 1", async () => {
await getRecentEvents(db, 0, 0);
const [, params] = db.query.mock.calls[0];
expect(params[0]).toBe(1);
});

it("returns rows from the query result", async () => {
const result = await getRecentEvents(db, 5, 0);
expect(Array.isArray(result)).toBe(true);
expect(result).toHaveLength(1);
expect(result[0].payment_id).toBe("p1");
});

it("includes SET LOCAL statement_timeout in the query", async () => {
await getRecentEvents(db, 10, 0);
const [sql] = db.query.mock.calls[0];
expect(sql).toContain("statement_timeout");
});
});

describe("batchInsertEvents", () => {
let db;

beforeEach(() => {
db = makeDb([
{ id: "a", payment_id: "p1", event_type: "relay.send", status: "pending", created_at: new Date() },
{ id: "b", payment_id: "p2", event_type: "relay.send", status: "pending", created_at: new Date() },
]);
});

it("returns empty array when events array is empty", async () => {
const result = await batchInsertEvents(db, []);
expect(result).toEqual([]);
expect(db.query).not.toHaveBeenCalled();
});

it("returns empty array when events is not an array", async () => {
const result = await batchInsertEvents(db, null);
expect(result).toEqual([]);
expect(db.query).not.toHaveBeenCalled();
});

it("builds a single INSERT with multiple VALUE rows", async () => {
const events = [
{ payment_id: "p1", event_type: "relay.send", payload: { foo: 1 }, status: "pending" },
{ payment_id: "p2", event_type: "relay.ack", payload: null, status: "pending" },
];
await batchInsertEvents(db, events);
expect(db.query).toHaveBeenCalledTimes(1);
const [sql, params] = db.query.mock.calls[0];
expect(sql).toContain("INSERT INTO relay_events");
// 2 events × 4 params each = 8 total
expect(params).toHaveLength(8);
expect(params[0]).toBe("p1");
expect(params[1]).toBe("relay.send");
expect(params[4]).toBe("p2");
});

it("serializes payload objects to JSON strings", async () => {
const events = [
{ payment_id: "p1", event_type: "relay.send", payload: { amount: 100 }, status: "pending" },
];
await batchInsertEvents(db, events);
const [, params] = db.query.mock.calls[0];
expect(params[2]).toBe(JSON.stringify({ amount: 100 }));
});

it("defaults status to 'pending' if not provided", async () => {
const events = [{ payment_id: "p1", event_type: "relay.send", payload: null }];
await batchInsertEvents(db, events);
const [, params] = db.query.mock.calls[0];
expect(params[3]).toBe("pending");
});

it("returns rows from the query result", async () => {
const events = [
{ payment_id: "p1", event_type: "relay.send", payload: null, status: "pending" },
{ payment_id: "p2", event_type: "relay.ack", payload: null, status: "pending" },
];
const result = await batchInsertEvents(db, events);
expect(result).toHaveLength(2);
});

it("includes statement_timeout in the query", async () => {
const events = [{ payment_id: "p1", event_type: "e", payload: null, status: "pending" }];
await batchInsertEvents(db, events);
const [sql] = db.query.mock.calls[0];
expect(sql).toContain("statement_timeout");
});
});

describe("dequeueNextEvent", () => {
it("returns the first event row when queue has items", async () => {
const event = { id: "x", payment_id: "p1", event_type: "relay.send", payload: null, created_at: new Date() };
const db = makeDb([event]);
const result = await dequeueNextEvent(db);
expect(result).toEqual(event);
});

it("returns null when the queue is empty", async () => {
const db = makeDb([]);
const result = await dequeueNextEvent(db);
expect(result).toBeNull();
});

it("issues a SELECT FOR UPDATE SKIP LOCKED query", async () => {
const db = makeDb([]);
await dequeueNextEvent(db);
const [sql] = db.query.mock.calls[0];
expect(sql).toContain("FOR UPDATE SKIP LOCKED");
});

it("updates status to processing in the same query", async () => {
const db = makeDb([]);
await dequeueNextEvent(db);
const [sql] = db.query.mock.calls[0];
expect(sql).toContain("status = 'processing'");
});

it("includes statement_timeout guard", async () => {
const db = makeDb([]);
await dequeueNextEvent(db);
const [sql] = db.query.mock.calls[0];
expect(sql).toContain("statement_timeout");
});
});
Loading
Loading