|
| 1 | +# SPDX-License-Identifier: PMPL-1.0-or-later |
| 2 | +# Copyright (c) 2026 Jonathan D.A. Jewell <j.d.a.jewell@open.ac.uk> |
| 3 | + |
| 4 | +""" |
| 5 | +Diagnostics — circuit breaker, query metrics, and health reporting for QuandleDB. |
| 6 | +
|
| 7 | +Provides three orthogonal capabilities: |
| 8 | +
|
| 9 | + CircuitBreaker — wraps fallible function calls (Skein, semantic index) with |
| 10 | + state-machine fault isolation. States: :closed → :open → :half_open → :closed. |
| 11 | + Self-heals after cooldown by probing with a single call; reopens on failure. |
| 12 | +
|
| 13 | + QueryMetrics — thread-safe rolling statistics over all /api/query calls. |
| 14 | + Tracks counts, latencies, error classes, and pushdown hit rates. |
| 15 | +
|
| 16 | + HealthReport — aggregates component liveness into a structured JSON response |
| 17 | + suitable for readiness/liveness probes and operator dashboards. |
| 18 | +
|
| 19 | +Point-to-point tracing is built into EvalTrace (see Evaluator.jl), which records |
| 20 | +per-stage timings and row counts. Diagnostics.jl provides the infrastructure that |
| 21 | +the evaluator and serve.jl use to surface those traces. |
| 22 | +""" |
| 23 | + |
| 24 | +using Dates |
| 25 | + |
| 26 | +# ───────────────────────────────────────────────────────────────────────────── |
| 27 | +# Circuit breaker |
| 28 | +# ───────────────────────────────────────────────────────────────────────────── |
| 29 | + |
| 30 | +""" |
| 31 | + CircuitBreaker(name; threshold, cooldown_s) |
| 32 | +
|
| 33 | +State-machine wrapper around fallible I/O calls. |
| 34 | +
|
| 35 | +States: |
| 36 | + :closed — normal; calls go through. Consecutive failures are counted. |
| 37 | + :open — tripped; calls fail immediately with the last recorded error. |
| 38 | + Transitions to :half_open after `cooldown_s` seconds. |
| 39 | + :half_open — one probe call allowed through. Success → :closed; failure → :open. |
| 40 | +
|
| 41 | +Thread safety: all state mutations are guarded by an internal `ReentrantLock`. |
| 42 | +""" |
| 43 | +mutable struct CircuitBreaker |
| 44 | + name::String |
| 45 | + threshold::Int # consecutive failures before opening |
| 46 | + cooldown_s::Float64 # seconds before attempting a probe |
| 47 | + state::Symbol # :closed | :open | :half_open |
| 48 | + consecutive_failures::Int |
| 49 | + last_failure_at::Float64 # Unix timestamp |
| 50 | + last_error::Union{Exception, Nothing} |
| 51 | + total_calls::Int |
| 52 | + total_failures::Int |
| 53 | + total_short_circuits::Int |
| 54 | + lock::ReentrantLock |
| 55 | +end |
| 56 | + |
| 57 | +function CircuitBreaker(name::String; |
| 58 | + threshold::Int = 3, |
| 59 | + cooldown_s::Float64 = 30.0) |
| 60 | + CircuitBreaker(name, threshold, cooldown_s, |
| 61 | + :closed, 0, 0.0, nothing, |
| 62 | + 0, 0, 0, |
| 63 | + ReentrantLock()) |
| 64 | +end |
| 65 | + |
| 66 | +""" |
| 67 | + call_with_breaker!(cb::CircuitBreaker, f) -> result |
| 68 | +
|
| 69 | +Call `f()` through the circuit breaker. Raises `CircuitOpenError` when the |
| 70 | +circuit is open and the cooldown period has not expired. |
| 71 | +""" |
| 72 | +struct CircuitOpenError <: Exception |
| 73 | + breaker_name::String |
| 74 | + last_error::Union{Exception, Nothing} |
| 75 | + cooldown_remaining_s::Float64 |
| 76 | +end |
| 77 | + |
| 78 | +Base.showerror(io::IO, e::CircuitOpenError) = print(io, |
| 79 | + "CircuitOpenError($(e.breaker_name)): circuit open; " * |
| 80 | + "retry in $(round(e.cooldown_remaining_s, digits=1))s. " * |
| 81 | + "Last error: $(e.last_error)") |
| 82 | + |
| 83 | +function call_with_breaker!(cb::CircuitBreaker, f) |
| 84 | + lock(cb.lock) do |
| 85 | + cb.total_calls += 1 |
| 86 | + now_ts = time() |
| 87 | + |
| 88 | + if cb.state == :open |
| 89 | + elapsed = now_ts - cb.last_failure_at |
| 90 | + if elapsed >= cb.cooldown_s |
| 91 | + cb.state = :half_open |
| 92 | + else |
| 93 | + cb.total_short_circuits += 1 |
| 94 | + throw(CircuitOpenError(cb.name, cb.last_error, |
| 95 | + cb.cooldown_s - elapsed)) |
| 96 | + end |
| 97 | + end |
| 98 | + end |
| 99 | + |
| 100 | + # Probe or normal call — outside the lock so f() can be slow |
| 101 | + try |
| 102 | + result = f() |
| 103 | + lock(cb.lock) do |
| 104 | + if cb.state == :half_open |
| 105 | + cb.state = :closed |
| 106 | + end |
| 107 | + cb.consecutive_failures = 0 |
| 108 | + end |
| 109 | + return result |
| 110 | + catch e |
| 111 | + lock(cb.lock) do |
| 112 | + cb.consecutive_failures += 1 |
| 113 | + cb.total_failures += 1 |
| 114 | + cb.last_failure_at = time() |
| 115 | + cb.last_error = e |
| 116 | + if cb.state == :half_open || cb.consecutive_failures >= cb.threshold |
| 117 | + cb.state = :open |
| 118 | + end |
| 119 | + end |
| 120 | + rethrow() |
| 121 | + end |
| 122 | +end |
| 123 | + |
| 124 | +""" |
| 125 | + breaker_state_dict(cb) -> Dict |
| 126 | +
|
| 127 | +Serialize circuit breaker state for inclusion in health reports. |
| 128 | +""" |
| 129 | +function breaker_state_dict(cb::CircuitBreaker) |
| 130 | + lock(cb.lock) do |
| 131 | + remaining = cb.state == :open ? |
| 132 | + max(0.0, cb.cooldown_s - (time() - cb.last_failure_at)) : 0.0 |
| 133 | + Dict{String, Any}( |
| 134 | + "name" => cb.name, |
| 135 | + "state" => string(cb.state), |
| 136 | + "consecutive_failures" => cb.consecutive_failures, |
| 137 | + "threshold" => cb.threshold, |
| 138 | + "cooldown_remaining_s" => round(remaining, digits=1), |
| 139 | + "total_calls" => cb.total_calls, |
| 140 | + "total_failures" => cb.total_failures, |
| 141 | + "total_short_circuits" => cb.total_short_circuits, |
| 142 | + "last_error" => isnothing(cb.last_error) ? nothing : string(cb.last_error), |
| 143 | + ) |
| 144 | + end |
| 145 | +end |
| 146 | + |
| 147 | +# ───────────────────────────────────────────────────────────────────────────── |
| 148 | +# Query metrics |
| 149 | +# ───────────────────────────────────────────────────────────────────────────── |
| 150 | + |
| 151 | +const _LATENCY_WINDOW = 200 # keep last N latencies for percentile calculation |
| 152 | + |
| 153 | +""" |
| 154 | + QueryMetrics() |
| 155 | +
|
| 156 | +Thread-safe rolling statistics over KRL/SQL queries. |
| 157 | +
|
| 158 | +Error classes tracked: |
| 159 | + :parse_error — KRLLexError or KRLParseError |
| 160 | + :eval_error — error during stage execution (type mismatch, etc.) |
| 161 | + :db_error — Skein DB failure (wrapped or raw) |
| 162 | + :timeout — query exceeded configured timeout |
| 163 | + :circuit_open — circuit breaker rejected the call |
| 164 | +""" |
| 165 | +mutable struct QueryMetrics |
| 166 | + total::Int |
| 167 | + by_source::Dict{Symbol, Int} # :krl, :sql, :unknown |
| 168 | + by_error_class::Dict{Symbol, Int} # see above |
| 169 | + pushdown_hits::Int |
| 170 | + pushdown_misses::Int |
| 171 | + latencies_ms::Vector{Float64} # circular buffer (last _LATENCY_WINDOW) |
| 172 | + lock::ReentrantLock |
| 173 | +end |
| 174 | + |
| 175 | +QueryMetrics() = QueryMetrics(0, Dict{Symbol,Int}(), Dict{Symbol,Int}(), |
| 176 | + 0, 0, Float64[], ReentrantLock()) |
| 177 | + |
| 178 | +""" |
| 179 | + record_query!(m, source, latency_ms; error_class=nothing, pushdown=false) |
| 180 | +
|
| 181 | +Record one completed query. `source` ∈ {:krl, :sql, :unknown}. |
| 182 | +""" |
| 183 | +function record_query!(m::QueryMetrics, source::Symbol, latency_ms::Float64; |
| 184 | + error_class::Union{Symbol, Nothing} = nothing, |
| 185 | + pushdown::Bool = false) |
| 186 | + lock(m.lock) do |
| 187 | + m.total += 1 |
| 188 | + m.by_source[source] = get(m.by_source, source, 0) + 1 |
| 189 | + if !isnothing(error_class) |
| 190 | + m.by_error_class[error_class] = get(m.by_error_class, error_class, 0) + 1 |
| 191 | + end |
| 192 | + if pushdown; m.pushdown_hits += 1 |
| 193 | + else; m.pushdown_misses += 1; end |
| 194 | + push!(m.latencies_ms, latency_ms) |
| 195 | + if length(m.latencies_ms) > _LATENCY_WINDOW |
| 196 | + deleteat!(m.latencies_ms, 1) |
| 197 | + end |
| 198 | + end |
| 199 | +end |
| 200 | + |
| 201 | +""" |
| 202 | + metrics_snapshot(m) -> Dict |
| 203 | +
|
| 204 | +Compute derived statistics (p50, p95, p99, avg) and return a snapshot dict. |
| 205 | +""" |
| 206 | +function metrics_snapshot(m::QueryMetrics)::Dict{String, Any} |
| 207 | + lock(m.lock) do |
| 208 | + lats = sort(copy(m.latencies_ms)) |
| 209 | + n = length(lats) |
| 210 | + pct(p) = n == 0 ? 0.0 : lats[max(1, round(Int, p * n))] |
| 211 | + avg = n == 0 ? 0.0 : sum(lats) / n |
| 212 | + |
| 213 | + Dict{String, Any}( |
| 214 | + "total_queries" => m.total, |
| 215 | + "by_source" => Dict(string(k) => v for (k,v) in m.by_source), |
| 216 | + "by_error_class" => Dict(string(k) => v for (k,v) in m.by_error_class), |
| 217 | + "pushdown_hits" => m.pushdown_hits, |
| 218 | + "pushdown_misses" => m.pushdown_misses, |
| 219 | + "pushdown_rate" => m.total > 0 ? |
| 220 | + round(m.pushdown_hits / m.total, digits = 3) : 0.0, |
| 221 | + "latency_ms" => Dict{String, Any}( |
| 222 | + "avg" => round(avg, digits = 2), |
| 223 | + "p50" => round(pct(0.50), digits = 2), |
| 224 | + "p95" => round(pct(0.95), digits = 2), |
| 225 | + "p99" => round(pct(0.99), digits = 2), |
| 226 | + "window" => n, |
| 227 | + ), |
| 228 | + ) |
| 229 | + end |
| 230 | +end |
| 231 | + |
| 232 | +""" |
| 233 | + prometheus_text(m, cb_skein, cb_sem) -> String |
| 234 | +
|
| 235 | +Emit Prometheus exposition format text for /metrics. |
| 236 | +""" |
| 237 | +function prometheus_text(m::QueryMetrics, |
| 238 | + cb_skein::CircuitBreaker, |
| 239 | + cb_sem::CircuitBreaker)::String |
| 240 | + snap = metrics_snapshot(m) |
| 241 | + s = IOBuffer() |
| 242 | + function metric(name, help, type, val) |
| 243 | + println(s, "# HELP $name $help") |
| 244 | + println(s, "# TYPE $name $type") |
| 245 | + println(s, "$name $val") |
| 246 | + end |
| 247 | + metric("quandledb_queries_total", "Total queries processed", "counter", m.total) |
| 248 | + metric("quandledb_parse_errors_total", "Parse error count", "counter", |
| 249 | + get(m.by_error_class, :parse_error, 0)) |
| 250 | + metric("quandledb_eval_errors_total", "Eval error count", "counter", |
| 251 | + get(m.by_error_class, :eval_error, 0)) |
| 252 | + metric("quandledb_db_errors_total", "Database error count", "counter", |
| 253 | + get(m.by_error_class, :db_error, 0)) |
| 254 | + metric("quandledb_circuit_open", "Skein DB circuit breaker state (1=open)", "gauge", |
| 255 | + cb_skein.state == :open ? 1 : 0) |
| 256 | + metric("quandledb_sem_circuit_open", "Semantic index circuit breaker state (1=open)", "gauge", |
| 257 | + cb_sem.state == :open ? 1 : 0) |
| 258 | + metric("quandledb_latency_p95_ms", "p95 query latency in milliseconds", "gauge", |
| 259 | + snap["latency_ms"]["p95"]) |
| 260 | + metric("quandledb_latency_p99_ms", "p99 query latency in milliseconds", "gauge", |
| 261 | + snap["latency_ms"]["p99"]) |
| 262 | + String(take!(s)) |
| 263 | +end |
| 264 | + |
| 265 | +# ───────────────────────────────────────────────────────────────────────────── |
| 266 | +# Health report |
| 267 | +# ───────────────────────────────────────────────────────────────────────────── |
| 268 | + |
| 269 | +""" |
| 270 | + ComponentStatus(name, status, detail, latency_ms) |
| 271 | +
|
| 272 | +`status` is `:ok`, `:degraded`, or `:down`. |
| 273 | +""" |
| 274 | +struct ComponentStatus |
| 275 | + name::String |
| 276 | + status::Symbol |
| 277 | + detail::String |
| 278 | + latency_ms::Float64 |
| 279 | +end |
| 280 | + |
| 281 | +""" |
| 282 | + HealthReport(overall, components, metrics_snap, checked_at) |
| 283 | +
|
| 284 | +`overall` is the worst of all component statuses. |
| 285 | +""" |
| 286 | +struct HealthReport |
| 287 | + overall::Symbol |
| 288 | + components::Vector{ComponentStatus} |
| 289 | + metrics_snap::Dict{String, Any} |
| 290 | + checked_at::String |
| 291 | +end |
| 292 | + |
| 293 | +function health_report_dict(r::HealthReport)::Dict{String, Any} |
| 294 | + worst_order = Dict(:down => 2, :degraded => 1, :ok => 0) |
| 295 | + Dict{String, Any}( |
| 296 | + "status" => string(r.overall), |
| 297 | + "checked_at" => r.checked_at, |
| 298 | + "components" => [Dict{String, Any}( |
| 299 | + "name" => c.name, |
| 300 | + "status" => string(c.status), |
| 301 | + "detail" => c.detail, |
| 302 | + "latency_ms" => round(c.latency_ms, digits = 2), |
| 303 | + ) for c in r.components], |
| 304 | + "metrics" => r.metrics_snap, |
| 305 | + ) |
| 306 | +end |
| 307 | + |
| 308 | +""" |
| 309 | + check_health(skein_probe, sem_probe, krl_ok, metrics, cb_skein, cb_sem) -> HealthReport |
| 310 | +
|
| 311 | +Probe each component and return a structured health report. |
| 312 | +
|
| 313 | +`skein_probe` and `sem_probe` are zero-argument callables that return |
| 314 | +`(ok::Bool, detail::String)` — call them to measure liveness and latency. |
| 315 | +""" |
| 316 | +function check_health(skein_probe, |
| 317 | + sem_probe, |
| 318 | + krl_ok::Bool, |
| 319 | + metrics::QueryMetrics, |
| 320 | + cb_skein::CircuitBreaker, |
| 321 | + cb_sem::CircuitBreaker)::HealthReport |
| 322 | + |
| 323 | + function probe_component(name, probe_fn) |
| 324 | + t0 = time() |
| 325 | + try |
| 326 | + ok, detail = probe_fn() |
| 327 | + lat = (time() - t0) * 1000 |
| 328 | + st = ok ? :ok : :degraded |
| 329 | + ComponentStatus(name, st, detail, lat) |
| 330 | + catch e |
| 331 | + lat = (time() - t0) * 1000 |
| 332 | + ComponentStatus(name, :down, string(e), lat) |
| 333 | + end |
| 334 | + end |
| 335 | + |
| 336 | + components = ComponentStatus[ |
| 337 | + probe_component("skein_db", skein_probe), |
| 338 | + probe_component("semantic_index", sem_probe), |
| 339 | + ComponentStatus("krl_parser", krl_ok ? :ok : :down, |
| 340 | + krl_ok ? "v0.1.0 operational" : "parser self-test failed", 0.0), |
| 341 | + ComponentStatus("skein_circuit", |
| 342 | + cb_skein.state == :closed ? :ok : |
| 343 | + cb_skein.state == :half_open ? :degraded : :down, |
| 344 | + "state=$(cb_skein.state) failures=$(cb_skein.consecutive_failures)", |
| 345 | + 0.0), |
| 346 | + ComponentStatus("sem_circuit", |
| 347 | + cb_sem.state == :closed ? :ok : |
| 348 | + cb_sem.state == :half_open ? :degraded : :down, |
| 349 | + "state=$(cb_sem.state) failures=$(cb_sem.consecutive_failures)", |
| 350 | + 0.0), |
| 351 | + ] |
| 352 | + |
| 353 | + worst = :ok |
| 354 | + for c in components |
| 355 | + if c.status == :down && worst != :down |
| 356 | + worst = :down |
| 357 | + elseif c.status == :degraded && worst == :ok |
| 358 | + worst = :degraded |
| 359 | + end |
| 360 | + end |
| 361 | + |
| 362 | + HealthReport(worst, components, metrics_snapshot(metrics), |
| 363 | + string(Dates.now(Dates.UTC)) * "Z") |
| 364 | +end |
| 365 | + |
| 366 | +# ───────────────────────────────────────────────────────────────────────────── |
| 367 | +# KRL parser self-test |
| 368 | +# ───────────────────────────────────────────────────────────────────────────── |
| 369 | + |
| 370 | +""" |
| 371 | + krl_parser_selftest() -> (ok::Bool, detail::String) |
| 372 | +
|
| 373 | +Run a minimal round-trip through the KRL lexer + parser to verify the |
| 374 | +parser module is operational. Does not touch any database. |
| 375 | +""" |
| 376 | +function krl_parser_selftest()::Tuple{Bool, String} |
| 377 | + try |
| 378 | + prog = parse_krl("from knots | filter crossing_number == 3 | take 1") |
| 379 | + ok = length(prog.statements) == 1 |
| 380 | + (ok, ok ? "self-test passed" : "unexpected statement count") |
| 381 | + catch e |
| 382 | + (false, "self-test threw: $e") |
| 383 | + end |
| 384 | +end |
0 commit comments