-
-
Notifications
You must be signed in to change notification settings - Fork 162
Expand file tree
/
Copy pathresponse.rs
More file actions
68 lines (59 loc) · 2.25 KB
/
response.rs
File metadata and controls
68 lines (59 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use datafusion::arrow::record_batch::RecordBatch;
use serde_json::{Value, json};
use tracing::info;
/// Record batches plus the options that control how `to_json` renders them.
pub struct QueryResponse {
/// Record batches that `to_json` converts into JSON row objects.
pub records: Vec<RecordBatch>,
/// Field names: emitted under `"fields"` when `with_fields` is set, and used
/// as the set of keys to null-fill when `fill_null` is set.
pub fields: Vec<String>,
/// When `true`, `to_json` inserts `null` for every name in `fields` that is
/// missing from a row object.
pub fill_null: bool,
/// When `true`, `to_json` returns an object with `"fields"` and `"records"`
/// keys; otherwise it returns a bare JSON array of row objects.
pub with_fields: bool,
}
impl QueryResponse {
pub fn to_json(&self) -> Result<Value, QueryError> {
info!("{}", "Returning query results");
// Process in batches to avoid massive allocations
const BATCH_SIZE: usize = 100; // Process 100 record batches at a time
let mut all_values = Vec::new();
for chunk in self.records.chunks(BATCH_SIZE) {
let mut json_records = record_batches_to_json(chunk)?;
if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
}
}
}
}
// Convert this batch to values and add to collection
let batch_values: Vec<Value> = json_records.into_iter().map(Value::Object).collect();
all_values.extend(batch_values);
}
let response = if self.with_fields {
json!({
"fields": self.fields,
"records": all_values,
})
} else {
Value::Array(all_values)
};
Ok(response)
}
}