Skip to content

Commit 03776da

Browse files
authored
[AURON #2083] Support kafka partition discovery (#2111)
# Which issue does this PR close? Closes #2083 # Rationale for this change * The Auron Kafka source now supports automatic detection of newly added Kafka partitions # What changes are included in this PR? * Modify AuronKafkaDynamicTableFactory and AuronKafkaDynamicTableSource to add a partition discovery interval option * Modify AuronKafkaSourceFunction to run partition discovery and publish new partitions to the native side * Modify `kafka_scan_exec.rs` to periodically pick up partition changes and reassign the consumer # Are there any user-facing changes? * No # How was this patch tested? * Not tested end-to-end due to the lack of a Kafka environment; verified by code review
1 parent ffbf437 commit 03776da

4 files changed

Lines changed: 184 additions & 6 deletions

File tree

auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
9393
.withDescription(
9494
"When mock data generated, remember that the first three columns of each row are serialized_kafka_records_partition, serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");
9595

96+
public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key(
97+
"partition.discovery.interval.ms")
98+
.longType()
99+
.defaultValue(300000L)
100+
.withDescription("Kafka source partition discovery interval in milliseconds. "
101+
+ "Non-positive values disable partition discovery. Default is 300000 (5 minutes).");
102+
96103
@Override
97104
public DynamicTableSource createDynamicTableSource(Context context) {
98105
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -114,7 +121,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
114121
formatConfig,
115122
tableOptions.get(BUFFER_SIZE),
116123
tableOptions.get(START_UP_MODE),
117-
tableOptions.get(KAFKA_MOCK_DATA));
124+
tableOptions.get(KAFKA_MOCK_DATA),
125+
tableOptions.get(PARTITION_DISCOVERY_INTERVAL_MS));
118126
} catch (Exception e) {
119127
throw new FlinkRuntimeException("Could not create Auron Kafka dynamic table source", e);
120128
}
@@ -146,6 +154,10 @@ public Set<ConfigOption<?>> optionalOptions() {
146154
options.add(PB_ROOT_MESSAGE_NAME);
147155
options.add(BUFFER_SIZE);
148156
options.add(NESTED_COLS_FIELD_MAPPING);
157+
options.add(PB_SKIP_FIELDS);
158+
options.add(START_UP_MODE);
159+
options.add(KAFKA_MOCK_DATA);
160+
options.add(PARTITION_DISCOVERY_INTERVAL_MS);
149161
return options;
150162
}
151163

auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
4848
private final int bufferSize;
4949
private final String startupMode;
5050
private final String mockData;
51+
private final long partitionDiscoveryIntervalMs;
5152
/** Watermark strategy that is used to generate per-partition watermark. */
5253
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
5354

@@ -59,7 +60,8 @@ public AuronKafkaDynamicTableSource(
5960
Map<String, String> formatConfig,
6061
int bufferSize,
6162
String startupMode,
62-
String mockData) {
63+
String mockData,
64+
long partitionDiscoveryIntervalMs) {
6365
final LogicalType physicalType = physicalDataType.getLogicalType();
6466
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
6567
this.physicalDataType = physicalDataType;
@@ -70,6 +72,7 @@ public AuronKafkaDynamicTableSource(
7072
this.bufferSize = bufferSize;
7173
this.startupMode = startupMode;
7274
this.mockData = mockData;
75+
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
7376
}
7477

7578
@Override
@@ -88,7 +91,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
8891
format,
8992
formatConfig,
9093
bufferSize,
91-
startupMode);
94+
startupMode,
95+
partitionDiscoveryIntervalMs);
9296

