@@ -384,7 +384,9 @@ pub async fn analyze_table(pool: &PgPool, table: &str) -> Result<()> {
384384///
385385/// # Arguments
386386/// * `pool` - Database connection pool
387- /// * `query` - SQL query to explain (without EXPLAIN prefix)
387+ /// * `query` - SQL query to explain (without EXPLAIN prefix).
388+ /// Must be a trusted/hardcoded string — not user-supplied input,
389+ /// as it is interpolated directly into the SQL statement.
388390///
389391/// # Returns
390392/// The EXPLAIN output as a newline-separated string
@@ -448,3 +450,319 @@ pub fn assert_uses_seq_scan(explain_output: &str) {
448450 explain_output
449451 ) ;
450452}
453+
454+ // ============================================================================
455+ // Benchmarking / EXPLAIN Helpers
456+ // ============================================================================
457+
/// Statistics extracted from EXPLAIN ANALYZE JSON output.
///
/// Holds the timing and plan information used when benchmarking queries.
/// Produced by `explain_analyze_avg`, which averages the timings over runs.
#[derive(Debug, Clone)]
pub struct ExplainStats {
    /// Mean execution time in milliseconds over all runs.
    pub execution_time_ms: f64,
    /// Mean planning time in milliseconds over all runs.
    pub planning_time_ms: f64,
    /// Top-level node type of the query plan (e.g., "Index Scan", "Seq Scan").
    pub node_type: String,
}
471+
472+ /// Run EXPLAIN with JSON format on a query and return the parsed plan
473+ ///
474+ /// Executes `EXPLAIN (FORMAT JSON) {query}` and parses the result.
475+ /// PostgreSQL returns a single-element JSON array containing the plan tree.
476+ ///
477+ /// This is distinct from `explain_query()` which returns plain text output.
478+ /// The JSON format provides structured access to plan nodes, costs, and types.
479+ ///
480+ /// # Arguments
481+ /// * `pool` - Database connection pool
482+ /// * `query` - SQL query to explain (without EXPLAIN prefix).
483+ /// Must be a trusted/hardcoded string — not user-supplied input,
484+ /// as it is interpolated directly into the SQL statement.
485+ ///
486+ /// # Returns
487+ /// The full EXPLAIN JSON output as a `serde_json::Value`
488+ ///
489+ /// # Example
490+ /// ```ignore
491+ /// let plan = explain_json(&pool, "SELECT * FROM foo WHERE x = 1").await?;
492+ /// let node_type = plan[0]["Plan"]["Node Type"].as_str().unwrap();
493+ /// ```
494+ pub async fn explain_json ( pool : & PgPool , query : & str ) -> Result < serde_json:: Value > {
495+ let sql = format ! ( "EXPLAIN (FORMAT JSON) {}" , query) ;
496+ let plan: serde_json:: Value = sqlx:: query_scalar ( & sql)
497+ . fetch_one ( pool)
498+ . await
499+ . with_context ( || format ! ( "running EXPLAIN (FORMAT JSON) on query: {}" , query) ) ?;
500+
501+ Ok ( plan)
502+ }
503+
504+ /// Run EXPLAIN ANALYZE multiple times and return averaged statistics
505+ ///
506+ /// Executes `EXPLAIN (ANALYZE, FORMAT JSON) {query}` the specified number of times
507+ /// and returns the arithmetic mean of execution and planning times.
508+ ///
509+ /// **Warning**: EXPLAIN ANALYZE actually executes the query. If the query has
510+ /// side effects (INSERT, UPDATE, DELETE), those effects will occur on every run.
511+ ///
512+ /// **Note**: The first run may include cold-start overhead (buffer cache misses,
513+ /// plan cache population). No runs are discarded — callers should account for this
514+ /// when setting thresholds or increase the run count to dilute the effect.
515+ ///
516+ /// # Arguments
517+ /// * `pool` - Database connection pool
518+ /// * `query` - SQL query to explain and execute (without EXPLAIN prefix).
519+ /// Must be a trusted/hardcoded string — not user-supplied input,
520+ /// as it is interpolated directly into the SQL statement.
521+ /// * `runs` - Number of times to execute (must be >= 1)
522+ ///
523+ /// # Returns
524+ /// Averaged `ExplainStats` with mean execution_time_ms, mean planning_time_ms,
525+ /// and the node_type from the first run's top-level plan node
526+ ///
527+ /// # Example
528+ /// ```ignore
529+ /// let stats = explain_analyze_avg(&pool, "SELECT * FROM foo WHERE x = 1", 5).await?;
530+ /// assert!(stats.execution_time_ms < 10.0, "Query too slow: {}ms", stats.execution_time_ms);
531+ /// assert_eq!(stats.node_type, "Index Scan");
532+ /// ```
533+ pub async fn explain_analyze_avg ( pool : & PgPool , query : & str , runs : usize ) -> Result < ExplainStats > {
534+ anyhow:: ensure!( runs >= 1 , "runs must be >= 1, got {}" , runs) ;
535+
536+ let sql = format ! ( "EXPLAIN (ANALYZE, FORMAT JSON) {}" , query) ;
537+
538+ let mut total_execution_ms = 0.0_f64 ;
539+ let mut total_planning_ms = 0.0_f64 ;
540+ let mut node_type = String :: new ( ) ;
541+
542+ for i in 0 ..runs {
543+ let plan: serde_json:: Value = sqlx:: query_scalar ( & sql)
544+ . fetch_one ( pool)
545+ . await
546+ . with_context ( || {
547+ format ! (
548+ "running EXPLAIN ANALYZE (run {}/{}) on query: {}" ,
549+ i + 1 ,
550+ runs,
551+ query
552+ )
553+ } ) ?;
554+
555+ // EXPLAIN (ANALYZE, FORMAT JSON) returns:
556+ // [{"Plan": {...}, "Planning Time": N, "Execution Time": N}]
557+ let entry = & plan[ 0 ] ;
558+
559+ let exec_time = entry[ "Execution Time" ]
560+ . as_f64 ( )
561+ . with_context ( || format ! ( "extracting Execution Time on run {}/{}" , i + 1 , runs) ) ?;
562+
563+ let plan_time = entry[ "Planning Time" ]
564+ . as_f64 ( )
565+ . with_context ( || format ! ( "extracting Planning Time on run {}/{}" , i + 1 , runs) ) ?;
566+
567+ total_execution_ms += exec_time;
568+ total_planning_ms += plan_time;
569+
570+ // Capture node type from first run only
571+ if i == 0 {
572+ node_type = entry[ "Plan" ] [ "Node Type" ]
573+ . as_str ( )
574+ . with_context ( || "extracting Node Type from first run" ) ?
575+ . to_string ( ) ;
576+ }
577+ }
578+
579+ let n = runs as f64 ;
580+ Ok ( ExplainStats {
581+ execution_time_ms : total_execution_ms / n,
582+ planning_time_ms : total_planning_ms / n,
583+ node_type,
584+ } )
585+ }
586+
587+ /// Assert that a JSON EXPLAIN plan does not use any sequential scan
588+ ///
589+ /// Recursively walks the JSON plan tree checking all "Node Type" fields.
590+ /// A plan can have nested nodes (e.g., Aggregate -> Seq Scan), so all levels
591+ /// are checked. Both "Seq Scan" and "Parallel Seq Scan" are rejected.
592+ ///
593+ /// This is the structured (JSON) counterpart to `assert_uses_seq_scan()` which
594+ /// operates on plain text output.
595+ ///
596+ /// # Arguments
597+ /// * `plan` - JSON EXPLAIN output from `explain_json()` or `EXPLAIN (FORMAT JSON)`
598+ ///
599+ /// # Panics
600+ /// Panics if any node in the plan tree has a "Seq Scan" or "Parallel Seq Scan" node type
601+ ///
602+ /// # Example
603+ /// ```ignore
604+ /// let plan = explain_json(&pool, "SELECT * FROM foo WHERE x = 1").await?;
605+ /// assert_no_seq_scan(&plan);
606+ /// ```
607+ pub fn assert_no_seq_scan ( plan : & serde_json:: Value ) {
608+ let mut seq_scan_nodes = Vec :: new ( ) ;
609+ collect_seq_scan_nodes ( plan, & mut seq_scan_nodes) ;
610+
611+ assert ! (
612+ seq_scan_nodes. is_empty( ) ,
613+ "Expected no sequential scans but found {} node(s): {:?}\n Full plan: {}" ,
614+ seq_scan_nodes. len( ) ,
615+ seq_scan_nodes,
616+ serde_json:: to_string_pretty( plan) . unwrap_or_else( |_| plan. to_string( ) )
617+ ) ;
618+ }
619+
620+ /// Recursively collect all sequential scan node types from a JSON EXPLAIN plan
621+ ///
622+ /// Checks standard PostgreSQL node types only ("Seq Scan", "Parallel Seq Scan").
623+ /// Custom scan providers (e.g., from extensions) are not currently detected.
624+ fn collect_seq_scan_nodes ( value : & serde_json:: Value , found : & mut Vec < String > ) {
625+ match value {
626+ serde_json:: Value :: Object ( map) => {
627+ if let Some ( node_type) = map. get ( "Node Type" ) . and_then ( |v| v. as_str ( ) ) {
628+ if node_type == "Seq Scan" || node_type == "Parallel Seq Scan" {
629+ let relation = map
630+ . get ( "Relation Name" )
631+ . and_then ( |v| v. as_str ( ) )
632+ . unwrap_or ( "unknown" ) ;
633+ found. push ( format ! ( "{} on {}" , node_type, relation) ) ;
634+ }
635+ }
636+ for v in map. values ( ) {
637+ collect_seq_scan_nodes ( v, found) ;
638+ }
639+ }
640+ serde_json:: Value :: Array ( arr) => {
641+ for item in arr {
642+ collect_seq_scan_nodes ( item, found) ;
643+ }
644+ }
645+ _ => { }
646+ }
647+ }
648+
649+ // ============================================================================
650+ // pg_stat_statements Helpers (Tier 2)
651+ // ============================================================================
652+
/// Statistics from pg_stat_statements for a matched query.
///
/// Carries the key performance metrics exposed by the pg_stat_statements view.
/// See the PostgreSQL documentation for the column definitions.
#[derive(Debug, Clone)]
pub struct PgStatEntry {
    /// How many times the query was executed.
    pub calls: i64,
    /// Mean execution time in milliseconds.
    pub mean_exec_time: f64,
    /// Population standard deviation of execution time in milliseconds.
    pub stddev_exec_time: f64,
    /// Total execution time in milliseconds across all calls.
    pub total_exec_time: f64,
    /// The normalized query string from pg_stat_statements.
    pub query: String,
}
670+
671+ /// Ensure pg_stat_statements extension is available
672+ ///
673+ /// Creates the extension if it doesn't exist. Should be called once
674+ /// at the start of benchmark tests that need pg_stat_statements.
675+ ///
676+ /// Requires `shared_preload_libraries=pg_stat_statements` in the PostgreSQL
677+ /// server configuration (see docker-compose.yml).
678+ pub async fn ensure_pg_stat_statements ( pool : & PgPool ) -> Result < ( ) > {
679+ sqlx:: query ( "CREATE EXTENSION IF NOT EXISTS pg_stat_statements" )
680+ . execute ( pool)
681+ . await
682+ . with_context ( || "creating pg_stat_statements extension" ) ?;
683+ Ok ( ( ) )
684+ }
685+
686+ /// Reset all pg_stat_statements counters
687+ ///
688+ /// Clears cumulative per-query statistics so the next sampling window starts
689+ /// from zero. Call this before the measurement phase of a benchmark case to
690+ /// ensure `read_pg_stat_statements` reflects only the queries executed after
691+ /// the reset — not leftovers from prior cases or setup work.
692+ ///
693+ /// Requires the `pg_stat_statements` extension to be loaded
694+ /// (see `ensure_pg_stat_statements`).
695+ ///
696+ /// # Example
697+ /// ```ignore
698+ /// ensure_pg_stat_statements(&pool).await?;
699+ /// reset_pg_stat_statements(&pool).await?;
700+ /// // ... run benchmark queries ...
701+ /// let stats = read_pg_stat_statements(&pool, "%FROM bench%").await?;
702+ /// ```
703+ pub async fn reset_pg_stat_statements ( pool : & PgPool ) -> Result < ( ) > {
704+ sqlx:: query ( "SELECT pg_stat_statements_reset(NULL::oid, NULL::oid, (SELECT oid FROM pg_database WHERE datname = current_database()))" )
705+ . execute ( pool)
706+ . await
707+ . with_context ( || "resetting pg_stat_statements counters for current database" ) ?;
708+ Ok ( ( ) )
709+ }
710+
711+ /// Read query statistics from pg_stat_statements
712+ ///
713+ /// Looks up a query in the `pg_stat_statements` view using a SQL LIKE pattern.
714+ /// Requires the `pg_stat_statements` extension to be loaded
715+ /// (see `ensure_pg_stat_statements`).
716+ ///
717+ /// # Arguments
718+ /// * `pool` - Database connection pool
719+ /// * `query_pattern` - SQL LIKE pattern to match against normalized query text
720+ /// (e.g., `"%FROM ore WHERE%"`).
721+ /// Note: `pg_stat_statements` normalizes queries by replacing literal values
722+ /// with `$N` placeholders. Patterns must match the normalized form
723+ /// (e.g., `"%FROM bench WHERE e = $1%"`, not `"%FROM bench WHERE e = 'abc'%"`).
724+ ///
725+ /// # Returns
726+ /// `PgStatEntry` for the matched query. Returns error if no match or multiple matches.
727+ ///
728+ /// # Example
729+ /// ```ignore
730+ /// ensure_pg_stat_statements(&pool).await?;
731+ /// let stats = read_pg_stat_statements(&pool, "%FROM ore WHERE%").await?;
732+ /// assert!(stats.mean_exec_time < 5.0, "Query regression: {}ms", stats.mean_exec_time);
733+ /// ```
734+ pub async fn read_pg_stat_statements ( pool : & PgPool , query_pattern : & str ) -> Result < PgStatEntry > {
735+ let sql = "SELECT query, calls, mean_exec_time, stddev_exec_time, total_exec_time \
736+ FROM pg_stat_statements \
737+ WHERE query LIKE $1 \
738+ AND dbid = (SELECT oid FROM pg_database WHERE datname = current_database())";
739+
740+ let rows: Vec < ( String , i64 , f64 , f64 , f64 ) > = sqlx:: query_as ( sql)
741+ . bind ( query_pattern)
742+ . fetch_all ( pool)
743+ . await
744+ . with_context ( || format ! ( "reading pg_stat_statements for pattern: {}" , query_pattern) ) ?;
745+
746+ match rows. len ( ) {
747+ 0 => Err ( anyhow:: anyhow!(
748+ "No pg_stat_statements entry found matching pattern: {}" ,
749+ query_pattern
750+ ) ) ,
751+ 1 => {
752+ let ( query, calls, mean_exec_time, stddev_exec_time, total_exec_time) =
753+ rows. into_iter ( ) . next ( ) . unwrap ( ) ;
754+ Ok ( PgStatEntry {
755+ calls,
756+ mean_exec_time,
757+ stddev_exec_time,
758+ total_exec_time,
759+ query,
760+ } )
761+ }
762+ n => Err ( anyhow:: anyhow!(
763+ "Expected 1 pg_stat_statements entry but found {} matching pattern: {}" ,
764+ n,
765+ query_pattern
766+ ) ) ,
767+ }
768+ }
0 commit comments