Skip to content

Commit 8dece8d

Browse files
committed
Workaround for tests failing due to behavior change in first consumer poll
1 parent 39f1008 commit 8dece8d

3 files changed

Lines changed: 12 additions & 2 deletions

File tree

datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,13 @@ protected void trackEventsProcessedProgress(int recordCount) {
508508
}
509509

510510
protected ConsumerRecords<?, ?> consumerPoll(long pollInterval) {
511-
return _consumer.poll(Duration.ofMillis(pollInterval));
511+
if (pollInterval == 0) {
512+
// Brooklin calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old
513+
// and new poll in this case. We need to understand that behavior better before removing usages of deprecated API
514+
return _consumer.poll(pollInterval);
515+
} else {
516+
return _consumer.poll(Duration.ofMillis(pollInterval));
517+
}
512518
}
513519

514520
/**

datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,10 @@ protected void maybeCommitOffsets(Consumer<?, ?> consumer, boolean hardCommit) {
423423
if (_enablePartitionAssignment && _consumerAssignment.isEmpty()) {
424424
// Kafka rejects a poll if there is empty assignment
425425
return ConsumerRecords.EMPTY;
426+
} else if (pollInterval == 0) {
427+
// BMM calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old poll
428+
// and new poll in this case. We need to understand that behavior better before removing usages of deprecated API
429+
return _consumer.poll(pollInterval);
426430
} else {
427431
return _consumer.poll(Duration.ofMillis(pollInterval));
428432
}

gradle/maven.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
allprojects {
2-
version = "5.3.0-SNAPSHOT"
2+
version = "5.3.2-SNAPSHOT"
33
}
44

55
subprojects {

0 commit comments

Comments
 (0)