9397
if (watermarkStrategy != null) {
9498
sourceFunction.setWatermarkStrategy(watermarkStrategy);
@@ -116,7 +120,15 @@ public boolean isBounded() {
116120
@Override
117121
public DynamicTableSource copy() {
118122
return new AuronKafkaDynamicTableSource(
119-
physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode, mockData);
123+
physicalDataType,
124+
kafkaTopic,
125+
kafkaProperties,
126+
format,
127+
formatConfig,
128+
bufferSize,
129+
startupMode,
130+
mockData,
131+
partitionDiscoveryIntervalMs);
120132
}
121133

122134
@Override

auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import java.io.InputStream;
2323
import java.lang.reflect.Field;
2424
import java.util.*;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.TimeUnit;
2528
import org.apache.auron.flink.arrow.FlinkArrowReader;
2629
import org.apache.auron.flink.arrow.FlinkArrowUtils;
2730
import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
@@ -94,6 +97,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
9497
private final Map<String, String> formatConfig;
9598
private final int bufferSize;
9699
private final String startupMode;
100+
private final long partitionDiscoveryIntervalMs;
97101
private String mockData;
98102
private transient PhysicalPlanNode physicalPlanNode;
99103

@@ -117,6 +121,10 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
117121
private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
118122
private transient List<Integer> assignedPartitions;
119123

124+
// Partition discovery related
125+
private transient ScheduledExecutorService partitionDiscoveryScheduler;
126+
private transient volatile int knownPartitionCount;
127+
120128
// Watermark related: uses table-runtime WatermarkGenerator directly
121129
private WatermarkStrategy<RowData> watermarkStrategy;
122130
private transient WatermarkGenerator tableWatermarkGenerator;
@@ -131,7 +139,8 @@ public AuronKafkaSourceFunction(
131139
String format,
132140
Map<String, String> formatConfig,
133141
int bufferSize,
134-
String startupMode) {
142+
String startupMode,
143+
long partitionDiscoveryIntervalMs) {
135144
this.outputType = outputType;
136145
this.auronOperatorId = auronOperatorId;
137146
this.topic = topic;
@@ -140,6 +149,7 @@ public AuronKafkaSourceFunction(
140149
this.formatConfig = formatConfig;
141150
this.bufferSize = bufferSize;
142151
this.startupMode = startupMode;
152+
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
143153
}
144154

145155
@Override
@@ -223,6 +233,7 @@ public void open(Configuration config) throws Exception {
223233
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
224234
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
225235
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
236+
auronRuntimeInfo.put("partition_discovery_interval_ms", partitionDiscoveryIntervalMs);
226237
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
227238
LOG.info(
228239
"Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, "
@@ -231,6 +242,25 @@ public void open(Configuration config) throws Exception {
231242
enableCheckpoint,
232243
subtaskIndex,
233244
assignedPartitions);
245+
246+
// 4. Initialize partition discovery scheduler
247+
this.knownPartitionCount = partitionInfos.size();
248+
if (partitionDiscoveryIntervalMs > 0) {
249+
this.partitionDiscoveryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
250+
Thread t = new Thread(r, "auron-kafka-partition-discovery-" + subtaskIndex);
251+
t.setDaemon(true);
252+
return t;
253+
});
254+
partitionDiscoveryScheduler.scheduleWithFixedDelay(
255+
() -> discoverNewPartitions(subtaskIndex, numSubtasks),
256+
partitionDiscoveryIntervalMs,
257+
partitionDiscoveryIntervalMs,
258+
TimeUnit.MILLISECONDS);
259+
LOG.info(
260+
"Partition discovery enabled for subtask {} with interval {}ms",
261+
subtaskIndex,
262+
partitionDiscoveryIntervalMs);
263+
}
234264
}
235265
sourcePlan.setKafkaScan(scanExecNode.build());
236266
this.physicalPlanNode = sourcePlan.build();
@@ -356,6 +386,15 @@ public void cancel() {
356386
public void close() throws Exception {
357387
this.isRunning = false;
358388

389+
// Shut down partition discovery scheduler before closing the consumer it uses
390+
if (partitionDiscoveryScheduler != null) {
391+
try {
392+
partitionDiscoveryScheduler.shutdownNow();
393+
} catch (Exception e) {
394+
LOG.warn("Fail to shut down kafka partition discovery thread pool", e);
395+
}
396+
}
397+
359398
// Close the metadata-only Kafka Consumer
360399
if (kafkaConsumer != null) {
361400
kafkaConsumer.close();
@@ -476,4 +515,46 @@ public void setMockData(String mockData) {
476515
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
477516
this.mockData = mockData;
478517
}
518+
519+
private void discoverNewPartitions(int subtaskIndex, int numSubtasks) {
520+
if (isRunning) {
521+
try {
522+
List<PartitionInfo> currentPartitionInfos = kafkaConsumer.partitionsFor(topic);
523+
int currentPartitionCount = currentPartitionInfos.size();
524+
525+
if (currentPartitionCount > knownPartitionCount) {
526+
LOG.info(
527+
"Discovered new partitions for topic {}: {} -> {}",
528+
topic,
529+
knownPartitionCount,
530+
currentPartitionCount);
531+
532+
// Always send all new partitions since initialPartitionCount (not incremental)
533+
List<Integer> allNewPartitionsForThisSubtask = new ArrayList<>();
534+
for (PartitionInfo partitionInfo : currentPartitionInfos) {
535+
int partitionId = partitionInfo.partition();
536+
if (partitionId >= knownPartitionCount) {
537+
if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
538+
allNewPartitionsForThisSubtask.add(partitionId);
539+
}
540+
}
541+
}
542+
543+
if (!allNewPartitionsForThisSubtask.isEmpty()) {
544+
String newPartitionsKey = auronOperatorIdWithSubtaskIndex + "-new-partitions";
545+
LOG.info(
546+
"Subtask {} discovered new partitions to consume: {}",
547+
subtaskIndex,
548+
allNewPartitionsForThisSubtask);
549+
JniBridge.putResource(
550+
newPartitionsKey, mapper.writeValueAsString(allNewPartitionsForThisSubtask));
551+
}
552+
553+
knownPartitionCount = currentPartitionCount;
554+
}
555+
} catch (Exception e) {
556+
LOG.warn("Error discovering new partitions for topic {}: {}", topic, e.getMessage());
557+
}
558+
}
559+
}
479560
}

native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,15 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc};
16+
use std::{
17+
any::Any,
18+
collections::{HashMap, HashSet},
19+
env,
20+
fmt::Formatter,
21+
fs,
22+
sync::Arc,
23+
time::Instant,
24+
};
1725

1826
use arrow::array::{
1927
ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder,
@@ -262,6 +270,10 @@ fn read_serialized_records_from_kafka(
262270
"No partitions found for topic: {kafka_topic}"
263271
)));
264272
}
273+
let partition_discovery_interval_ms = task_json
274+
.get("partition_discovery_interval_ms")
275+
.as_i64()
276+
.expect("partition_discovery_interval_ms is not valid json");
265277
let kafka_properties = sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
266278
.expect("kafka_properties_json is not valid json");
267279
let mut config = ClientConfig::new();
@@ -337,6 +349,11 @@ fn read_serialized_records_from_kafka(
337349
let mut serialized_kafka_records_offset_builder = Int64Builder::with_capacity(0);
338350
let mut serialized_kafka_records_timestamp_builder = Int64Builder::with_capacity(0);
339351
let mut serialized_pb_records_builder = BinaryBuilder::with_capacity(batch_size, 0);
352+
353+
let mut last_partition_check = Instant::now();
354+
let partition_check_interval =
355+
std::time::Duration::from_millis(partition_discovery_interval_ms.max(0) as u64);
356+
340357
loop {
341358
while serialized_pb_records_builder.len() < batch_size {
342359
match consumer.recv().await {
@@ -363,6 +380,62 @@ fn read_serialized_records_from_kafka(
363380
],
364381
)?;
365382
sender.send(batch).await;
383+
384+
// Check for new partitions if partition discovery is enabled
385+
if partition_discovery_interval_ms > 0
386+
&& last_partition_check.elapsed() >= partition_check_interval
387+
{
388+
let mut known_partitions: HashSet<i32> = partitions.iter().cloned().collect();
389+
last_partition_check = Instant::now();
390+
let new_partitions_key = auron_operator_id.clone() + "-new-partitions";
391+
let resource_id = jni_new_string!(&new_partitions_key)?;
392+
let java_json_str = jni_call_static!(
393+
JniBridge.getResource(resource_id.as_obj()) -> JObject
394+
)?;
395+
if !java_json_str.0.is_null() {
396+
let new_partitions_json = jni_get_string!(java_json_str.as_obj().into())
397+
.expect("new_partitions json is not valid java string");
398+
let new_partitions: Vec<i32> = sonic_rs::from_str(&new_partitions_json)
399+
.expect("new_partitions_json is not valid json");
400+
401+
let truly_new: Vec<i32> = new_partitions
402+
.iter()
403+
.filter(|p| !known_partitions.contains(p))
404+
.cloned()
405+
.collect();
406+
407+
if !truly_new.is_empty() {
408+
log::info!(
409+
"Subtask {subtask_index} discovered new partitions: \
410+
{truly_new:?}, consuming from beginning"
411+
);
412+
413+
known_partitions.extend(&truly_new);
414+
415+
let all_partitions: Vec<i32> =
416+
known_partitions.iter().cloned().collect();
417+
418+
let mut ressgined =
419+
consumer.position().expect("Cannot got partitions position");
420+
421+
// New partitions start from the beginning
422+
for &p in &truly_new {
423+
let _ = ressgined.add_partition_offset(
424+
&kafka_topic,
425+
p,
426+
Offset::Beginning,
427+
);
428+
}
429+
430+
consumer
431+
.assign(&ressgined)
432+
.expect("Cannot reassign partitions to consumer");
433+
434+
partitions = all_partitions;
435+
}
436+
}
437+
}
438+
366439
if enable_checkpoint {
367440
// if checkpoint is enabled, commit offsets to kafka
368441
let offset_to_commit = auron_operator_id.clone() + "-offsets2commit";

0 commit comments

Comments
 (0)