Merged
18 changes: 18 additions & 0 deletions .env.example
@@ -165,3 +165,21 @@ GATEWAY_PROFILING_ENABLED=false
# -----------------------------------------------------------------------------
REQUEST_BODY_LIMIT=100kb
GATEWAY_BODY_LIMIT=1mb

# -----------------------------------------------------------------------------
# Slow Query Alerting — via pg_stat_statements
# Requires the pg_stat_statements extension to be enabled on the database.
# The worker polls pg_stat_statements every SLOW_QUERY_POLL_INTERVAL_MS and
# fires a webhook when any query's mean_exec_time exceeds the threshold.
# -----------------------------------------------------------------------------
# Webhook URL to POST slow query alerts to (required to enable the feature).
# When omitted the worker is not started.
SLOW_QUERY_ALERT_WEBHOOK_URL=
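# Mean execution time threshold in milliseconds. Despite the "P95" in the
# variable name, the worker compares this value against mean_exec_time, so any
# query averaging above it will trigger an alert. Default: 500ms.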
SLOW_QUERY_P95_THRESHOLD_MS=500
# How often to poll pg_stat_statements (milliseconds). Default: 300000 (5 min).
SLOW_QUERY_POLL_INTERVAL_MS=300000
# Deduplication window per query fingerprint (seconds). A query that was
# already alerted on will not fire again within this window. Default: 3600 (1h).
SLOW_QUERY_DEDUP_WINDOW_SECONDS=3600
103 changes: 103 additions & 0 deletions docs/slow-query-alerts.md
@@ -0,0 +1,103 @@
# Slow Query Alerting

A background worker that polls PostgreSQL's `pg_stat_statements` view and fires
a webhook when any query's average execution time (`mean_exec_time`) exceeds a
configurable threshold.

## How it works

1. Every `SLOW_QUERY_POLL_INTERVAL_MS` (default 5 min) the worker runs a query
against `pg_stat_statements` selecting rows where `mean_exec_time > threshold`.
2. Results are fingerprinted via `md5(query)` for deduplication.
3. Queries that have not been alerted on within the dedup window are POSTed as
JSON to the configured webhook URL.
4. Alerted fingerprints are tracked in-memory; suppressed fingerprints expire
after `SLOW_QUERY_DEDUP_WINDOW_SECONDS`.
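
The four steps above can be sketched as follows. This is a minimal illustration rather than the worker's actual code: it assumes the fingerprint is computed in SQL with PostgreSQL's `md5()` function and that the columns follow the PostgreSQL 13+ naming (`mean_exec_time`, `max_exec_time`). `SlowQueryRow`, `DedupLike`, `SLOW_QUERY_SQL`, and `selectNewAlerts` are hypothetical names.

```typescript
// Hypothetical row shape; column names follow pg_stat_statements (PostgreSQL 13+).
interface SlowQueryRow {
  fingerprint: string;
  query: string;
  calls: number;
  mean_exec_time: number;
  max_exec_time: number;
  rows: number;
}

// Hypothetical minimal view of the dedup store used by the poll cycle.
interface DedupLike {
  has(fingerprint: string): boolean;
  add(fingerprint: string): void;
}

// Steps 1 and 2 (assumed): select offenders and fingerprint them in SQL.
// $1 is the threshold in milliseconds.
const SLOW_QUERY_SQL = `
  SELECT md5(query) AS fingerprint, query, calls,
         mean_exec_time, max_exec_time, rows
  FROM pg_stat_statements
  WHERE mean_exec_time > $1
  ORDER BY mean_exec_time DESC`;

// Steps 3 and 4: keep only rows that are not currently suppressed and
// mark each kept fingerprint as alerted.
function selectNewAlerts(rows: SlowQueryRow[], dedup: DedupLike): SlowQueryRow[] {
  const fresh: SlowQueryRow[] = [];
  for (const row of rows) {
    if (dedup.has(row.fingerprint)) continue; // alerted within the dedup window
    dedup.add(row.fingerprint);
    fresh.push(row);
  }
  return fresh;
}
```

The rows returned by `selectNewAlerts` are the ones that would be included in the webhook POST for that cycle.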

## Prerequisites

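The `pg_stat_statements` module must be loaded via `shared_preload_libraries`
(changing that setting requires a server restart), and the extension must be
created in the database: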

```sql
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
```

## Configuration

| Variable | Default | Description |
|---|---|---|
| `SLOW_QUERY_ALERT_WEBHOOK_URL` | — | Webhook URL (required to enable). When unset the worker is not started. |
| `SLOW_QUERY_P95_THRESHOLD_MS` | `500` | Queries with `mean_exec_time` above this (ms) trigger an alert. Despite the name, the value is compared against the mean, not a percentile. |
| `SLOW_QUERY_POLL_INTERVAL_MS` | `300000` | Polling interval in ms (default 5 min). |
| `SLOW_QUERY_DEDUP_WINDOW_SECONDS` | `3600` | Dedup window per query fingerprint (default 1 h). |

## Webhook Payload

The worker POSTs a JSON body with the following shape:

```json
{
"event": "slow_query_alert",
"timestamp": "2025-01-01T00:00:00.000Z",
"data": {
"thresholdMs": 500,
"queryCount": 2,
"queries": [
{
"fingerprint": "abc123def456",
"querySample": "SELECT * FROM large_table WHERE ...",
"calls": 1500,
"meanExecTimeMs": 1234.56,
"maxExecTimeMs": 8901.23,
"rows": 100
}
]
}
}
```

Headers:

| Header | Value |
|---|---|
| `Content-Type` | `application/json` |
| `User-Agent` | `Callora-SlowQueryAlerter/1.0` |
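
For consumers of the webhook, the payload and headers above can be expressed as types plus a small request builder. This is a sketch under assumptions: the interface names and `buildAlertRequest` are hypothetical, and setting `queryCount` to the length of `queries` is a guess about how the worker fills that field.

```typescript
// Hypothetical types mirroring the documented JSON shape.
interface SlowQueryEntry {
  fingerprint: string;
  querySample: string;
  calls: number;
  meanExecTimeMs: number;
  maxExecTimeMs: number;
  rows: number;
}

interface SlowQueryAlertPayload {
  event: "slow_query_alert";
  timestamp: string; // ISO 8601
  data: {
    thresholdMs: number;
    queryCount: number;
    queries: SlowQueryEntry[];
  };
}

// Builds the method, headers, and serialized body for the POST.
function buildAlertRequest(queries: SlowQueryEntry[], thresholdMs: number, now: Date) {
  const payload: SlowQueryAlertPayload = {
    event: "slow_query_alert",
    timestamp: now.toISOString(),
    data: {
      thresholdMs,
      queryCount: queries.length, // assumption: count equals the entries sent
      queries,
    },
  };
  return {
    method: "POST" as const,
    headers: {
      "Content-Type": "application/json",
      "User-Agent": "Callora-SlowQueryAlerter/1.0",
    },
    body: JSON.stringify(payload),
  };
}
```

The returned object has the shape accepted as the second argument of `fetch`, so a caller on a runtime with a global `fetch` could pass it along with the webhook URL.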

## Architecture

The worker follows the same `{ start, stop, beginShutdown, awaitIdle }` factory
pattern used by other background jobs (`idempotencySweeper`, `revenueLedgerIndexer`).
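
A generic version of this factory shape might look like the sketch below. It is not the code of `slowQueryAlerter` or the other jobs: `BackgroundJob` and `createPollingJob` are hypothetical names, and skipping a tick while a previous run is still in flight is an assumed design choice.

```typescript
// Hypothetical interface for the { start, stop, beginShutdown, awaitIdle } shape.
interface BackgroundJob {
  start(): void;
  stop(): void;
  beginShutdown(): void;
  awaitIdle(): Promise<void>;
}

function createPollingJob(runOnce: () => Promise<void>, intervalMs: number): BackgroundJob {
  let timer: ReturnType<typeof setInterval> | null = null;
  let inFlight: Promise<void> | null = null;
  let shuttingDown = false;

  // Runs one cycle and never rejects, so a failed cycle cannot crash the job.
  const runSafely = async (): Promise<void> => {
    try {
      await runOnce();
    } catch (_err) {
      // In this sketch runOnce is expected to log its own errors.
    }
  };

  const tick = (): void => {
    if (shuttingDown || inFlight) return; // no overlapping or post-shutdown runs
    const current: Promise<void> = runSafely().then(() => {
      if (inFlight === current) inFlight = null;
    });
    inFlight = current;
  };

  const stop = (): void => {
    if (timer !== null) {
      clearInterval(timer);
      timer = null;
    }
  };

  return {
    start() {
      if (timer === null && !shuttingDown) timer = setInterval(tick, intervalMs);
    },
    stop,
    beginShutdown() {
      shuttingDown = true; // refuse new cycles
      stop();
    },
    async awaitIdle() {
      if (inFlight) await inFlight; // wait for the cycle already running, if any
    },
  };
}
```

During shutdown a caller would invoke `beginShutdown()` first and then `await awaitIdle()`, which matches how the subsystem is registered in `src/index.ts`.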

### Dedup Store

An in-memory `Map<fingerprint, expiryTimestamp>` prevents repeated alerts for
the same query signature. Entries expire after the configured dedup window and
are lazily evicted on `has()` / `cleanup()` calls.
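
The store described above can be sketched as a small class. Only `has()` and `cleanup()` are named in this document; the class name `DedupStore`, the `add()` method, the `size` getter, and the injectable clock are assumptions made for illustration.

```typescript
// Hypothetical in-memory dedup store: fingerprint -> expiry timestamp (ms).
class DedupStore {
  private readonly expiries = new Map<string, number>();
  private readonly windowMs: number;
  private readonly now: () => number;

  // `now` is injectable so tests can control time; defaults to the wall clock.
  constructor(windowMs: number, now: () => number = Date.now) {
    this.windowMs = windowMs;
    this.now = now;
  }

  // Marks a fingerprint as alerted for the duration of the window.
  add(fingerprint: string): void {
    this.expiries.set(fingerprint, this.now() + this.windowMs);
  }

  // True while the fingerprint is suppressed; lazily evicts an expired entry.
  has(fingerprint: string): boolean {
    const expiry = this.expiries.get(fingerprint);
    if (expiry === undefined) return false;
    if (expiry <= this.now()) {
      this.expiries.delete(fingerprint);
      return false;
    }
    return true;
  }

  // Removes every expired entry and returns how many were evicted.
  cleanup(): number {
    const current = this.now();
    let evicted = 0;
    this.expiries.forEach((expiry, fingerprint) => {
      if (expiry <= current) {
        this.expiries.delete(fingerprint);
        evicted += 1;
      }
    });
    return evicted;
  }

  get size(): number {
    return this.expiries.size;
  }
}
```

Because the map lives in process memory, suppression state does not survive a restart and is not shared between instances.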

### Graceful Shutdown

The worker registers as a `DrainableSubsystem` via the standard lifecycle
handler in `src/lifecycle/shutdown.ts`.

## Testing

```bash
npx jest src/workers/slowQueryAlerter.test.ts
```

## Metrics

The worker emits the following Prometheus metrics (via the shared
`src/metrics.ts` registry):

| Metric | Type | Description |
|---|---|---|
| `slow_query_alerter_runs_total` | Counter | Total poll runs |
| `slow_query_alerter_alerts_total` | Counter | Total alerts fired |
| `slow_query_alerter_queries_above_threshold` | Gauge | Number of queries exceeding threshold in last poll |

## Error Handling

- Poll failures are logged at `error` level and do not crash the worker.
- Webhook POST failures are logged at `error` level; no retry logic is applied
(the next poll cycle will re-attempt if the dedup window has expired).
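
The isolation described above can be sketched as one poll cycle that catches both failure modes separately. The names `ErrorLogger`, `CycleOutcome`, and `runPollCycle` are hypothetical, and the sketch deliberately leaves out how the dedup store is updated when the webhook fails, since this document does not specify it.

```typescript
// Hypothetical logger interface; only the error level is needed here.
interface ErrorLogger {
  error(message: string, err: unknown): void;
}

type CycleOutcome = "sent" | "nothing_to_send" | "poll_failed" | "webhook_failed";

// Runs one cycle. Never throws: each failure is logged and reported as an outcome.
async function runPollCycle(
  fetchSlowQueries: () => Promise<string[]>,
  sendWebhook: (fingerprints: string[]) => Promise<void>,
  logger: ErrorLogger,
): Promise<CycleOutcome> {
  let fingerprints: string[];
  try {
    fingerprints = await fetchSlowQueries();
  } catch (err) {
    logger.error("slow query poll failed", err);
    return "poll_failed";
  }

  if (fingerprints.length === 0) return "nothing_to_send";

  try {
    await sendWebhook(fingerprints); // single attempt, no retry
    return "sent";
  } catch (err) {
    logger.error("slow query webhook failed", err);
    return "webhook_failed";
  }
}
```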
6 changes: 6 additions & 0 deletions src/config/env.ts
@@ -185,6 +185,12 @@ export const envSchema = z
// Idempotency
IDEMPOTENCY_RETENTION_WINDOW_SECONDS: z.coerce.number().int().positive().default(86400),
IDEMPOTENCY_SWEEPER_INTERVAL_MS: z.coerce.number().int().positive().default(60_000),

// Slow query alerting
SLOW_QUERY_ALERT_WEBHOOK_URL: z.string().url().optional(),
SLOW_QUERY_P95_THRESHOLD_MS: z.coerce.number().positive().default(500),
SLOW_QUERY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000),
SLOW_QUERY_DEDUP_WINDOW_SECONDS: z.coerce.number().int().positive().default(3600),
})
.superRefine((values, ctx) => {
if (values.SOROBAN_RPC_ENABLED && !values.SOROBAN_RPC_URL) {
7 changes: 7 additions & 0 deletions src/config/index.ts
@@ -209,4 +209,11 @@ export const config = {
warmupTimeoutMs: env.LISTINGS_CACHE_WARMUP_TIMEOUT_MS,
},
bulkEndpointLimit: env.BULK_ENDPOINT_LIMIT,

slowQueryAlerter: {
webhookUrl: env.SLOW_QUERY_ALERT_WEBHOOK_URL,
p95ThresholdMs: env.SLOW_QUERY_P95_THRESHOLD_MS,
pollIntervalMs: env.SLOW_QUERY_POLL_INTERVAL_MS,
dedupWindowMs: env.SLOW_QUERY_DEDUP_WINDOW_SECONDS * 1000,
},
} as const;
20 changes: 20 additions & 0 deletions src/index.ts
@@ -38,6 +38,7 @@ import { createApiRegistry } from './data/apiRegistry.js';
import { ApiKey } from './types/gateway.js';
import { config } from './config/index.js';
import { listingsCache } from './lib/listingsCache.js';
import { createSlowQueryAlerterJob } from './workers/slowQueryAlerter.js';

// Helper for Jest/CommonJS compat
const isDirectExecution = process.argv[1] && (process.argv[1].endsWith('index.ts') || process.argv[1].endsWith('index.js'));
@@ -123,6 +124,15 @@ if (isDirectExecution) {
intervalMs: config.idempotency.sweeperIntervalMs,
});

const slowQueryAlerterJob = config.slowQueryAlerter.webhookUrl
? createSlowQueryAlerterJob(pool, {
webhookUrl: config.slowQueryAlerter.webhookUrl,
p95ThresholdMs: config.slowQueryAlerter.p95ThresholdMs,
pollIntervalMs: config.slowQueryAlerter.pollIntervalMs,
dedupWindowMs: config.slowQueryAlerter.dedupWindowMs,
})
: null;

const apiKeys = new Map<string, ApiKey>([
['test-key-1', { key: 'test-key-1', developerId: 'dev_001', apiId: 'api_001' }],
['test-key-2', { key: 'test-key-2', developerId: 'dev_002', apiId: 'api_002' }],
@@ -186,6 +196,14 @@ if (isDirectExecution) {
awaitIdle: () => settlementReconJob.awaitIdle(),
},
];

if (slowQueryAlerterJob) {
shutdownSubsystems.push({
name: 'slow-query-alerter',
beginShutdown: () => slowQueryAlerterJob.beginShutdown(),
awaitIdle: () => slowQueryAlerterJob.awaitIdle(),
});
}
app.use('/v1/call', legacyV1DeprecationMiddleware, proxyDrainTracker.middleware);
app.use('/v1/call', proxyRouter);

@@ -202,6 +220,7 @@ if (isDirectExecution) {
settlementStatusSyncJob.stop();
settlementReconJob.stop();
idempotencySweeperJob.stop();
slowQueryAlerterJob?.stop();
await closeDb();
await Promise.allSettled([
closePgPool(),
@@ -234,6 +253,7 @@ if (isDirectExecution) {
settlementStatusSyncJob.start();
settlementReconJob.start();
idempotencySweeperJob.start();
slowQueryAlerterJob?.start();

const server = app.listen(PORT, () => {
console.log(`Callora backend listening on http://localhost:${PORT}`);
57 changes: 57 additions & 0 deletions src/metrics.ts
@@ -497,6 +497,7 @@ export function resetAllMetrics(): void {
proxyPrematureAbortsTotal.reset();
idempotencyStoreRows.reset();
gatewayUpstreamBreakerState.reset();
resetSlowQueryAlerterMetrics();
resetReplicaMetrics();
}

@@ -566,6 +567,62 @@ export function recordReplicaFailure(): void {
dbReplicaFailuresTotal.inc();
}

// ── Slow Query Alerter metrics ────────────────────────────────────────────────
//
// Metric: slow_query_alerter_runs_total
// Type: Counter
// Labels: (none)
// Purpose: Total number of poll cycles the slow query alerter has completed.
//
// Metric: slow_query_alerter_alerts_total
// Type: Counter
// Labels: (none)
// Purpose: Total number of webhook alerts fired.
//
// Metric: slow_query_alerter_queries_above_threshold
// Type: Gauge
// Labels: (none)
// Purpose: Number of queries exceeding the threshold in the most recent poll.
// ─────────────────────────────────────────────────────────────────────────────

const slowQueryAlerterRunsTotal = new client.Counter({
name: 'slow_query_alerter_runs_total',
help: 'Total number of slow query alerter poll cycles',
});

const slowQueryAlerterAlertsTotal = new client.Counter({
name: 'slow_query_alerter_alerts_total',
help: 'Total number of slow query alerts fired',
});

const slowQueryAlerterQueriesAboveThreshold = new client.Gauge({
name: 'slow_query_alerter_queries_above_threshold',
help: 'Number of queries exceeding the threshold in the most recent poll',
});

register.registerMetric(slowQueryAlerterRunsTotal);
register.registerMetric(slowQueryAlerterAlertsTotal);
register.registerMetric(slowQueryAlerterQueriesAboveThreshold);

export function recordSlowQueryAlerterRun(): void {
slowQueryAlerterRunsTotal.inc();
}

export function recordSlowQueryAlerterAlert(): void {
slowQueryAlerterAlertsTotal.inc();
}

export function recordSlowQueryAlerterQueriesAboveThreshold(count: number): void {
slowQueryAlerterQueriesAboveThreshold.set(count);
}

/** Reset slow query alerter metrics. Used in tests to isolate metric state. */
export function resetSlowQueryAlerterMetrics(): void {
slowQueryAlerterRunsTotal.reset();
slowQueryAlerterAlertsTotal.reset();
slowQueryAlerterQueriesAboveThreshold.reset();
}

/** Reset all replica routing metrics. Used in tests to isolate metric state. */
export function resetReplicaMetrics(): void {
dbReplicaQueriesTotal.reset();