@@ -73,6 +73,26 @@ use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
7373use crate :: storage:: { ObjectStorageProvider , ObjectStoreFormat } ;
7474use crate :: utils:: time:: TimeRange ;
7575
76+ /// Boxed record-batch stream used as the streaming half of query results.
77+ type BoxedBatchStream = Pin <
78+ Box <
79+ RecordBatchStreamAdapter <
80+ select_all:: SelectAll <
81+ Pin <
82+ Box <
83+ dyn RecordBatchStream <
84+ Item = Result < RecordBatch , datafusion:: error:: DataFusionError > ,
85+ > + Send ,
86+ > ,
87+ > ,
88+ > ,
89+ > ,
90+ > ,
91+ > ;
92+
93+ /// Result type returned by query execution: either collected batches or a streaming adapter, plus field names.
94+ type QueryResult = Result < ( Either < Vec < RecordBatch > , BoxedBatchStream > , Vec < String > ) , ExecuteError > ;
95+
7696// pub static QUERY_SESSION: Lazy<SessionContext> =
7797// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
7898
@@ -133,37 +153,7 @@ impl InMemorySessionContext {
133153
134154/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
135155/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
136- pub async fn execute (
137- query : Query ,
138- is_streaming : bool ,
139- tenant_id : & Option < String > ,
140- ) -> Result <
141- (
142- Either <
143- Vec < RecordBatch > ,
144- Pin <
145- Box <
146- RecordBatchStreamAdapter <
147- select_all:: SelectAll <
148- Pin <
149- Box <
150- dyn RecordBatchStream <
151- Item = Result <
152- RecordBatch ,
153- datafusion:: error:: DataFusionError ,
154- > ,
155- > + Send ,
156- > ,
157- > ,
158- > ,
159- > ,
160- > ,
161- > ,
162- > ,
163- Vec < String > ,
164- ) ,
165- ExecuteError ,
166- > {
156+ pub async fn execute ( query : Query , is_streaming : bool , tenant_id : & Option < String > ) -> QueryResult {
167157 let id = tenant_id. clone ( ) ;
168158 QUERY_RUNTIME
169159 . spawn ( async move { query. execute ( is_streaming, & id) . await } )
@@ -272,37 +262,15 @@ impl Query {
272262 /// this function returns the result of the query
273263 /// if streaming is true, it returns a stream
274264 /// if streaming is false, it returns a vector of record batches
275- pub async fn execute (
276- & self ,
277- is_streaming : bool ,
278- tenant_id : & Option < String > ,
279- ) -> Result <
280- (
281- Either <
282- Vec < RecordBatch > ,
283- Pin <
284- Box <
285- RecordBatchStreamAdapter <
286- select_all:: SelectAll <
287- Pin <
288- Box <
289- dyn RecordBatchStream <
290- Item = Result <
291- RecordBatch ,
292- datafusion:: error:: DataFusionError ,
293- > ,
294- > + Send ,
295- > ,
296- > ,
297- > ,
298- > ,
299- > ,
300- > ,
301- > ,
302- Vec < String > ,
303- ) ,
304- ExecuteError ,
305- > {
265+ #[ tracing:: instrument(
266+ name = "datafusion.execute" ,
267+ skip( self , is_streaming, tenant_id) ,
268+ fields(
269+ db. system. name = "datafusion" ,
270+ db. operation. name = "SELECT" ,
271+ )
272+ ) ]
273+ pub async fn execute ( & self , is_streaming : bool , tenant_id : & Option < String > ) -> QueryResult {
306274 let df = QUERY_SESSION
307275 . get_ctx ( )
308276 . execute_logical_plan ( self . final_logical_plan ( tenant_id) )
0 commit comments