
Commit e2f15f6

kavpreetgrewal authored and HeartSaVioR committed
[SPARK-56243][SS] Throw detailed error on malformed Kafka record timestamps
### What changes were proposed in this pull request?

Throw a detailed `KAFKA_MALFORMED_RECORD_TIMESTAMP` error when a record has a malformed or incorrectly scaled timestamp.

### Why are the changes needed?

Some users run custom producers that set the record timestamp at a different precision than Kafka expects (milliseconds since epoch). When such a record is read, they currently get only a generic arithmetic overflow error with no indication of which record or setting is at fault.

### Does this PR introduce _any_ user-facing change?

Yes, it introduces a new error message; there is no other behavioral change.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55038 from kavpreetgrewal/SPARK-56243-timestamp-parsing.

Authored-by: Kavpreet Grewal <kavpreet.grewal@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent: ccd4206 · commit: e2f15f6
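For context: Spark's `TimestampType` stores microseconds since the epoch in a signed 64-bit value, so the Kafka source effectively multiplies the record's millisecond timestamp by 1,000 with overflow checking. The following is a minimal sketch of why a wrong-precision value blows up, not the actual Spark code path (which goes through `DateTimeUtils.fromJavaTimestamp`); the millisecond value below is illustrative, while the nanosecond value matches the one used in the new test.

```scala
// Illustrative only: mirrors the checked millis -> micros conversion that fails.
val millisTimestamp = 1712345678123L         // milliseconds since epoch (what Kafka expects)
val nanosTimestamp  = 1712345678123456789L   // nanoseconds since epoch (wrong precision)

def millisToMicros(millis: Long): Long = Math.multiplyExact(millis, 1000L)

millisToMicros(millisTimestamp)  // 1712345678123000L, fits in a Long
millisToMicros(nanosTimestamp)   // throws java.lang.ArithmeticException: long overflow
```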

4 files changed: 74 additions & 2 deletions


connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json

Lines changed: 8 additions & 0 deletions
@@ -120,5 +120,13 @@
     "message" : [
       "The specified start <offsetType> <startOffset> is greater than the end <offsetType> <endOffset> for topic <topic> partition <partition>."
     ]
+  },
+  "KAFKA_MALFORMED_RECORD_TIMESTAMP" : {
+    "message": [
+      "Record at topic <topic> partition <partition> offset <offset> has timestamp value <timestamp> that overflows when converting to Spark's microsecond-precision TimestampType.",
+      "Kafka record timestamps must be in milliseconds since epoch per the Kafka protocol. The provided value suggests the producer may be writing timestamps in microseconds or nanoseconds instead.",
+      "Ensure the Kafka producer sets the ProducerRecord timestamp in milliseconds."
+    ],
+    "sqlState": "22008"
   }
 }

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala

Lines changed: 19 additions & 0 deletions
@@ -178,6 +178,25 @@ object KafkaExceptions {
         "assignedPartitions" -> assignedPartitions.toString))
   }

+  def kafkaMalformedRecordTimestamp(
+      topic: String,
+      partition: Int,
+      offset: Long,
+      timestamp: Long,
+      cause: Throwable
+  ): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_MALFORMED_RECORD_TIMESTAMP",
+      messageParameters = Map(
+        "topic" -> topic,
+        "partition" -> partition.toString,
+        "offset" -> offset.toString,
+        "timestamp" -> timestamp.toString
+      ),
+      cause = cause
+    )
+  }
+
   def nullTopicInData(): KafkaIllegalStateException = {
     new KafkaIllegalStateException(
       errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
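For illustration, with hypothetical parameter values (topic `events`, partition 0, offset 42, and the nanosecond timestamp used in the new test), the resolved message would read roughly as below; the exact prefix and SQLSTATE formatting comes from Spark's error framework and may differ slightly.

```text
[KAFKA_MALFORMED_RECORD_TIMESTAMP] Record at topic events partition 0 offset 42 has
timestamp value 1712345678123456789 that overflows when converting to Spark's
microsecond-precision TimestampType. Kafka record timestamps must be in milliseconds
since epoch per the Kafka protocol. The provided value suggests the producer may be
writing timestamps in microseconds or nanoseconds instead. Ensure the Kafka producer
sets the ProducerRecord timestamp in milliseconds. SQLSTATE: 22008
```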

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala

Lines changed: 17 additions & 2 deletions
@@ -39,13 +39,13 @@ private[kafka010] class KafkaRecordToRowConverter {
   val toInternalRowWithoutHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id
+      parseTimestamp(cr), cr.timestampType.id
     )

   val toInternalRowWithHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id,
+      parseTimestamp(cr), cr.timestampType.id,
       if (cr.headers.iterator().hasNext) {
         new GenericArrayData(cr.headers.iterator().asScala
           .map(header =>
@@ -90,4 +90,19 @@ private[kafka010] object KafkaRecordToRowConverter {
   def kafkaSchema(includeHeaders: Boolean): StructType = {
     if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
   }
+
+  private def parseTimestamp(cr: Record): Long = {
+    try {
+      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp))
+    } catch {
+      case e: ArithmeticException =>
+        throw KafkaExceptions.kafkaMalformedRecordTimestamp(
+          cr.topic(),
+          cr.partition(),
+          cr.offset(),
+          cr.timestamp(),
+          e
+        )
+    }
+  }
 }
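Since `KafkaRecordToRowConverter` is shared by the streaming and batch read paths, a plain batch read of a topic containing a wrong-precision timestamp should surface the same condition. A hedged sketch, with placeholder broker address and topic name:

```scala
// Placeholder broker/topic; the failure is triggered only when an action
// actually deserializes the malformed record.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// With a malformed record in the topic, this fails with
// KAFKA_MALFORMED_RECORD_TIMESTAMP instead of a bare ArithmeticException.
df.select("topic", "partition", "offset", "timestamp").show()
```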

connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -1589,6 +1589,36 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
       q.stop()
     }
   }
+
+  test("SPARK-56243: malformed record timestamp throws detailed error") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+
+    // Nanosecond-precision timestamp that overflows millis-to-micros conversion
+    val nanoTimestamp = 1712345678123456789L
+    val record = new RecordBuilder(topic, "value").partition(0).timestamp(nanoTimestamp).build()
+    testUtils.sendMessages(Seq(record))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+
+    testStream(kafka)(
+      StartStream(),
+      ExpectFailure[KafkaIllegalStateException](e => {
+        val ex = e.asInstanceOf[KafkaIllegalStateException]
+        assert(ex.getCondition === "KAFKA_MALFORMED_RECORD_TIMESTAMP")
+        assert(ex.getSqlState === "22008")
+        assert(ex.getMessage.contains(topic))
+        assert(ex.getMessage.contains(nanoTimestamp.toString))
+        assert(ex.getCause.isInstanceOf[ArithmeticException])
+      })
+    )
+  }
 }

 abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
