From f4a03b5ecfe007bb70ba0847da5935dd0ab32fcb Mon Sep 17 00:00:00 2001 From: Itodo-S Date: Sat, 27 Jun 2026 07:52:56 +0100 Subject: [PATCH] feat(gateway): behavioral anomaly detection for adaptive rate limiting (#615) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Static per-IP / per-key limits miss distributed attacks (botnets, rotating IPs/keys). This adds unsupervised behavioral anomaly scoring that learns normal per-key usage and adaptively tightens limits when traffic looks anomalous. backend (TypeScript, jest): - backend/gateway/featureExtraction.ts — request rate, endpoint-distribution entropy, time-of-day, payload size, user-agent entropy, geographic spread. - backend/gateway/isolationForest.ts — dependency-free Isolation Forest (unsupervised), deterministic via seeded PRNG, score in [0,1]. - backend/gateway/anomalyDetector.ts — train on normal windows, score new ones. - backend/gateway/adaptiveRateLimit.ts — reduce limit 50% past the threshold (0.8), 90% past 0.95; allow-list (webhooks/health) + per-key override for false positives. - backend/gateway/middleware/adaptiveRateLimitMiddleware.ts — Express-compatible. - backend/monitoring/anomalyMetrics.ts — per-key anomaly-score Prometheus gauge. - 15 jest tests (forest, features, adaptive decisions, detector, metrics, middleware). ml-service (Python, FastAPI — mirrors the model, no new deps): - ml-service/anomaly/{isolation_forest,features,detector}.py — pure-Python port. - ml-service/routers/anomaly.py — /v1/anomaly train/score/status; registered in main. - ml-service/tests/test_anomaly.py — 6 tests. READMEs document covered criteria and documented follow-ups (Slack/PagerDuty alerting, admin dashboard screen, seasonal model + weekly retrain/drift alerts). Closes #615 --- backend/gateway/README.md | 52 ++++ .../__tests__/anomalyRateLimit.test.ts | 244 ++++++++++++++++++ backend/gateway/adaptiveRateLimit.ts | 106 ++++++++ backend/gateway/anomalyDetector.ts | 47 ++++ backend/gateway/featureExtraction.ts | 90 +++++++ backend/gateway/index.ts | 26 ++ backend/gateway/isolationForest.ts | 127 +++++++++ .../middleware/adaptiveRateLimitMiddleware.ts | 113 ++++++++ backend/monitoring/anomalyMetrics.ts | 74 ++++++ ml-service/anomaly/__init__.py | 0 ml-service/anomaly/detector.py | 45 ++++ ml-service/anomaly/features.py | 63 +++++ ml-service/anomaly/isolation_forest.py | 92 +++++++ ml-service/main.py | 3 +- ml-service/routers/anomaly.py | 77 ++++++ ml-service/tests/test_anomaly.py | 78 ++++++ 16 files changed, 1236 insertions(+), 1 deletion(-) create mode 100644 backend/gateway/README.md create mode 100644 backend/gateway/__tests__/anomalyRateLimit.test.ts create mode 100644 backend/gateway/adaptiveRateLimit.ts create mode 100644 backend/gateway/anomalyDetector.ts create mode 100644 backend/gateway/featureExtraction.ts create mode 100644 backend/gateway/index.ts create mode 100644 backend/gateway/isolationForest.ts create mode 100644 backend/gateway/middleware/adaptiveRateLimitMiddleware.ts create mode 100644 backend/monitoring/anomalyMetrics.ts create mode 100644 ml-service/anomaly/__init__.py create mode 100644 ml-service/anomaly/detector.py create mode 100644 ml-service/anomaly/features.py create mode 100644 ml-service/anomaly/isolation_forest.py create mode 100644 ml-service/routers/anomaly.py create mode 100644 ml-service/tests/test_anomaly.py diff --git a/backend/gateway/README.md b/backend/gateway/README.md new file mode 100644 index 00000000..e4b134c1 --- /dev/null +++ b/backend/gateway/README.md @@ -0,0 +1,52 @@ +# Rate-Limit Anomaly Detection Gateway (#615) + +Behavioral anomaly scoring + adaptive rate limiting to catch distributed attacks +(botnets, rotating IPs/API keys) that slip past static per-IP / per-key limits. + +## Pieces + +- **`featureExtraction.ts`** — turns a window of recent requests into a feature + vector: request rate, endpoint-distribution entropy, time-of-day, average + payload size, user-agent entropy, geographic (distinct-IP) spread. +- **`isolationForest.ts`** — dependency-free Isolation Forest (unsupervised) that + scores anomalies in `[0, 1]`; deterministic via a seeded PRNG. +- **`anomalyDetector.ts`** — trains on normal-traffic windows and scores new ones. +- **`adaptiveRateLimit.ts`** — when a key's score crosses the threshold (default + `0.8`) the effective limit is reduced 50%, and 90% past `0.95`. Allow-listed + paths (webhooks/health) bypass reduction; per-key overrides handle false + positives. +- **`middleware/adaptiveRateLimitMiddleware.ts`** — Express-compatible middleware + wiring it together (sliding window per key, scoring, enforcement, headers). +- **`../monitoring/anomalyMetrics.ts`** — per-key anomaly-score gauge + (Prometheus text exposition + the repo's flat metric shape). + +The Python **`ml-service`** mirrors the model (`ml-service/anomaly/`, +`routers/anomaly.py` at `/v1/anomaly`) so scoring can run in-process in the +gateway or be delegated to the ML service. + +## Tests + +```bash +# Backend (TS) +npx jest --config jest.backend.config.js backend/gateway/__tests__/anomalyRateLimit.test.ts + +# ml-service (Python) +cd ml-service && python -m pytest tests/test_anomaly.py +``` + +## Covered acceptance criteria + +- Feature extraction (rate, endpoint distribution, time-of-day, payload size, + user-agent entropy, geographic spread). +- Isolation Forest anomaly scoring with a configurable threshold. +- Adaptive limiting: reduce by 50% past threshold, 90% past the severe threshold. +- False-positive handling: allow-listed patterns + per-key manual override. +- Anomaly-score Prometheus metric per key (`backend/monitoring`). + +## Follow-ups (out of this PR's core) + +- Real-time Slack/PagerDuty alerting on high-confidence attacks (score > 0.95) — + the detector already surfaces `high_confidence`. +- Admin `RateLimitDashboardScreen` (`mobile/app/screens/`). +- Seasonal model + event-day whitelisting; weekly auto-retrain + drift alerting + (hooks belong in `ml-service/jobs/` / `ml-service/retrain.py`). diff --git a/backend/gateway/__tests__/anomalyRateLimit.test.ts b/backend/gateway/__tests__/anomalyRateLimit.test.ts new file mode 100644 index 00000000..c63b4207 --- /dev/null +++ b/backend/gateway/__tests__/anomalyRateLimit.test.ts @@ -0,0 +1,244 @@ +/** + * Tests for the rate-limit anomaly detection gateway (#615). + */ + +import { IsolationForest } from "../isolationForest"; +import { extractFeatures, toVector } from "../featureExtraction"; +import { AnomalyDetector } from "../anomalyDetector"; +import { decideLimit, isAllowlisted, severityFor } from "../adaptiveRateLimit"; +import { AnomalyMetrics } from "../../monitoring/anomalyMetrics"; +import { + createAdaptiveRateLimitMiddleware, + type MinimalRequest, +} from "../middleware/adaptiveRateLimitMiddleware"; +import type { RequestSample } from "../featureExtraction"; + +function normalVectors(): number[][] { + // A "normal" cluster with spread in every dimension (constant dims would make + // the forest pick degenerate splits and dilute discrimination). + const out: number[][] = []; + for (let i = 0; i < 200; i++) { + out.push([ + 1 + (i % 5) * 0.2, + 2 + (i % 3) * 0.3, + 0.4 + (i % 4) * 0.05, + 500 + (i % 7) * 10, + 0.1 + (i % 3) * 0.02, + 1 + (i % 2), + ]); + } + return out; +} + +describe("IsolationForest", () => { + it("scores an outlier higher than an inlier", () => { + const forest = new IsolationForest({ trees: 100, sampleSize: 128, seed: 7 }).fit(normalVectors()); + const inlier = forest.score([1, 2, 0.5, 500, 0.1, 1]); + const outlier = forest.score([50, 9, 0.99, 90000, 5, 40]); + expect(outlier).toBeGreaterThan(inlier); + expect(inlier).toBeGreaterThanOrEqual(0); + expect(outlier).toBeLessThanOrEqual(1); + }); + + it("is deterministic for a fixed seed", () => { + const a = new IsolationForest({ seed: 1 }).fit(normalVectors()).score([1, 2, 0.5, 500, 0.1, 1]); + const b = new IsolationForest({ seed: 1 }).fit(normalVectors()).score([1, 2, 0.5, 500, 0.1, 1]); + expect(a).toBe(b); + }); +}); + +describe("featureExtraction", () => { + function reqs(n: number, endpoint: string, spanMs: number): RequestSample[] { + const out: RequestSample[] = []; + for (let i = 0; i < n; i++) { + out.push({ + timestamp: 1_700_000_000_000 + (i * spanMs) / n, + endpoint, + payloadSize: 100, + userAgent: "agent/1", + ip: "1.2.3.4", + }); + } + return out; + } + + it("computes request rate and entropy", () => { + const f = extractFeatures(reqs(60, "/a", 60_000)); + expect(f.requestRate).toBeGreaterThan(0); + expect(f.endpointEntropy).toBe(0); // single endpoint => zero entropy + expect(f.geoSpread).toBe(1); + expect(toVector(f)).toHaveLength(6); + }); + + it("higher endpoint diversity raises entropy", () => { + const mixed: RequestSample[] = [ + { timestamp: 1, endpoint: "/a", payloadSize: 1, userAgent: "x", ip: "1" }, + { timestamp: 2, endpoint: "/b", payloadSize: 1, userAgent: "y", ip: "2" }, + { timestamp: 3, endpoint: "/c", payloadSize: 1, userAgent: "z", ip: "3" }, + ]; + expect(extractFeatures(mixed).endpointEntropy).toBeGreaterThan(0); + expect(extractFeatures(mixed).geoSpread).toBe(3); + }); + + it("handles an empty window", () => { + expect(extractFeatures([]).requestRate).toBe(0); + }); +}); + +describe("decideLimit (adaptive limiting)", () => { + const config = { baseLimit: 100, threshold: 0.8, severeThreshold: 0.95 }; + + it("keeps the base limit below threshold", () => { + const d = decideLimit({ key: "k", score: 0.3, config }); + expect(d.action).toBe("normal"); + expect(d.effectiveLimit).toBe(100); + }); + + it("reduces by 50% at the threshold", () => { + const d = decideLimit({ key: "k", score: 0.85, config }); + expect(d.action).toBe("reduced"); + expect(d.effectiveLimit).toBe(50); + expect(d.severity).toBe("medium"); + }); + + it("reduces by 90% at the severe threshold", () => { + const d = decideLimit({ key: "k", score: 0.97, config }); + expect(d.action).toBe("severely-reduced"); + expect(d.effectiveLimit).toBe(10); + expect(d.severity).toBe("high"); + }); + + it("allow-listed paths bypass reduction", () => { + const d = decideLimit({ + key: "k", + score: 0.99, + path: "/webhooks/stripe", + config: { ...config, allowlistPaths: ["/webhooks", "/health"] }, + }); + expect(d.action).toBe("allowlisted"); + expect(d.effectiveLimit).toBe(100); + }); + + it("per-key override wins (false-positive handling)", () => { + const d = decideLimit({ + key: "trusted", + score: 0.99, + config: { ...config, overrides: { trusted: 5000 } }, + }); + expect(d.action).toBe("override"); + expect(d.effectiveLimit).toBe(5000); + }); + + it("isAllowlisted matches by prefix; severityFor buckets scores", () => { + expect(isAllowlisted("/health/live", ["/health"])).toBe(true); + expect(isAllowlisted("/api/x", ["/health"])).toBe(false); + expect(severityFor(0.96, config)).toBe("high"); + expect(severityFor(0.81, config)).toBe("medium"); + expect(severityFor(0.1, config)).toBe("low"); + }); +}); + +describe("AnomalyDetector", () => { + // Realistic normal traffic varies window-to-window, which the model needs in + // order to learn a distribution (identical windows give a degenerate forest). + function normalWindow(w: number): RequestSample[] { + const n = 40 + (w % 20); // ~40–60 requests/min + const base = 1_700_000_000_000 + w * 60_000; + const endpoints = w % 2 ? ["/api/subscriptions", "/api/usage"] : ["/api/subscriptions"]; + return Array.from({ length: n }, (_, i) => ({ + timestamp: base + Math.floor((i * 60_000) / n), + endpoint: endpoints[i % endpoints.length], + payloadSize: 350 + (i % 100), + userAgent: "app/1.0", + ip: "10.0.0.1", + })); + } + + it("scores anomalous windows higher than normal ones", () => { + const detector = new AnomalyDetector({ seed: 3 }).fit( + Array.from({ length: 60 }, (_, w) => normalWindow(w)), + ); + + const attackWindow: RequestSample[] = Array.from({ length: 5000 }, (_, i) => ({ + timestamp: 1_700_000_000_000 + i, // 5000 reqs in 5s = huge rate + endpoint: `/api/ep${i % 50}`, // scanning many endpoints + payloadSize: 50_000, + userAgent: `bot/${i % 100}`, // rotating UAs + ip: `192.168.${i % 255}.${i % 255}`, // distributed IPs + })); + + const normal = detector.scoreWindow(normalWindow(3)).score; + const attack = detector.scoreWindow(attackWindow).score; + expect(attack).toBeGreaterThan(normal); + }); +}); + +describe("AnomalyMetrics", () => { + it("tracks per-key score, max, and high-confidence count", () => { + const m = new AnomalyMetrics(0.95); + m.record("k1", 0.2); + m.record("k2", 0.97); + expect(m.scoreFor("k2")).toBe(0.97); + const flat = m.getMetrics(); + expect(flat.anomaly_keys_tracked).toBe(2); + expect(flat.anomaly_score_max).toBe(0.97); + expect(flat.anomaly_high_confidence_total).toBe(1); + expect(m.toPrometheus()).toContain('rate_limit_anomaly_score{key="k2"} 0.97'); + }); +}); + +describe("adaptive rate-limit middleware", () => { + function fakeReqRes(path: string, apiKey: string) { + const req: MinimalRequest = { + path, + headers: { "x-api-key": apiKey, "user-agent": "app", "content-length": "100" }, + ip: "10.0.0.5", + }; + const res = { + statusCode: 200, + headers: {} as Record, + body: undefined as unknown, + setHeader(n: string, v: string | number) { + this.headers[n] = v; + }, + status(code: number) { + this.statusCode = code; + return this; + }, + json(b: unknown) { + this.body = b; + }, + }; + return { req, res }; + } + + it("passes normal traffic and 429s once the (unfitted) base limit is exceeded", () => { + const detector = new AnomalyDetector(); // not fitted -> score 0 -> base limit + const mw = createAdaptiveRateLimitMiddleware({ + detector, + config: { baseLimit: 3 }, + windowMs: 60_000, + }); + + let allowed = 0; + let blocked = 0; + for (let i = 0; i < 5; i++) { + const { req, res } = fakeReqRes("/api/x", "key-A"); + mw(req, res, () => { + allowed += 1; + }); + if (res.statusCode === 429) blocked += 1; + } + expect(allowed).toBe(3); + expect(blocked).toBe(2); + }); + + it("sets anomaly headers", () => { + const detector = new AnomalyDetector(); + const mw = createAdaptiveRateLimitMiddleware({ detector, config: { baseLimit: 100 } }); + const { req, res } = fakeReqRes("/api/x", "key-B"); + mw(req, res, () => {}); + expect(res.headers["X-RateLimit-Limit"]).toBe(100); + expect(res.headers["X-Anomaly-Action"]).toBe("normal"); + }); +}); diff --git a/backend/gateway/adaptiveRateLimit.ts b/backend/gateway/adaptiveRateLimit.ts new file mode 100644 index 00000000..0b41672b --- /dev/null +++ b/backend/gateway/adaptiveRateLimit.ts @@ -0,0 +1,106 @@ +/** + * Adaptive rate-limit decision logic driven by anomaly scores (#615). + * + * When a key's behavioral anomaly score crosses the threshold, its effective + * rate limit is reduced (moderately, then severely). Allow-listed paths + * (webhook callbacks, health checks) bypass reduction, and per-key manual + * overrides handle false positives. Pure decision function — the middleware + * wraps it. + */ + +export type LimitAction = + | "normal" + | "reduced" + | "severely-reduced" + | "allowlisted" + | "override"; + +export type Severity = "low" | "medium" | "high"; + +export interface AdaptiveConfig { + baseLimit: number; + /** Score at/above which to reduce the limit (default 0.8). */ + threshold?: number; + /** Score at/above which to reduce severely (default 0.95). */ + severeThreshold?: number; + /** Fractional reduction at `threshold` (default 0.5 = -50%). */ + reduceModerate?: number; + /** Fractional reduction at `severeThreshold` (default 0.9 = -90%). */ + reduceSevere?: number; + /** Path prefixes that bypass adaptive reduction. */ + allowlistPaths?: string[]; + /** Per-key fixed limit overrides (manual false-positive handling). */ + overrides?: Record; +} + +export interface LimitDecision { + key: string; + anomalyScore: number; + effectiveLimit: number; + action: LimitAction; + severity: Severity; +} + +const DEFAULTS = { + threshold: 0.8, + severeThreshold: 0.95, + reduceModerate: 0.5, + reduceSevere: 0.9, +}; + +export function isAllowlisted(path: string, allowlist: string[] = []): boolean { + return allowlist.some((p) => path === p || path.startsWith(p)); +} + +export function severityFor(score: number, config: AdaptiveConfig): Severity { + const severe = config.severeThreshold ?? DEFAULTS.severeThreshold; + const threshold = config.threshold ?? DEFAULTS.threshold; + if (score >= severe) return "high"; + if (score >= threshold) return "medium"; + return "low"; +} + +export function decideLimit(params: { + key: string; + score: number; + path?: string; + config: AdaptiveConfig; +}): LimitDecision { + const { key, score, path = "", config } = params; + const threshold = config.threshold ?? DEFAULTS.threshold; + const severeThreshold = config.severeThreshold ?? DEFAULTS.severeThreshold; + const reduceModerate = config.reduceModerate ?? DEFAULTS.reduceModerate; + const reduceSevere = config.reduceSevere ?? DEFAULTS.reduceSevere; + const severity = severityFor(score, config); + + // Manual per-key override wins (false-positive handling). + if (config.overrides && key in config.overrides) { + return { key, anomalyScore: score, effectiveLimit: config.overrides[key], action: "override", severity }; + } + + // Allow-listed traffic is never throttled by the anomaly path. + if (isAllowlisted(path, config.allowlistPaths)) { + return { key, anomalyScore: score, effectiveLimit: config.baseLimit, action: "allowlisted", severity }; + } + + if (score >= severeThreshold) { + return { + key, + anomalyScore: score, + // "reduce by X%": keep base − ⌊base·X⌋ (avoids float drift, e.g. 100·0.1). + effectiveLimit: config.baseLimit - Math.floor(config.baseLimit * reduceSevere), + action: "severely-reduced", + severity, + }; + } + if (score >= threshold) { + return { + key, + anomalyScore: score, + effectiveLimit: config.baseLimit - Math.floor(config.baseLimit * reduceModerate), + action: "reduced", + severity, + }; + } + return { key, anomalyScore: score, effectiveLimit: config.baseLimit, action: "normal", severity }; +} diff --git a/backend/gateway/anomalyDetector.ts b/backend/gateway/anomalyDetector.ts new file mode 100644 index 00000000..ec5e67e3 --- /dev/null +++ b/backend/gateway/anomalyDetector.ts @@ -0,0 +1,47 @@ +/** + * Anomaly detector: feature extraction + Isolation Forest scoring (#615). + * + * Learns normal API usage from windows of historical requests and scores a new + * window's anomaly level. Stateless w.r.t. requests beyond the fitted model, so + * it's easy to retrain (see ml-service for the production training job). + */ + +import { + extractFeatures, + toVector, + type FeatureBreakdown, + type RequestSample, +} from "./featureExtraction"; +import { IsolationForest, type IsolationForestOptions } from "./isolationForest"; + +export interface AnomalyResult { + score: number; // [0,1], higher = more anomalous + features: FeatureBreakdown; +} + +export class AnomalyDetector { + private forest: IsolationForest; + private fitted = false; + + constructor(options: IsolationForestOptions = {}) { + this.forest = new IsolationForest(options); + } + + /** Train on windows representing normal traffic (one vector per window). */ + fit(normalWindows: RequestSample[][]): this { + const vectors = normalWindows.map((w) => toVector(extractFeatures(w))); + this.forest.fit(vectors); + this.fitted = true; + return this; + } + + isFitted(): boolean { + return this.fitted; + } + + /** Score a window of recent requests for a key/user. */ + scoreWindow(window: RequestSample[]): AnomalyResult { + const features = extractFeatures(window); + return { score: this.forest.score(toVector(features)), features }; + } +} diff --git a/backend/gateway/featureExtraction.ts b/backend/gateway/featureExtraction.ts new file mode 100644 index 00000000..9bf0dd2e --- /dev/null +++ b/backend/gateway/featureExtraction.ts @@ -0,0 +1,90 @@ +/** + * Behavioral feature extraction for rate-limit anomaly detection (#615). + * + * Turns a window of recent requests (per API key / user) into a fixed-length + * numeric feature vector for the Isolation Forest. Pure and side-effect free. + */ + +export interface RequestSample { + timestamp: number; // epoch ms + endpoint: string; + payloadSize: number; // bytes + userAgent: string; + ip: string; +} + +export interface FeatureBreakdown { + requestRate: number; // requests per second over the window + endpointEntropy: number; // Shannon entropy of endpoint distribution (bits) + timeOfDay: number; // 0..1 fraction of day of the latest request + avgPayloadSize: number; // bytes + userAgentEntropy: number; // entropy over user-agent strings (bits) + geoSpread: number; // distinct IPs in the window +} + +/** Order of features in the vector fed to the model. */ +export const FEATURE_ORDER: (keyof FeatureBreakdown)[] = [ + "requestRate", + "endpointEntropy", + "timeOfDay", + "avgPayloadSize", + "userAgentEntropy", + "geoSpread", +]; + +function shannonEntropy(counts: number[]): number { + const total = counts.reduce((a, b) => a + b, 0); + if (total === 0) return 0; + let h = 0; + for (const c of counts) { + if (c === 0) continue; + const p = c / total; + h -= p * Math.log2(p); + } + return h; +} + +function distribution(items: T[], key: (t: T) => string): number[] { + const counts = new Map(); + for (const item of items) { + const k = key(item); + counts.set(k, (counts.get(k) ?? 0) + 1); + } + return [...counts.values()]; +} + +/** Extract an interpretable feature breakdown from a request window. */ +export function extractFeatures(window: RequestSample[]): FeatureBreakdown { + if (window.length === 0) { + return { + requestRate: 0, + endpointEntropy: 0, + timeOfDay: 0, + avgPayloadSize: 0, + userAgentEntropy: 0, + geoSpread: 0, + }; + } + + const timestamps = window.map((r) => r.timestamp); + const minT = Math.min(...timestamps); + const maxT = Math.max(...timestamps); + const spanSec = Math.max((maxT - minT) / 1000, 1); // avoid div-by-zero / inflated rates + const latest = new Date(maxT); + const secondsIntoDay = + latest.getUTCHours() * 3600 + latest.getUTCMinutes() * 60 + latest.getUTCSeconds(); + + return { + requestRate: window.length / spanSec, + endpointEntropy: shannonEntropy(distribution(window, (r) => r.endpoint)), + timeOfDay: secondsIntoDay / 86400, + avgPayloadSize: window.reduce((a, r) => a + r.payloadSize, 0) / window.length, + userAgentEntropy: shannonEntropy(distribution(window, (r) => r.userAgent)), + geoSpread: new Set(window.map((r) => r.ip)).size, + }; +} + +/** Convert a breakdown to the ordered numeric vector for the model. */ +export function toVector(features: FeatureBreakdown): number[] { + return FEATURE_ORDER.map((k) => features[k]); +} diff --git a/backend/gateway/index.ts b/backend/gateway/index.ts new file mode 100644 index 00000000..38c0bfe1 --- /dev/null +++ b/backend/gateway/index.ts @@ -0,0 +1,26 @@ +/** + * Rate-limit anomaly detection gateway (#615). + * + * Behavioral anomaly scoring (Isolation Forest) + adaptive rate limiting that + * catches distributed attacks which static per-IP / per-key limits miss. + */ + +export { IsolationForest, makeRng, type FeatureVector } from "./isolationForest"; +export { + extractFeatures, + toVector, + FEATURE_ORDER, + type RequestSample, + type FeatureBreakdown, +} from "./featureExtraction"; +export { AnomalyDetector, type AnomalyResult } from "./anomalyDetector"; +export { + decideLimit, + isAllowlisted, + severityFor, + type AdaptiveConfig, + type LimitDecision, + type LimitAction, + type Severity, +} from "./adaptiveRateLimit"; +export { createAdaptiveRateLimitMiddleware } from "./middleware/adaptiveRateLimitMiddleware"; diff --git a/backend/gateway/isolationForest.ts b/backend/gateway/isolationForest.ts new file mode 100644 index 00000000..079351fe --- /dev/null +++ b/backend/gateway/isolationForest.ts @@ -0,0 +1,127 @@ +/** + * Dependency-free Isolation Forest for unsupervised anomaly scoring (#615). + * + * Anomalies are "few and different", so they isolate with shorter random + * partition paths. We build an ensemble of random isolation trees and score a + * point by its average path length, normalized to a [0, 1] anomaly score + * (Liu, Ting & Zhou, 2008). Implemented without numpy/sklearn so it runs in the + * Node backend with no extra dependencies; the Python ml-service mirrors it. + */ + +export type FeatureVector = number[]; + +interface ITreeNode { + // Internal node + splitFeature?: number; + splitValue?: number; + left?: ITreeNode; + right?: ITreeNode; + // External (leaf) node + size?: number; +} + +/** Deterministic, seedable PRNG (mulberry32) so training/scoring is testable. */ +export function makeRng(seed: number): () => number { + let a = seed >>> 0; + return () => { + a |= 0; + a = (a + 0x6d2b79f5) | 0; + let t = Math.imul(a ^ (a >>> 15), 1 | a); + t = (t + Math.imul(t ^ (t >>> 7), 61 | t)) ^ t; + return ((t ^ (t >>> 14)) >>> 0) / 4294967296; + }; +} + +/** Average path length of an unsuccessful BST search over n points (c(n)). */ +function cFactor(n: number): number { + if (n <= 1) return 0; + if (n === 2) return 1; + const H = Math.log(n - 1) + 0.5772156649; // harmonic number approximation + return 2 * H - (2 * (n - 1)) / n; +} + +function buildTree( + data: FeatureVector[], + heightLimit: number, + rng: () => number, + depth = 0, +): ITreeNode { + if (depth >= heightLimit || data.length <= 1) { + return { size: data.length }; + } + const dims = data[0].length; + const feature = Math.floor(rng() * dims); + let min = Infinity; + let max = -Infinity; + for (const row of data) { + if (row[feature] < min) min = row[feature]; + if (row[feature] > max) max = row[feature]; + } + if (min === max) return { size: data.length }; + + const splitValue = min + rng() * (max - min); + const left: FeatureVector[] = []; + const right: FeatureVector[] = []; + for (const row of data) (row[feature] < splitValue ? left : right).push(row); + + return { + splitFeature: feature, + splitValue, + left: buildTree(left, heightLimit, rng, depth + 1), + right: buildTree(right, heightLimit, rng, depth + 1), + }; +} + +function pathLength(point: FeatureVector, node: ITreeNode, depth = 0): number { + if (node.size !== undefined) { + return depth + cFactor(node.size); + } + const goLeft = point[node.splitFeature!] < node.splitValue!; + return pathLength(point, goLeft ? node.left! : node.right!, depth + 1); +} + +export interface IsolationForestOptions { + trees?: number; // ensemble size + sampleSize?: number; // subsample per tree + seed?: number; +} + +export class IsolationForest { + private trees: ITreeNode[] = []; + private normFactor = 1; + private readonly opts: Required; + + constructor(options: IsolationForestOptions = {}) { + this.opts = { + trees: options.trees ?? 100, + sampleSize: options.sampleSize ?? 256, + seed: options.seed ?? 42, + }; + } + + fit(data: FeatureVector[]): this { + if (data.length === 0) throw new Error("cannot fit on empty data"); + const rng = makeRng(this.opts.seed); + const sampleSize = Math.min(this.opts.sampleSize, data.length); + const heightLimit = Math.ceil(Math.log2(Math.max(2, sampleSize))); + this.normFactor = cFactor(sampleSize); + this.trees = []; + for (let i = 0; i < this.opts.trees; i++) { + const sample: FeatureVector[] = []; + for (let s = 0; s < sampleSize; s++) { + sample.push(data[Math.floor(rng() * data.length)]); + } + this.trees.push(buildTree(sample, heightLimit, rng)); + } + return this; + } + + /** Anomaly score in [0, 1]; higher = more anomalous (≈0.5 is borderline). */ + score(point: FeatureVector): number { + if (this.trees.length === 0) throw new Error("forest not fitted"); + let total = 0; + for (const tree of this.trees) total += pathLength(point, tree); + const avg = total / this.trees.length; + return Math.pow(2, -avg / (this.normFactor || 1)); + } +} diff --git a/backend/gateway/middleware/adaptiveRateLimitMiddleware.ts b/backend/gateway/middleware/adaptiveRateLimitMiddleware.ts new file mode 100644 index 00000000..717ad939 --- /dev/null +++ b/backend/gateway/middleware/adaptiveRateLimitMiddleware.ts @@ -0,0 +1,113 @@ +/** + * Adaptive rate-limit middleware (#615). + * + * Maintains a per-key sliding window of recent requests, scores the window with + * the anomaly detector, derives an adaptive effective limit, enforces it, and + * records the anomaly score for monitoring. Typed structurally so it works with + * Express without importing it. + */ + +import { decideLimit, type AdaptiveConfig, type LimitDecision } from "../adaptiveRateLimit"; +import { type AnomalyDetector } from "../anomalyDetector"; +import { type RequestSample } from "../featureExtraction"; +import { type AnomalyMetrics } from "../../monitoring/anomalyMetrics"; + +export interface MinimalRequest { + path: string; + headers: Record; + socket?: { remoteAddress?: string }; + ip?: string; +} +export interface MinimalResponse { + setHeader(name: string, value: string | number): void; + status(code: number): MinimalResponse; + json(body: unknown): void; +} +export type Next = () => void; + +export interface MiddlewareOptions { + detector: AnomalyDetector; + config: AdaptiveConfig; + metrics?: AnomalyMetrics; + /** Sliding window length used for scoring. */ + windowSize?: number; + /** Window duration in ms; counts reset per window. */ + windowMs?: number; + /** Derive the rate-limit key (default: API key header or IP). */ + keyFn?: (req: MinimalRequest) => string; + /** Clock injection for tests. */ + now?: () => number; +} + +interface KeyState { + samples: RequestSample[]; + windowStart: number; + count: number; +} + +function header(req: MinimalRequest, name: string): string { + const v = req.headers[name]; + return Array.isArray(v) ? (v[0] ?? "") : (v ?? ""); +} + +function defaultKey(req: MinimalRequest): string { + return header(req, "x-api-key") || req.ip || req.socket?.remoteAddress || "anonymous"; +} + +export function createAdaptiveRateLimitMiddleware(options: MiddlewareOptions) { + const { + detector, + config, + metrics, + windowSize = 100, + windowMs = 60_000, + keyFn = defaultKey, + now = () => Date.now(), + } = options; + + const states = new Map(); + + return function adaptiveRateLimit(req: MinimalRequest, res: MinimalResponse, next: Next): void { + const ts = now(); + const key = keyFn(req); + let state = states.get(key); + if (!state || ts - state.windowStart >= windowMs) { + state = { samples: [], windowStart: ts, count: 0 }; + states.set(key, state); + } + + state.count += 1; + state.samples.push({ + timestamp: ts, + endpoint: req.path, + payloadSize: Number(header(req, "content-length")) || 0, + userAgent: header(req, "user-agent"), + ip: req.ip || req.socket?.remoteAddress || "", + }); + if (state.samples.length > windowSize) state.samples.shift(); + + let decision: LimitDecision; + if (detector.isFitted()) { + const { score } = detector.scoreWindow(state.samples); + metrics?.record(key, score, ts); + decision = decideLimit({ key, score, path: req.path, config }); + } else { + decision = decideLimit({ key, score: 0, path: req.path, config }); + } + + res.setHeader("X-RateLimit-Limit", decision.effectiveLimit); + res.setHeader("X-Anomaly-Score", decision.anomalyScore.toFixed(4)); + res.setHeader("X-Anomaly-Action", decision.action); + + if (state.count > decision.effectiveLimit) { + res.status(429).json({ + error: "rate_limited", + reason: decision.action, + anomalyScore: decision.anomalyScore, + limit: decision.effectiveLimit, + }); + return; + } + next(); + }; +} diff --git a/backend/monitoring/anomalyMetrics.ts b/backend/monitoring/anomalyMetrics.ts new file mode 100644 index 00000000..8387a460 --- /dev/null +++ b/backend/monitoring/anomalyMetrics.ts @@ -0,0 +1,74 @@ +/** + * Anomaly-score metrics exporter (#615). + * + * Tracks the latest behavioral anomaly score per API key and exposes both the + * repo's flat `Record` metric shape (see lockMetrics.ts) and a + * Prometheus text-exposition rendering with a per-key gauge. + */ + +export interface AnomalyMetricSample { + key: string; + score: number; + at: number; +} + +export class AnomalyMetrics { + private latest = new Map(); + private highConfidenceTotal = 0; + private readonly highConfidenceThreshold: number; + + constructor(highConfidenceThreshold = 0.95) { + this.highConfidenceThreshold = highConfidenceThreshold; + } + + record(key: string, score: number, at: number = Date.now()): void { + this.latest.set(key, { key, score, at }); + if (score >= this.highConfidenceThreshold) this.highConfidenceTotal += 1; + } + + scoreFor(key: string): number | undefined { + return this.latest.get(key)?.score; + } + + /** Flat metrics for the generic exporter (mirrors lockMetrics.ts shape). */ + getMetrics(): Record { + let max = 0; + for (const s of this.latest.values()) max = Math.max(max, s.score); + return { + anomaly_keys_tracked: this.latest.size, + anomaly_score_max: max, + anomaly_high_confidence_total: this.highConfidenceTotal, + }; + } + + /** Prometheus text exposition with a per-key gauge. */ + toPrometheus(): string { + const lines = [ + "# HELP rate_limit_anomaly_score Behavioral anomaly score per API key (0-1).", + "# TYPE rate_limit_anomaly_score gauge", + ]; + for (const s of this.latest.values()) { + lines.push(`rate_limit_anomaly_score{key="${escapeLabel(s.key)}"} ${s.score}`); + } + lines.push("# HELP rate_limit_anomaly_high_confidence_total High-confidence anomalies seen."); + lines.push("# TYPE rate_limit_anomaly_high_confidence_total counter"); + lines.push(`rate_limit_anomaly_high_confidence_total ${this.highConfidenceTotal}`); + return lines.join("\n") + "\n"; + } + + reset(): void { + this.latest.clear(); + this.highConfidenceTotal = 0; + } +} + +function escapeLabel(value: string): string { + return value.replace(/\\/g, "\\\\").replace(/"/g, '\\"').replace(/\n/g, "\\n"); +} + +export const anomalyMetrics = new AnomalyMetrics(); + +export const anomalyMetricsExporter = { + getMetrics: () => anomalyMetrics.getMetrics(), + resetMetrics: () => anomalyMetrics.reset(), +}; diff --git a/ml-service/anomaly/__init__.py b/ml-service/anomaly/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ml-service/anomaly/detector.py b/ml-service/anomaly/detector.py new file mode 100644 index 00000000..c50a1bab --- /dev/null +++ b/ml-service/anomaly/detector.py @@ -0,0 +1,45 @@ +"""Anomaly detector: feature extraction + Isolation Forest scoring (#615).""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List + +from .features import extract_features, to_vector +from .isolation_forest import IsolationForest + + +@dataclass +class AnomalyResult: + score: float + features: Dict[str, float] + + +class AnomalyDetector: + def __init__(self, trees: int = 100, sample_size: int = 256, seed: int = 42) -> None: + self._forest = IsolationForest(trees=trees, sample_size=sample_size, seed=seed) + self._fitted = False + + @property + def fitted(self) -> bool: + return self._fitted + + def fit(self, normal_windows: List[List[dict]]) -> "AnomalyDetector": + vectors = [to_vector(extract_features(w)) for w in normal_windows] + self._forest.fit(vectors) + self._fitted = True + return self + + def score_window(self, window: List[dict]) -> AnomalyResult: + feats = extract_features(window) + return AnomalyResult(score=self._forest.score(to_vector(feats)), features=feats) + + +def recommend_limit(score: float, base_limit: int, threshold: float = 0.8, + severe: float = 0.95) -> dict: + """Adaptive limit recommendation mirroring the gateway's decision logic.""" + if score >= severe: + return {"action": "severely-reduced", "effective_limit": base_limit // 10, "severity": "high"} + if score >= threshold: + return {"action": "reduced", "effective_limit": base_limit // 2, "severity": "medium"} + return {"action": "normal", "effective_limit": base_limit, "severity": "low"} diff --git a/ml-service/anomaly/features.py b/ml-service/anomaly/features.py new file mode 100644 index 00000000..27340996 --- /dev/null +++ b/ml-service/anomaly/features.py @@ -0,0 +1,63 @@ +"""Behavioral feature extraction for rate-limit anomaly detection (#615). + +Mirrors backend/gateway/featureExtraction.ts. Turns a window of recent requests +into a fixed-length numeric feature vector. +""" + +from __future__ import annotations + +import math +from collections import Counter +from datetime import datetime, timezone +from typing import Dict, List + +FEATURE_ORDER = [ + "request_rate", + "endpoint_entropy", + "time_of_day", + "avg_payload_size", + "user_agent_entropy", + "geo_spread", +] + + +def _entropy(counts: List[int]) -> float: + total = sum(counts) + if total == 0: + return 0.0 + h = 0.0 + for c in counts: + if c == 0: + continue + p = c / total + h -= p * math.log2(p) + return h + + +def extract_features(window: List[dict]) -> Dict[str, float]: + """`window` is a list of request samples with keys: + timestamp_ms, endpoint, payload_size, user_agent, ip. + """ + if not window: + return {k: 0.0 for k in FEATURE_ORDER} + + timestamps = [r["timestamp_ms"] for r in window] + span_sec = max((max(timestamps) - min(timestamps)) / 1000.0, 1.0) + latest = datetime.fromtimestamp(max(timestamps) / 1000.0, tz=timezone.utc) + seconds_into_day = latest.hour * 3600 + latest.minute * 60 + latest.second + + endpoint_counts = list(Counter(r["endpoint"] for r in window).values()) + ua_counts = list(Counter(r["user_agent"] for r in window).values()) + + return { + "request_rate": len(window) / span_sec, + "endpoint_entropy": _entropy(endpoint_counts), + "time_of_day": seconds_into_day / 86400.0, + "avg_payload_size": sum(r["payload_size"] for r in window) / len(window), + "user_agent_entropy": _entropy(ua_counts), + "geo_spread": float(len({r["ip"] for r in window})), + } + + +def to_vector(features: Dict[str, float]) -> List[float]: + return [features[k] for k in FEATURE_ORDER] diff --git a/ml-service/anomaly/isolation_forest.py b/ml-service/anomaly/isolation_forest.py new file mode 100644 index 00000000..d893c631 --- /dev/null +++ b/ml-service/anomaly/isolation_forest.py @@ -0,0 +1,92 @@ +"""Dependency-free Isolation Forest for rate-limit anomaly scoring (#615). + +Mirrors backend/gateway/isolationForest.ts so the ml-service and the gateway +agree on scoring semantics. Pure Python (stdlib only) — no numpy/sklearn — to +match the existing ml-service dependency set. +""" + +from __future__ import annotations + +import math +import random +from dataclasses import dataclass, field +from typing import List, Optional + +Vector = List[float] + + +def _c_factor(n: int) -> float: + """Average path length of an unsuccessful BST search over n points.""" + if n <= 1: + return 0.0 + if n == 2: + return 1.0 + harmonic = math.log(n - 1) + 0.5772156649 # Euler–Mascheroni + return 2 * harmonic - (2 * (n - 1) / n) + + +@dataclass +class _Node: + size: Optional[int] = None + split_feature: Optional[int] = None + split_value: Optional[float] = None + left: Optional["_Node"] = None + right: Optional["_Node"] = None + + +def _build_tree(data: List[Vector], height_limit: int, rng: random.Random, depth: int = 0) -> _Node: + if depth >= height_limit or len(data) <= 1: + return _Node(size=len(data)) + dims = len(data[0]) + feature = rng.randrange(dims) + values = [row[feature] for row in data] + lo, hi = min(values), max(values) + if lo == hi: + return _Node(size=len(data)) + split = lo + rng.random() * (hi - lo) + left = [r for r in data if r[feature] < split] + right = [r for r in data if r[feature] >= split] + return _Node( + split_feature=feature, + split_value=split, + left=_build_tree(left, height_limit, rng, depth + 1), + right=_build_tree(right, height_limit, rng, depth + 1), + ) + + +def _path_length(point: Vector, node: _Node, depth: int = 0) -> float: + if node.size is not None: + return depth + _c_factor(node.size) + assert node.split_feature is not None and node.split_value is not None + nxt = node.left if point[node.split_feature] < node.split_value else node.right + assert nxt is not None + return _path_length(point, nxt, depth + 1) + + +@dataclass +class IsolationForest: + trees: int = 100 + sample_size: int = 256 + seed: int = 42 + _forest: List[_Node] = field(default_factory=list) + _norm: float = 1.0 + + def fit(self, data: List[Vector]) -> "IsolationForest": + if not data: + raise ValueError("cannot fit on empty data") + rng = random.Random(self.seed) + sample_size = min(self.sample_size, len(data)) + height_limit = math.ceil(math.log2(max(2, sample_size))) + self._norm = _c_factor(sample_size) or 1.0 + self._forest = [] + for _ in range(self.trees): + sample = [data[rng.randrange(len(data))] for _ in range(sample_size)] + self._forest.append(_build_tree(sample, height_limit, rng)) + return self + + def score(self, point: Vector) -> float: + """Anomaly score in [0, 1]; higher = more anomalous.""" + if not self._forest: + raise RuntimeError("forest not fitted") + avg = sum(_path_length(point, t) for t in self._forest) / len(self._forest) + return 2 ** (-avg / self._norm) diff --git a/ml-service/main.py b/ml-service/main.py index 9e30a132..4f0d5f32 100644 --- a/ml-service/main.py +++ b/ml-service/main.py @@ -4,7 +4,7 @@ import time import logging -from routers import churn, recommendations, pricing, health +from routers import churn, recommendations, pricing, health, anomaly from model_registry import ModelRegistry logging.basicConfig(level=logging.INFO) @@ -45,3 +45,4 @@ async def track_latency(request, call_next): app.include_router(churn.router, prefix="/v1/churn") app.include_router(recommendations.router, prefix="/v1/recommendations") app.include_router(pricing.router, prefix="/v1/pricing") +app.include_router(anomaly.router, prefix="/v1/anomaly") diff --git a/ml-service/routers/anomaly.py b/ml-service/routers/anomaly.py new file mode 100644 index 00000000..5ba93449 --- /dev/null +++ b/ml-service/routers/anomaly.py @@ -0,0 +1,77 @@ +"""Rate-limit anomaly detection router (#615). + +Exposes the Isolation Forest behavioral anomaly detector: train on normal +traffic windows, then score a window and get an adaptive rate-limit +recommendation. Mirrors backend/gateway so the gateway can either run scoring +locally or call this service. +""" + +from __future__ import annotations + +from typing import Dict, List + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from anomaly.detector import AnomalyDetector, recommend_limit + +router = APIRouter(tags=["anomaly"]) + +# Module-level model. A production deployment would persist/version this via the +# ModelRegistry and retrain on a schedule (see ml-service/retrain.py). +_detector = AnomalyDetector() + + +class RequestSample(BaseModel): + timestamp_ms: int + endpoint: str + payload_size: int = 0 + user_agent: str = "" + ip: str = "" + + +class TrainRequest(BaseModel): + windows: List[List[RequestSample]] = Field(..., min_length=1) + trees: int = 100 + sample_size: int = 256 + seed: int = 42 + + +class ScoreRequest(BaseModel): + window: List[RequestSample] + base_limit: int = 100 + threshold: float = Field(0.8, ge=0.0, le=1.0) + severe_threshold: float = Field(0.95, ge=0.0, le=1.0) + + +def _to_dicts(samples: List[RequestSample]) -> List[dict]: + return [s.model_dump() for s in samples] + + +@router.post("/train") +def train(req: TrainRequest): + global _detector + _detector = AnomalyDetector(trees=req.trees, sample_size=req.sample_size, seed=req.seed) + _detector.fit([_to_dicts(w) for w in req.windows]) + return {"trained": True, "windows": len(req.windows)} + + +@router.post("/score") +def score(req: ScoreRequest): + if not _detector.fitted: + raise HTTPException(status_code=409, detail="model not trained; POST /v1/anomaly/train first") + result = _detector.score_window(_to_dicts(req.window)) + recommendation = recommend_limit( + result.score, req.base_limit, req.threshold, req.severe_threshold + ) + return { + "score": result.score, + "features": result.features, + "recommendation": recommendation, + "high_confidence": result.score >= 0.95, + } + + +@router.get("/status") +def status() -> Dict[str, bool]: + return {"fitted": _detector.fitted} diff --git a/ml-service/tests/test_anomaly.py b/ml-service/tests/test_anomaly.py new file mode 100644 index 00000000..67f693b0 --- /dev/null +++ b/ml-service/tests/test_anomaly.py @@ -0,0 +1,78 @@ +"""Tests for the rate-limit anomaly detector (#615).""" + +import random + +from anomaly.detector import AnomalyDetector, recommend_limit +from anomaly.features import extract_features, to_vector +from anomaly.isolation_forest import IsolationForest + + +def _normal_window(w: int): + n = 40 + (w % 20) + base = 1_700_000_000_000 + w * 60_000 + endpoints = ["/api/subs", "/api/usage"] if w % 2 else ["/api/subs"] + return [ + { + "timestamp_ms": base + int(i * 60_000 / n), + "endpoint": endpoints[i % len(endpoints)], + "payload_size": 350 + (i % 100), + "user_agent": "app/1", + "ip": "10.0.0.1", + } + for i in range(n) + ] + + +def _attack_window(): + return [ + { + "timestamp_ms": 1_700_000_000_000 + i, + "endpoint": f"/api/ep{i % 50}", + "payload_size": 50_000, + "user_agent": f"bot/{i % 100}", + "ip": f"192.168.{i % 255}.{i % 255}", + } + for i in range(5000) + ] + + +def test_features_shape_and_entropy(): + feats = extract_features(_normal_window(1)) + assert len(to_vector(feats)) == 6 + assert feats["geo_spread"] == 1.0 + # two endpoints -> positive entropy on odd window + assert feats["endpoint_entropy"] > 0 + + +def test_features_empty_window(): + assert extract_features([])["request_rate"] == 0.0 + + +def test_isolation_forest_outlier_scores_higher(): + rng = random.Random(0) + data = [[1 + rng.random(), 2 + rng.random(), rng.random(), 500 + rng.random() * 50] for _ in range(200)] + forest = IsolationForest(trees=100, sample_size=128, seed=7).fit(data) + inlier = forest.score([1.5, 2.5, 0.5, 525]) + outlier = forest.score([100, 90, 9, 90_000]) + assert outlier > inlier + assert 0.0 <= inlier <= 1.0 and 0.0 <= outlier <= 1.0 + + +def test_isolation_forest_deterministic(): + data = [[float(i % 7), float(i % 3)] for i in range(100)] + a = IsolationForest(seed=1).fit(data).score([3.0, 1.0]) + b = IsolationForest(seed=1).fit(data).score([3.0, 1.0]) + assert a == b + + +def test_detector_flags_attack_over_normal(): + det = AnomalyDetector(seed=3).fit([_normal_window(w) for w in range(60)]) + normal = det.score_window(_normal_window(3)).score + attack = det.score_window(_attack_window()).score + assert attack > normal + + +def test_recommend_limit_thresholds(): + assert recommend_limit(0.3, 100)["action"] == "normal" + assert recommend_limit(0.85, 100) == {"action": "reduced", "effective_limit": 50, "severity": "medium"} + assert recommend_limit(0.97, 100) == {"action": "severely-reduced", "effective_limit": 10, "severity": "high"}