@@ -21,26 +21,38 @@ mod listing_table_builder;
2121pub mod stream_schema_provider;
2222
2323use actix_web:: Either ;
24+ use arrow_schema:: SchemaRef ;
2425use chrono:: NaiveDateTime ;
2526use chrono:: { DateTime , Duration , Utc } ;
2627use datafusion:: arrow:: record_batch:: RecordBatch ;
2728use datafusion:: common:: tree_node:: Transformed ;
2829use datafusion:: execution:: disk_manager:: DiskManager ;
29- use datafusion:: execution:: { SendableRecordBatchStream , SessionState , SessionStateBuilder } ;
30+ use datafusion:: execution:: {
31+ RecordBatchStream , SendableRecordBatchStream , SessionState , SessionStateBuilder ,
32+ } ;
3033use datafusion:: logical_expr:: expr:: Alias ;
3134use datafusion:: logical_expr:: {
3235 Aggregate , Explain , Filter , LogicalPlan , PlanType , Projection , ToStringifiedPlan ,
3336} ;
37+ use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
38+ use datafusion:: physical_plan:: {
39+ ExecutionPlan , ExecutionPlanProperties , collect_partitioned, execute_stream_partitioned,
40+ } ;
3441use datafusion:: prelude:: * ;
3542use datafusion:: sql:: parser:: DFParser ;
3643use datafusion:: sql:: resolve:: resolve_table_references;
3744use datafusion:: sql:: sqlparser:: dialect:: PostgreSqlDialect ;
45+ use futures:: Stream ;
46+ use futures:: stream:: select_all;
3847use itertools:: Itertools ;
3948use once_cell:: sync:: Lazy ;
4049use serde:: { Deserialize , Serialize } ;
4150use serde_json:: { Value , json} ;
4251use std:: ops:: Bound ;
52+ use std:: pin:: Pin ;
4353use std:: sync:: Arc ;
54+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
55+ use std:: task:: { Context , Poll } ;
4456use sysinfo:: System ;
4557use tokio:: runtime:: Runtime ;
4658
@@ -55,6 +67,7 @@ use crate::catalog::manifest::Manifest;
5567use crate :: catalog:: snapshot:: Snapshot ;
5668use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
5769use crate :: handlers:: http:: query:: QueryError ;
70+ use crate :: metrics:: increment_bytes_scanned_in_query_by_date;
5871use crate :: option:: Mode ;
5972use crate :: parseable:: PARSEABLE ;
6073use crate :: storage:: { ObjectStorageProvider , ObjectStoreFormat } ;
@@ -77,7 +90,27 @@ pub async fn execute(
7790 is_streaming : bool ,
7891) -> Result <
7992 (
80- Either < Vec < RecordBatch > , SendableRecordBatchStream > ,
93+ Either <
94+ Vec < RecordBatch > ,
95+ Pin <
96+ Box <
97+ RecordBatchStreamAdapter <
98+ select_all:: SelectAll <
99+ Pin <
100+ Box <
101+ dyn RecordBatchStream <
102+ Item = Result <
103+ RecordBatch ,
104+ datafusion:: error:: DataFusionError ,
105+ > ,
106+ > + Send ,
107+ > ,
108+ > ,
109+ > ,
110+ > ,
111+ > ,
112+ > ,
113+ > ,
81114 Vec < String > ,
82115 ) ,
83116 ExecuteError ,
@@ -178,7 +211,27 @@ impl Query {
178211 is_streaming : bool ,
179212 ) -> Result <
180213 (
181- Either < Vec < RecordBatch > , SendableRecordBatchStream > ,
214+ Either <
215+ Vec < RecordBatch > ,
216+ Pin <
217+ Box <
218+ RecordBatchStreamAdapter <
219+ select_all:: SelectAll <
220+ Pin <
221+ Box <
222+ dyn RecordBatchStream <
223+ Item = Result <
224+ RecordBatch ,
225+ datafusion:: error:: DataFusionError ,
226+ > ,
227+ > + Send ,
228+ > ,
229+ > ,
230+ > ,
231+ > ,
232+ > ,
233+ > ,
234+ > ,
182235 Vec < String > ,
183236 ) ,
184237 ExecuteError ,
@@ -199,10 +252,49 @@ impl Query {
199252 return Ok ( ( Either :: Left ( vec ! [ ] ) , fields) ) ;
200253 }
201254
255+ let plan = QUERY_SESSION
256+ . state ( )
257+ . create_physical_plan ( df. logical_plan ( ) )
258+ . await ?;
259+
202260 let results = if !is_streaming {
203- Either :: Left ( df. collect ( ) . await ?)
261+ let task_ctx = QUERY_SESSION . task_ctx ( ) ;
262+
263+ let batches = collect_partitioned ( plan. clone ( ) , task_ctx. clone ( ) )
264+ . await ?
265+ . into_iter ( )
266+ . flatten ( )
267+ . collect ( ) ;
268+
269+ let actual_io_bytes = get_total_bytes_scanned ( & plan) ;
270+
271+ // Track billing metrics for query scan
272+ let current_date = chrono:: Utc :: now ( ) . date_naive ( ) . to_string ( ) ;
273+ increment_bytes_scanned_in_query_by_date ( actual_io_bytes, & current_date) ;
274+
275+ Either :: Left ( batches)
204276 } else {
205- Either :: Right ( df. execute_stream ( ) . await ?)
277+ let task_ctx = QUERY_SESSION . task_ctx ( ) ;
278+
279+ let output_partitions = plan. output_partitioning ( ) . partition_count ( ) ;
280+
281+ let monitor_state = Arc :: new ( MonitorState {
282+ plan : plan. clone ( ) ,
283+ active_streams : AtomicUsize :: new ( output_partitions) ,
284+ } ) ;
285+
286+ let streams = execute_stream_partitioned ( plan. clone ( ) , task_ctx. clone ( ) ) ?
287+ . into_iter ( )
288+ . map ( |s| {
289+ let wrapped = PartitionedMetricMonitor :: new ( s, monitor_state. clone ( ) ) ;
290+ Box :: pin ( wrapped) as SendableRecordBatchStream
291+ } )
292+ . collect_vec ( ) ;
293+
294+ let merged_stream = futures:: stream:: select_all ( streams) ;
295+
296+ let final_stream = RecordBatchStreamAdapter :: new ( plan. schema ( ) , merged_stream) ;
297+ Either :: Right ( Box :: pin ( final_stream) )
206298 } ;
207299
208300 Ok ( ( results, fields) )
@@ -293,6 +385,24 @@ impl Query {
293385 }
294386}
295387
388+ /// Recursively sums up "bytes_scanned" from all nodes in the plan
389+ fn get_total_bytes_scanned ( plan : & Arc < dyn ExecutionPlan > ) -> u64 {
390+ let mut total_bytes = 0 ;
391+
392+ if let Some ( metrics) = plan. metrics ( ) {
393+ // "bytes_scanned" is the standard key used by ParquetExec
394+ if let Some ( scanned) = metrics. sum_by_name ( "bytes_scanned" ) {
395+ total_bytes += scanned. as_usize ( ) as u64 ;
396+ }
397+ }
398+
399+ for child in plan. children ( ) {
400+ total_bytes += get_total_bytes_scanned ( child) ;
401+ }
402+
403+ total_bytes
404+ }
405+
296406/// Record of counts for a given time bin.
297407#[ derive( Debug , Serialize , Clone , Deserialize ) ]
298408pub struct CountsRecord {
@@ -741,6 +851,82 @@ pub mod error {
741851 }
742852}
743853
854+ /// Shared state across all partitions
855+ struct MonitorState {
856+ plan : Arc < dyn ExecutionPlan > ,
857+ active_streams : AtomicUsize ,
858+ }
859+
860+ /// A wrapper that monitors the ExecutionPlan and logs metrics when the stream finishes.
861+ pub struct PartitionedMetricMonitor {
862+ // The actual stream doing the work
863+ inner : SendableRecordBatchStream ,
864+ /// State of the streams
865+ state : Arc < MonitorState > ,
866+ // Ensure we only emit metrics once even if polled after completion/error
867+ is_finished : bool ,
868+ }
869+
870+ impl PartitionedMetricMonitor {
871+ fn new ( inner : SendableRecordBatchStream , state : Arc < MonitorState > ) -> Self {
872+ Self {
873+ inner,
874+ state,
875+ is_finished : false ,
876+ }
877+ }
878+ }
879+
880+ impl Stream for PartitionedMetricMonitor {
881+ type Item = datafusion:: error:: Result < RecordBatch > ;
882+
883+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
884+ if self . is_finished {
885+ return Poll :: Ready ( None ) ;
886+ }
887+
888+ let poll = self . inner . as_mut ( ) . poll_next ( cx) ;
889+
890+ // Check if the stream just finished
891+ match & poll {
892+ Poll :: Ready ( None ) => {
893+ self . is_finished = true ;
894+ self . check_if_last_stream ( ) ;
895+ }
896+ Poll :: Ready ( Some ( Err ( e) ) ) => {
897+ tracing:: error!( "Stream Failed with error: {}" , e) ;
898+ self . is_finished = true ;
899+ self . check_if_last_stream ( ) ;
900+ }
901+ _ => { }
902+ }
903+
904+ poll
905+ }
906+
907+ fn size_hint ( & self ) -> ( usize , Option < usize > ) {
908+ ( 0 , None )
909+ }
910+ }
911+
912+ impl RecordBatchStream for PartitionedMetricMonitor {
913+ fn schema ( & self ) -> SchemaRef {
914+ self . inner . schema ( )
915+ }
916+ }
917+
918+ impl PartitionedMetricMonitor {
919+ fn check_if_last_stream ( & self ) {
920+ let prev_count = self . state . active_streams . fetch_sub ( 1 , Ordering :: SeqCst ) ;
921+
922+ if prev_count == 1 {
923+ let bytes = get_total_bytes_scanned ( & self . state . plan ) ;
924+ let current_date = chrono:: Utc :: now ( ) . date_naive ( ) . to_string ( ) ;
925+ increment_bytes_scanned_in_query_by_date ( bytes, & current_date) ;
926+ }
927+ }
928+ }
929+
744930#[ cfg( test) ]
745931mod tests {
746932 use serde_json:: json;
0 commit comments