Skip to content

Commit f0953e1

Browse files
authored
[AURON #1849] Introduce native json deserializer (#2112)
# Which issue does this PR close? Closes #1849 # Rationale for this change * The Auron Flink Kafka connector now supports JSON # What changes are included in this PR? * add json_deserializer to deserialize JSON data from Kafka * modify kafka_scan_exec to support selecting different deserializers based on the data format # Are there any user-facing changes? * No # How was this patch tested? * No Kafka environment was available; tested via a Rust unit test for json_deserializer
1 parent 03776da commit f0953e1

4 files changed

Lines changed: 1069 additions & 27 deletions

File tree

native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ use sonic_rs::{JsonContainerTrait, JsonValueTrait};
5252

5353
use crate::{
5454
common::{column_pruning::ExecuteWithColumnPruning, execution_context::ExecutionContext},
55-
flink::serde::{flink_deserializer::FlinkDeserializer, pb_deserializer::PbDeserializer},
55+
flink::serde::{
56+
flink_deserializer::FlinkDeserializer, json_deserializer::JsonDeserializer,
57+
pb_deserializer::PbDeserializer,
58+
},
5659
rdkafka::Message,
5760
};
5861

@@ -132,6 +135,7 @@ impl KafkaScanExec {
132135
exec_ctx.output_schema(),
133136
exec_ctx.clone(),
134137
serialized_pb_stream,
138+
self.data_format,
135139
self.format_config_json.clone(),
136140
)?;
137141
Ok(deserialized_pb_stream)
@@ -481,6 +485,7 @@ fn parse_records(
481485
schema: SchemaRef,
482486
exec_ctx: Arc<ExecutionContext>,
483487
mut input_stream: SendableRecordBatchStream,
488+
data_format: i32,
484489
parser_config_json: String,
485490
) -> Result<SendableRecordBatchStream> {
486491
let parser_config = sonic_rs::from_str::<sonic_rs::Value>(&parser_config_json)
@@ -523,13 +528,17 @@ fn parse_records(
523528
"KafkaScanExec.ParseRecords",
524529
move |sender| async move {
525530
// TODO: json parser
526-
let mut parser: Box<dyn FlinkDeserializer> = Box::new(PbDeserializer::new(
527-
&file_descriptor_bytes,
528-
&root_message_name,
529-
schema.clone(),
530-
&nested_msg_mapping,
531-
&skip_fields_vec,
532-
)?);
531+
let mut parser: Box<dyn FlinkDeserializer> = if data_format == 0 {
532+
Box::new(JsonDeserializer::new(schema.clone(), &nested_msg_mapping)?)
533+
} else {
534+
Box::new(PbDeserializer::new(
535+
&file_descriptor_bytes,
536+
&root_message_name,
537+
schema.clone(),
538+
&nested_msg_mapping,
539+
&skip_fields_vec,
540+
)?)
541+
};
533542
while let Some(batch) = input_stream.next().await.transpose()? {
534543
let kafka_partition = batch
535544
.column(0)

0 commit comments

Comments
 (0)