diff --git a/.env.example b/.env.example index 3fdea6e..6b80ab3 100644 --- a/.env.example +++ b/.env.example @@ -165,3 +165,21 @@ GATEWAY_PROFILING_ENABLED=false # ----------------------------------------------------------------------------- REQUEST_BODY_LIMIT=100kb GATEWAY_BODY_LIMIT=1mb + +# ----------------------------------------------------------------------------- +# Slow Query Alerting — via pg_stat_statements +# Requires the pg_stat_statements extension to be enabled on the database. +# The worker polls pg_stat_statements every SLOW_QUERY_POLL_INTERVAL_MS and +# fires a webhook when any query's mean_exec_time exceeds the threshold. +# ----------------------------------------------------------------------------- +# Webhook URL to POST slow query alerts to (required to enable the feature). +# When omitted the worker is not started. +SLOW_QUERY_ALERT_WEBHOOK_URL= +# P95 latency threshold in milliseconds. Any query averaging above this will +# trigger an alert. Default: 500ms. +SLOW_QUERY_P95_THRESHOLD_MS=500 +# How often to poll pg_stat_statements (milliseconds). Default: 300000 (5 min). +SLOW_QUERY_POLL_INTERVAL_MS=300000 +# Deduplication window per query fingerprint (seconds). A query that was +# already alerted on will not fire again within this window. Default: 3600 (1h). +SLOW_QUERY_DEDUP_WINDOW_SECONDS=3600 diff --git a/docs/slow-query-alerts.md b/docs/slow-query-alerts.md new file mode 100644 index 0000000..cc095ad --- /dev/null +++ b/docs/slow-query-alerts.md @@ -0,0 +1,103 @@ +# Slow Query Alerting + +A background worker that polls PostgreSQL's `pg_stat_statements` view and fires +a webhook when any query's average execution time (`mean_exec_time`) exceeds a +configurable threshold. + +## How it works + +1. Every `SLOW_QUERY_POLL_INTERVAL_MS` (default 5 min) the worker runs a query + against `pg_stat_statements` selecting rows where `mean_exec_time > threshold`. +2. Results are fingerprinted via `md5(query)` for deduplication. +3. Queries that have not been alerted on within the dedup window are POSTed as + JSON to the configured webhook URL. +4. Alerted fingerprints are tracked in-memory; suppressed fingerprints expire + after `SLOW_QUERY_DEDUP_WINDOW_SECONDS`. + +## Prerequisites + +Requires the `pg_stat_statements` extension to be installed on the database: + +```sql +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +``` + +## Configuration + +| Variable | Default | Description | +|---|---|---| +| `SLOW_QUERY_ALERT_WEBHOOK_URL` | — | Webhook URL (required to enable). When unset the worker is not started. | +| `SLOW_QUERY_P95_THRESHOLD_MS` | `500` | Queries with `mean_exec_time` above this (ms) trigger an alert. | +| `SLOW_QUERY_POLL_INTERVAL_MS` | `300000` | Polling interval in ms (default 5 min). | +| `SLOW_QUERY_DEDUP_WINDOW_SECONDS` | `3600` | Dedup window per query fingerprint (default 1 h). | + +## Webhook Payload + +The worker POSTs a JSON body with the following shape: + +```json +{ + "event": "slow_query_alert", + "timestamp": "2025-01-01T00:00:00.000Z", + "data": { + "thresholdMs": 500, + "queryCount": 2, + "queries": [ + { + "fingerprint": "abc123def456", + "querySample": "SELECT * FROM large_table WHERE ...", + "calls": 1500, + "meanExecTimeMs": 1234.56, + "maxExecTimeMs": 8901.23, + "rows": 100 + } + ] + } +} +``` + +Headers: + +| Header | Value | +|---|---| +| `Content-Type` | `application/json` | +| `User-Agent` | `Callora-SlowQueryAlerter/1.0` | + +## Architecture + +The worker follows the same `{ start, stop, beginShutdown, awaitIdle }` factory +pattern used by other background jobs (`idempotencySweeper`, `revenueLedgerIndexer`). + +### Dedup Store + +An in-memory `Map` prevents repeated alerts for +the same query signature. Entries expire after the configured dedup window and +are lazily evicted on `has()` / `cleanup()` calls. + +### Graceful Shutdown + +The worker registers as a `DrainableSubsystem` via the standard lifecycle +handler in `src/lifecycle/shutdown.ts`. + +## Testing + +```bash +npx jest src/workers/slowQueryAlerter.test.ts +``` + +## Metrics + +The worker emits the following Prometheus metrics (via the shared +`src/metrics.ts` registry): + +| Metric | Type | Description | +|---|---|---| +| `slow_query_alerter_runs_total` | Counter | Total poll runs | +| `slow_query_alerter_alerts_total` | Counter | Total alerts fired | +| `slow_query_alerter_queries_above_threshold` | Gauge | Number of queries exceeding threshold in last poll | + +## Error Handling + +- Poll failures are logged at `error` level and do not crash the worker. +- Webhook POST failures are logged at `error` level; no retry logic is applied + (the next poll cycle will re-attempt if the dedup window has expired). diff --git a/src/config/env.ts b/src/config/env.ts index 6ff35bb..df42cc3 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -185,6 +185,12 @@ export const envSchema = z // Idempotency IDEMPOTENCY_RETENTION_WINDOW_SECONDS: z.coerce.number().int().positive().default(86400), IDEMPOTENCY_SWEEPER_INTERVAL_MS: z.coerce.number().int().positive().default(60_000), + + // Slow query alerting + SLOW_QUERY_ALERT_WEBHOOK_URL: z.string().url().optional(), + SLOW_QUERY_P95_THRESHOLD_MS: z.coerce.number().positive().default(500), + SLOW_QUERY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000), + SLOW_QUERY_DEDUP_WINDOW_SECONDS: z.coerce.number().int().positive().default(3600), }) .superRefine((values, ctx) => { if (values.SOROBAN_RPC_ENABLED && !values.SOROBAN_RPC_URL) { diff --git a/src/config/index.ts b/src/config/index.ts index 90fb885..5300160 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -209,4 +209,11 @@ export const config = { warmupTimeoutMs: env.LISTINGS_CACHE_WARMUP_TIMEOUT_MS, }, bulkEndpointLimit: env.BULK_ENDPOINT_LIMIT, + + slowQueryAlerter: { + webhookUrl: env.SLOW_QUERY_ALERT_WEBHOOK_URL, + p95ThresholdMs: env.SLOW_QUERY_P95_THRESHOLD_MS, + pollIntervalMs: env.SLOW_QUERY_POLL_INTERVAL_MS, + dedupWindowMs: env.SLOW_QUERY_DEDUP_WINDOW_SECONDS * 1000, + }, } as const; diff --git a/src/index.ts b/src/index.ts index 86e07b5..151f447 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,6 +38,7 @@ import { createApiRegistry } from './data/apiRegistry.js'; import { ApiKey } from './types/gateway.js'; import { config } from './config/index.js'; import { listingsCache } from './lib/listingsCache.js'; +import { createSlowQueryAlerterJob } from './workers/slowQueryAlerter.js'; // Helper for Jest/CommonJS compat const isDirectExecution = process.argv[1] && (process.argv[1].endsWith('index.ts') || process.argv[1].endsWith('index.js')); @@ -123,6 +124,15 @@ if (isDirectExecution) { intervalMs: config.idempotency.sweeperIntervalMs, }); + const slowQueryAlerterJob = config.slowQueryAlerter.webhookUrl + ? createSlowQueryAlerterJob(pool, { + webhookUrl: config.slowQueryAlerter.webhookUrl, + p95ThresholdMs: config.slowQueryAlerter.p95ThresholdMs, + pollIntervalMs: config.slowQueryAlerter.pollIntervalMs, + dedupWindowMs: config.slowQueryAlerter.dedupWindowMs, + }) + : null; + const apiKeys = new Map([ ['test-key-1', { key: 'test-key-1', developerId: 'dev_001', apiId: 'api_001' }], ['test-key-2', { key: 'test-key-2', developerId: 'dev_002', apiId: 'api_002' }], @@ -186,6 +196,14 @@ if (isDirectExecution) { awaitIdle: () => settlementReconJob.awaitIdle(), }, ]; + + if (slowQueryAlerterJob) { + shutdownSubsystems.push({ + name: 'slow-query-alerter', + beginShutdown: () => slowQueryAlerterJob.beginShutdown(), + awaitIdle: () => slowQueryAlerterJob.awaitIdle(), + }); + } app.use('/v1/call', legacyV1DeprecationMiddleware, proxyDrainTracker.middleware); app.use('/v1/call', proxyRouter); @@ -202,6 +220,7 @@ if (isDirectExecution) { settlementStatusSyncJob.stop(); settlementReconJob.stop(); idempotencySweeperJob.stop(); + slowQueryAlerterJob?.stop(); await closeDb(); await Promise.allSettled([ closePgPool(), @@ -234,6 +253,7 @@ if (isDirectExecution) { settlementStatusSyncJob.start(); settlementReconJob.start(); idempotencySweeperJob.start(); + slowQueryAlerterJob?.start(); const server = app.listen(PORT, () => { console.log(`Callora backend listening on http://localhost:${PORT}`); diff --git a/src/metrics.ts b/src/metrics.ts index f7355a9..9209aa2 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -497,6 +497,7 @@ export function resetAllMetrics(): void { proxyPrematureAbortsTotal.reset(); idempotencyStoreRows.reset(); gatewayUpstreamBreakerState.reset(); + resetSlowQueryAlerterMetrics(); resetReplicaMetrics(); } @@ -566,6 +567,62 @@ export function recordReplicaFailure(): void { dbReplicaFailuresTotal.inc(); } +// ── Slow Query Alerter metrics ──────────────────────────────────────────────── +// +// Metric: slow_query_alerter_runs_total +// Type: Counter +// Labels: (none) +// Purpose: Total number of poll cycles the slow query alerter has completed. +// +// Metric: slow_query_alerter_alerts_total +// Type: Counter +// Labels: (none) +// Purpose: Total number of webhook alerts fired. +// +// Metric: slow_query_alerter_queries_above_threshold +// Type: Gauge +// Labels: (none) +// Purpose: Number of queries exceeding the threshold in the most recent poll. +// ───────────────────────────────────────────────────────────────────────────── + +const slowQueryAlerterRunsTotal = new client.Counter({ + name: 'slow_query_alerter_runs_total', + help: 'Total number of slow query alerter poll cycles', +}); + +const slowQueryAlerterAlertsTotal = new client.Counter({ + name: 'slow_query_alerter_alerts_total', + help: 'Total number of slow query alerts fired', +}); + +const slowQueryAlerterQueriesAboveThreshold = new client.Gauge({ + name: 'slow_query_alerter_queries_above_threshold', + help: 'Number of queries exceeding the threshold in the most recent poll', +}); + +register.registerMetric(slowQueryAlerterRunsTotal); +register.registerMetric(slowQueryAlerterAlertsTotal); +register.registerMetric(slowQueryAlerterQueriesAboveThreshold); + +export function recordSlowQueryAlerterRun(): void { + slowQueryAlerterRunsTotal.inc(); +} + +export function recordSlowQueryAlerterAlert(): void { + slowQueryAlerterAlertsTotal.inc(); +} + +export function recordSlowQueryAlerterQueriesAboveThreshold(count: number): void { + slowQueryAlerterQueriesAboveThreshold.set(count); +} + +/** Reset slow query alerter metrics. Used in tests to isolate metric state. */ +export function resetSlowQueryAlerterMetrics(): void { + slowQueryAlerterRunsTotal.reset(); + slowQueryAlerterAlertsTotal.reset(); + slowQueryAlerterQueriesAboveThreshold.reset(); +} + /** Reset all replica routing metrics. Used in tests to isolate metric state. */ export function resetReplicaMetrics(): void { dbReplicaQueriesTotal.reset(); diff --git a/src/workers/slowQueryAlerter.test.ts b/src/workers/slowQueryAlerter.test.ts new file mode 100644 index 0000000..e9fced9 --- /dev/null +++ b/src/workers/slowQueryAlerter.test.ts @@ -0,0 +1,515 @@ +import { resetAllMetrics, register } from '../metrics.js'; +import { + createSlowQueryAlerterJob, + createDedupStore, + fetchSlowQueries, + type SlowQueryEntry, +} from './slowQueryAlerter.js'; + +function makeRow(overrides: Partial = {}): SlowQueryEntry { + return { + fingerprint: 'abc123', + querySample: 'SELECT * FROM users WHERE id = $1', + calls: 100, + meanExecTime: 600, + maxExecTime: 1200, + rows: 1, + ...overrides, + }; +} + +const webhookUrl = 'https://hooks.example.com/slow-queries'; + +describe('slowQueryAlerter', () => { + let originalFetch: typeof global.fetch; + + beforeAll(() => { + jest.useFakeTimers(); + }); + + beforeEach(() => { + originalFetch = global.fetch; + jest.spyOn(console, 'log').mockImplementation(() => {}); + jest.spyOn(console, 'warn').mockImplementation(() => {}); + jest.spyOn(console, 'error').mockImplementation(() => {}); + }); + + afterEach(() => { + global.fetch = originalFetch; + jest.clearAllTimers(); + jest.restoreAllMocks(); + resetAllMetrics(); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + describe('fetchSlowQueries', () => { + it('returns rows from the pool query', async () => { + const expectedRows = [makeRow()]; + const mockPool = { + query: jest.fn().mockResolvedValue({ rows: expectedRows }), + } as any; + + const rows = await fetchSlowQueries(mockPool, 500); + + expect(rows).toEqual(expectedRows); + expect(mockPool.query).toHaveBeenCalledTimes(1); + expect(mockPool.query).toHaveBeenCalledWith( + expect.stringContaining('pg_stat_statements'), + [500], + ); + }); + + it('returns empty array when no slow queries', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ rows: [] }), + } as any; + + const rows = await fetchSlowQueries(mockPool, 9999); + + expect(rows).toEqual([]); + }); + }); + + describe('createDedupStore', () => { + beforeEach(() => { + jest.setSystemTime(100_000); + }); + + it('returns false for unseen keys', () => { + const store = createDedupStore(60_000); + expect(store.has('fingerprint-1')).toBe(false); + }); + + it('returns true for set keys within window', () => { + const store = createDedupStore(60_000); + store.set('fingerprint-1'); + expect(store.has('fingerprint-1')).toBe(true); + }); + + it('returns false for expired keys', () => { + const store = createDedupStore(60_000); + store.set('fingerprint-1'); + jest.advanceTimersByTime(61_000); + expect(store.has('fingerprint-1')).toBe(false); + }); + + it('cleanup removes expired entries', () => { + const store = createDedupStore(60_000); + store.set('fingerprint-1'); + store.set('fingerprint-2'); + jest.advanceTimersByTime(61_000); + store.set('fingerprint-3'); + store.cleanup(); + expect(store.has('fingerprint-1')).toBe(false); + expect(store.has('fingerprint-2')).toBe(false); + expect(store.has('fingerprint-3')).toBe(true); + }); + }); + + describe('createSlowQueryAlerterJob', () => { + it('throws on invalid pollIntervalMs', () => { + const pool = {} as any; + expect(() => + createSlowQueryAlerterJob(pool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: -1, + dedupWindowMs: 3600_000, + }), + ).toThrow('pollIntervalMs must be a positive integer'); + }); + + it('throws on invalid p95ThresholdMs', () => { + const pool = {} as any; + expect(() => + createSlowQueryAlerterJob(pool, { + webhookUrl, + p95ThresholdMs: 0, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }), + ).toThrow('p95ThresholdMs must be a positive number'); + }); + + it('throws on invalid dedupWindowMs', () => { + const pool = {} as any; + expect(() => + createSlowQueryAlerterJob(pool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 0, + }), + ).toThrow('dedupWindowMs must be a positive integer'); + }); + + it('throws on missing webhookUrl', () => { + const pool = {} as any; + expect(() => + createSlowQueryAlerterJob(pool, { + webhookUrl: '', + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }), + ).toThrow('webhookUrl is required'); + }); + + it('runs a tick on start and alerts for new slow queries', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow()], + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ ok: true } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockPool.query).toHaveBeenCalledWith( + expect.stringContaining('pg_stat_statements'), + [500], + ); + expect(fetchMock).toHaveBeenCalledWith( + webhookUrl, + expect.objectContaining({ + method: 'POST', + headers: expect.objectContaining({ + 'Content-Type': 'application/json', + }), + }), + ); + + job.stop(); + }); + + it('does not alert for queries already in dedup window', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow({ fingerprint: 'dup-fingerprint' })], + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ ok: true } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + fetchMock.mockClear(); + + jest.advanceTimersByTime(300_000); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(fetchMock).not.toHaveBeenCalled(); + + job.stop(); + }); + + it('alerts again after dedup window expires', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow({ fingerprint: 'recurring-query' })], + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ ok: true } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 10, + dedupWindowMs: 100, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + fetchMock.mockClear(); + mockPool.query.mockClear(); + + jest.advanceTimersByTime(200); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + job.stop(); + }); + + it('skips tick when already running', async () => { + let queryResolve!: () => void; + const queryPromise = new Promise((resolve) => { + queryResolve = resolve; + }); + + const mockPool = { + query: jest.fn().mockImplementation(async () => { + await queryPromise; + return { rows: [makeRow()] }; + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ ok: true } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 10, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + + expect(mockPool.query).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(10); + await Promise.resolve(); + + expect(mockPool.query).toHaveBeenCalledTimes(1); + + queryResolve(); + await Promise.resolve(); + await Promise.resolve(); + + jest.advanceTimersByTime(10); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockPool.query).toHaveBeenCalledTimes(2); + + job.stop(); + }); + + it('respects beginShutdown and does not start ticks', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ rows: [] }), + } as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 10, + dedupWindowMs: 3600_000, + }); + + job.beginShutdown(); + job.start(); + + jest.advanceTimersByTime(100); + await Promise.resolve(); + + expect(mockPool.query).not.toHaveBeenCalled(); + }); + + it('awaitIdle resolves when no tick is running', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ rows: [] }), + } as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + await expect(job.awaitIdle()).resolves.toBeUndefined(); + }); + + it('stops and starts cleanly', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ rows: [] }), + } as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 10, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + expect(mockPool.query).toHaveBeenCalledTimes(1); + + job.stop(); + mockPool.query.mockClear(); + + jest.advanceTimersByTime(100); + await Promise.resolve(); + + expect(mockPool.query).not.toHaveBeenCalled(); + + job.start(); + await Promise.resolve(); + expect(mockPool.query).toHaveBeenCalledTimes(1); + + job.stop(); + }); + + it('records Prometheus metrics on successful run', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow(), makeRow({ fingerprint: 'def456', meanExecTime: 900 })], + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ ok: true } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + const metrics = await register.getMetricsAsJSON(); + const runsMetric = metrics.find( + (m: any) => m.name === 'slow_query_alerter_runs_total', + ); + expect(runsMetric).toBeDefined(); + expect(runsMetric.values[0].value).toBe(1); + + const alertsMetric = metrics.find( + (m: any) => m.name === 'slow_query_alerter_alerts_total', + ); + expect(alertsMetric).toBeDefined(); + expect(alertsMetric.values[0].value).toBe(1); + + const gaugeMetric = metrics.find( + (m: any) => m.name === 'slow_query_alerter_queries_above_threshold', + ); + expect(gaugeMetric).toBeDefined(); + expect(gaugeMetric.values[0].value).toBe(2); + + job.stop(); + }); + + it('logs error when webhook returns non-2xx', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow()], + }), + } as any; + + const fetchMock = jest.fn().mockResolvedValue({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + } as Response); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(console.error).toHaveBeenCalledWith( + expect.stringContaining('[slowQueryAlerter] Webhook returned 500'), + 'Internal Server Error', + ); + + job.stop(); + }); + + it('logs error when webhook fetch throws', async () => { + const mockPool = { + query: jest.fn().mockResolvedValue({ + rows: [makeRow()], + }), + } as any; + + const fetchMock = jest.fn().mockRejectedValue(new Error('network error')); + global.fetch = fetchMock as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(console.error).toHaveBeenCalledWith( + expect.stringContaining('[slowQueryAlerter] Webhook post failed:'), + 'network error', + ); + + job.stop(); + }); + + it('logs error when pool query throws', async () => { + const mockPool = { + query: jest.fn().mockRejectedValue(new Error('db connection lost')), + } as any; + + const job = createSlowQueryAlerterJob(mockPool, { + webhookUrl, + p95ThresholdMs: 500, + pollIntervalMs: 300_000, + dedupWindowMs: 3600_000, + }); + + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(console.error).toHaveBeenCalledWith( + expect.stringContaining('[slowQueryAlerter] Job failed:'), + expect.any(Error), + ); + job.stop(); + }); + }); +}); diff --git a/src/workers/slowQueryAlerter.ts b/src/workers/slowQueryAlerter.ts new file mode 100644 index 0000000..11207ef --- /dev/null +++ b/src/workers/slowQueryAlerter.ts @@ -0,0 +1,241 @@ +import type { Pool } from 'pg'; +import { logger } from '../logger.js'; +import { + recordSlowQueryAlerterRun, + recordSlowQueryAlerterAlert, + recordSlowQueryAlerterQueriesAboveThreshold, +} from '../metrics.js'; + +export interface SlowQueryAlerterOptions { + webhookUrl: string; + p95ThresholdMs: number; + pollIntervalMs: number; + dedupWindowMs: number; + logger?: Pick; +} + +export interface SlowQueryEntry { + fingerprint: string; + querySample: string; + calls: number; + meanExecTime: number; + maxExecTime: number; + rows: number; +} + +export interface SlowQueryAlerterJob { + start(): void; + stop(): void; + beginShutdown(): void; + awaitIdle(): Promise; +} + +const POLL_SQL = ` + SELECT + md5(query)::text AS fingerprint, + left(query, 200)::text AS query_sample, + calls, + mean_exec_time, + max_exec_time, + rows + FROM pg_stat_statements + WHERE query NOT ILIKE 'DEALLOCATE%' + AND query NOT ILIKE 'BEGIN%' + AND query NOT ILIKE 'COMMIT%' + AND query NOT ILIKE 'ROLLBACK%' + AND mean_exec_time > $1 + ORDER BY mean_exec_time DESC + LIMIT 50 +`; + +export async function fetchSlowQueries( + pool: Pool, + thresholdMs: number, +): Promise { + const result = await pool.query(POLL_SQL, [thresholdMs]); + return result.rows; +} + +export interface DedupStore { + has(key: string): boolean; + set(key: string): void; + cleanup(): void; +} + +export function createDedupStore(windowMs: number): DedupStore { + const store = new Map(); + + return { + has(key: string): boolean { + const expiry = store.get(key); + if (expiry === undefined) return false; + if (Date.now() > expiry) { + store.delete(key); + return false; + } + return true; + }, + + set(key: string): void { + store.set(key, Date.now() + windowMs); + }, + + cleanup(): void { + const now = Date.now(); + for (const [key, expiry] of store) { + if (now > expiry) store.delete(key); + } + }, + }; +} + +function buildAlertPayload( + queries: SlowQueryEntry[], + thresholdMs: number, +): object { + return { + event: 'slow_query_alert', + timestamp: new Date().toISOString(), + data: { + thresholdMs, + queryCount: queries.length, + queries: queries.map((q) => ({ + fingerprint: q.fingerprint, + querySample: q.querySample, + calls: q.calls, + meanExecTimeMs: q.meanExecTime, + maxExecTimeMs: q.maxExecTime, + rows: q.rows, + })), + }, + }; +} + +async function postAlert( + webhookUrl: string, + payload: object, + log: Pick, +): Promise { + const body = JSON.stringify(payload); + const headers: Record = { + 'Content-Type': 'application/json', + 'User-Agent': 'Callora-SlowQueryAlerter/1.0', + }; + + try { + const response = await fetch(webhookUrl, { + method: 'POST', + body, + headers, + signal: AbortSignal.timeout(10_000), + }); + + if (!response.ok) { + log.error( + `[slowQueryAlerter] Webhook returned ${response.status}`, + response.statusText, + ); + } + } catch (err) { + log.error( + '[slowQueryAlerter] Webhook post failed:', + (err as Error).message, + ); + } +} + +export function createSlowQueryAlerterJob( + pool: Pool, + options: SlowQueryAlerterOptions, +): SlowQueryAlerterJob { + const log = options.logger ?? logger; + + if (!Number.isInteger(options.pollIntervalMs) || options.pollIntervalMs <= 0) { + throw new Error('pollIntervalMs must be a positive integer.'); + } + + if ( + !Number.isFinite(options.p95ThresholdMs) || + options.p95ThresholdMs <= 0 + ) { + throw new Error('p95ThresholdMs must be a positive number.'); + } + + if ( + !Number.isInteger(options.dedupWindowMs) || + options.dedupWindowMs <= 0 + ) { + throw new Error('dedupWindowMs must be a positive integer.'); + } + + if (typeof options.webhookUrl !== 'string' || options.webhookUrl.length === 0) { + throw new Error('webhookUrl is required'); + } + + const dedup = createDedupStore(options.dedupWindowMs); + let timer: NodeJS.Timeout | null = null; + let accepting = true; + let running: Promise | null = null; + + const tick = async (): Promise => { + if (!accepting || running) return; + + running = (async () => { + try { + const queries = await fetchSlowQueries(pool, options.p95ThresholdMs); + recordSlowQueryAlerterRun(); + recordSlowQueryAlerterQueriesAboveThreshold(queries.length); + + const newQueries = queries.filter((q) => !dedup.has(q.fingerprint)); + + for (const q of newQueries) { + dedup.set(q.fingerprint); + } + + if (newQueries.length > 0) { + recordSlowQueryAlerterAlert(); + const payload = buildAlertPayload(newQueries, options.p95ThresholdMs); + await postAlert(options.webhookUrl, payload, log); + log.info( + `[slowQueryAlerter] Alerted for ${newQueries.length} slow ` + + `queries (threshold: ${options.p95ThresholdMs}ms)`, + ); + } + } catch (error) { + log.error('[slowQueryAlerter] Job failed:', error); + } finally { + running = null; + } + })(); + + await running; + }; + + return { + start() { + if (timer || !accepting) return; + void tick(); + timer = setInterval(() => { + void tick(); + }, options.pollIntervalMs); + }, + + stop() { + if (!timer) return; + clearInterval(timer); + timer = null; + }, + + beginShutdown() { + accepting = false; + if (timer) { + clearInterval(timer); + timer = null; + } + }, + + async awaitIdle() { + await (running ?? Promise.resolve()); + }, + }; +}