Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import java.util.regex.Pattern

import scala.util.control.NonFatal

import org.apache.spark.QueryContext
import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros, rebaseJulianToGregorianDays, rebaseJulianToGregorianMicros}
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types.{DateType, TimestampType, TimeType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.types.{DateType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampType, TimeType}
import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
import org.apache.spark.util.SparkClassUtils

trait SparkDateTimeUtils {
Expand Down Expand Up @@ -474,10 +474,10 @@ trait SparkDateTimeUtils {
* order to distinguish between 0L and null. The following formats are allowed:
*
* `[+-]yyyy*` `[+-]yyyy*-[m]m` `[+-]yyyy*-[m]m-[d]d` `[+-]yyyy*-[m]m-[d]d `
* `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][ns][ns][ns][zone_id]`
* `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][ns][ns][ns][zone_id]`
* `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][ns][ns][ns][zone_id]`
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][ns][ns][ns][zone_id]`
*
* where `zone_id` should have one of the forms:
* - Z - Zulu time zone UTC+0
Expand All @@ -491,6 +491,11 @@ trait SparkDateTimeUtils {
* - +|-hhmmss
* - Region-based zone IDs in the form `area/city`, such as `Europe/Paris`
*
* Up to 9 fractional-second digits are accepted. Digits 1-6 are kept as microseconds in
* `segments(6)` (backward-compatible micro behavior), digits 7-9 are kept as the
* sub-microsecond remainder in `segments(9)` (a value in [0, 999]), and digits beyond the 9th
* are dropped.
*
* @return
* timestamp segments, time zone id and whether the input is just time without a date. If the
* input string can't be parsed as timestamp, the result timestamp segments are empty.
Expand All @@ -499,7 +504,8 @@ trait SparkDateTimeUtils {
def isValidDigits(segment: Int, digits: Int): Boolean = {
// A Long is able to represent a timestamp within [+-]200 thousand years
val maxDigitsYear = 6
// For the nanosecond part, more than 6 digits is allowed, but will be truncated.
// Fractional digits 1-6 form microseconds; digits 7-9 are retained as the sub-microsecond
// remainder in segments(9); only digits beyond the 9th are dropped.
segment == 6 || (segment == 0 && digits >= 4 && digits <= maxDigitsYear) ||
// For the zoneId segment(7), it could be zero digits when it's a region-based zone ID
(segment == 7 && digits <= 2) ||
Expand All @@ -509,7 +515,12 @@ trait SparkDateTimeUtils {
return (Array.empty, None, false)
}
var tz: Option[String] = None
val segments: Array[Int] = Array[Int](1, 1, 1, 0, 0, 0, 0, 0, 0)
// Indices 0-6 hold year, month, day, hour, minute, second and the microsecond part of the
// fractional second (digits 1-6). Index 9 is an output-only slot that holds the
// sub-microsecond remainder (fractional digits 7-9) as a value in [0, 999]; it is never
// written by the parsing loop below. Indices 7-8 are written by the loop as `i` advances
// but their values are never read by any caller.
val segments: Array[Int] = Array[Int](1, 1, 1, 0, 0, 0, 0, 0, 0, 0)
var i = 0
var currentSegmentValue = 0
var currentSegmentDigits = 0
Expand All @@ -522,6 +533,7 @@ trait SparkDateTimeUtils {
}

var digitsMilli = 0
var nanosWithinMicro = 0
var justTime = false
var yearSign: Option[Int] = None
if (bytes(j) == '-' || bytes(j) == '+') {
Expand Down Expand Up @@ -604,7 +616,9 @@ trait SparkDateTimeUtils {
i += 1
}
} else {
if (i < segments.length && (b == ':' || b == ' ')) {
// Bound is fixed at 9 (the original number of parsed segments) so that the trailing
// output-only slot segments(9) is never written by the parsing loop.
if (i < 9 && (b == ':' || b == ' ')) {
if (!isValidDigits(i, currentSegmentDigits)) {
return (Array.empty, None, false)
}
Expand All @@ -620,10 +634,13 @@ trait SparkDateTimeUtils {
if (i == 6) {
digitsMilli += 1
}
// We will truncate the nanosecond part if there are more than 6 digits, which results
// in loss of precision
if (i != 6 || currentSegmentDigits < 6) {
// Fractional digits 1-6 form the microsecond part stored in segments(6).
currentSegmentValue = currentSegmentValue * 10 + parsedValue
} else if (currentSegmentDigits < 9) {
// Fractional digits 7-9 are retained as the sub-microsecond remainder. Digits beyond
// the 9th are dropped (loss of precision below the nanosecond grid).
nanosWithinMicro = nanosWithinMicro * 10 + parsedValue
}
currentSegmentDigits += 1
}
Expand All @@ -640,6 +657,17 @@ trait SparkDateTimeUtils {
digitsMilli += 1
}

// Right-pad the captured sub-microsecond digits (the 7th to 9th fractional digits) so that
// segments(9) always holds a value in [0, 999]. The number of captured digits is
// clamp(digitsMilli - 6, 0, 3); fewer captured digits means the remainder is left-aligned and
// must be scaled up (e.g. ".0000001" -> 100, ".00000012" -> 120, ".000000123" -> 123).
var subMicroDigits = math.max(0, math.min(digitsMilli, 9) - 6)
while (subMicroDigits < 3) {
nanosWithinMicro *= 10
subMicroDigits += 1
}
segments(9) = nanosWithinMicro

// This step also validates time zone part
val zoneId = tz.map(zoneName => getZoneId(zoneName.trim))
segments(0) *= yearSign.getOrElse(1)
Expand Down Expand Up @@ -713,6 +741,130 @@ trait SparkDateTimeUtils {
}
}

/**
 * Zeroes out the digits of a sub-microsecond remainder that lie beyond the requested
 * fractional-second `precision`.
 *
 * The remainder covers fractional digits 7-9 (the value kept in `segments(9)`). Digits past
 * `precision` are replaced by zeros, i.e. the value is truncated toward zero and never rounded:
 * precision 7 keeps only the hundreds digit, precision 8 keeps hundreds and tens, and precision
 * 9 keeps the value unchanged.
 *
 * @param nanosWithinMicro
 *   sub-microsecond remainder; callers in this file pass `segments(9)`, a value in [0, 999]
 * @param precision
 *   number of fractional-second digits to keep; must be 7, 8 or 9, otherwise the error built by
 *   `SparkException.internalError` is thrown
 * @return
 *   the truncated remainder, narrowed to a [[Short]]
 */
private def truncateNanosWithinMicro(nanosWithinMicro: Int, precision: Int): Short = {
  if (precision < 7 || precision > 9) {
    throw SparkException.internalError(
      s"truncateNanosWithinMicro called with precision $precision outside [7, 9]")
  }
  // One power of ten per dropped digit: 7 -> 100, 8 -> 10, 9 -> 1.
  val scale = if (precision == 7) 100 else if (precision == 8) 10 else 1
  // Removing the remainder modulo `scale` is the same as (x / scale) * scale for Int arithmetic.
  val kept = nanosWithinMicro - nanosWithinMicro % scale
  kept.toShort
}

/**
 * Parses a UTF8 string into a [[TimestampNanosVal]] for `TIMESTAMP_LTZ(precision)`, where
 * `precision` is 7, 8 or 9. The value consists of the epoch microseconds plus a sub-microsecond
 * remainder in [0, 999]; fractional digits beyond `precision` are truncated.
 *
 * The accepted formats are those of `parseTimestampString`. If the string carries its own time
 * zone, that zone is used; otherwise `timeZoneId` is used. If the string contains only a time of
 * day, the date is taken from the current date in the resolved zone.
 *
 * @param s
 *   the string to parse
 * @param precision
 *   number of fractional-second digits to keep; a value outside [7, 9] raises the error built by
 *   `SparkException.internalError` (this check happens before parsing and is not converted to
 *   `None`)
 * @param timeZoneId
 *   zone applied when the input has no zone of its own
 * @return
 *   `Some(value)` on success; `None` if the string cannot be parsed or any non-fatal error occurs
 *   while building the value. [[Option]] is used to tell a valid zero value apart from null.
 */
def stringToTimestampLTZNanos(
    s: UTF8String,
    precision: Int,
    timeZoneId: ZoneId): Option[TimestampNanosVal] = {
  if (!(precision >= 7 && precision <= 9)) {
    throw SparkException.internalError(
      s"stringToTimestampLTZNanos: precision $precision is out of range [7, 9]")
  }
  try {
    val (segments, parsedZoneId, justTime) = parseTimestampString(s)
    if (segments.isEmpty) {
      // Empty segments signal that the input is not a valid timestamp string.
      None
    } else {
      val zoneId = parsedZoneId.getOrElse(timeZoneId)
      // segments(6) holds microseconds; LocalTime expects nanoseconds of the second.
      val nanoOfSecond = MICROSECONDS.toNanos(segments(6)).toInt
      val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoOfSecond)
      val localDate =
        if (justTime) LocalDate.now(zoneId)
        else LocalDate.of(segments(0), segments(1), segments(2))
      val zonedDateTime = ZonedDateTime.of(LocalDateTime.of(localDate, localTime), zoneId)
      val epochMicros = instantToMicros(Instant.from(zonedDateTime))
      // The sub-microsecond digits travel separately in segments(9).
      val remainder = truncateNanosWithinMicro(segments(9), precision)
      Some(TimestampNanosVal.fromParts(epochMicros, remainder))
    }
  } catch {
    // Out-of-range fields or an invalid zone are reported as "no value".
    case NonFatal(_) => None
  }
}

/**
 * ANSI variant of [[stringToTimestampLTZNanos]]. Instead of returning `None` for input that
 * cannot be parsed, it throws the error built by
 * `ExecutionErrors.invalidInputInCastToDatetimeError` for the target type
 * `TimestampLTZNanosType(precision)`.
 *
 * @param s
 *   the string to parse
 * @param precision
 *   number of fractional-second digits to keep, forwarded to [[stringToTimestampLTZNanos]]
 * @param timeZoneId
 *   zone forwarded to [[stringToTimestampLTZNanos]]
 * @param context
 *   query context passed to the error builder; defaults to `null`
 * @return
 *   the parsed value
 */
def stringToTimestampLTZNanosAnsi(
    s: UTF8String,
    precision: Int,
    timeZoneId: ZoneId,
    context: QueryContext = null): TimestampNanosVal = {
  stringToTimestampLTZNanos(s, precision, timeZoneId) match {
    case Some(parsed) => parsed
    case None =>
      throw ExecutionErrors.invalidInputInCastToDatetimeError(
        s,
        TimestampLTZNanosType(precision),
        context)
  }
}

/**
 * Parses a UTF8 string into a [[TimestampNanosVal]] for `TIMESTAMP_NTZ(precision)`, where
 * `precision` is 7, 8 or 9. The value consists of the epoch microseconds plus a sub-microsecond
 * remainder in [0, 999]; fractional digits beyond `precision` are truncated.
 *
 * The accepted formats are those of `parseTimestampString`. The result does not depend on any
 * time zone: a zone found in the string is ignored when `allowTimeZone` is `true`, and causes
 * `None` when it is `false`. Input that contains only a time of day (no date) also yields `None`.
 *
 * @param s
 *   the string to parse
 * @param precision
 *   number of fractional-second digits to keep; a value outside [7, 9] raises the error built by
 *   `SparkException.internalError` (this check happens before parsing and is not converted to
 *   `None`)
 * @param allowTimeZone
 *   whether a time zone component is tolerated in the input; defaults to `true`
 * @return
 *   `Some(value)` on success; `None` if the input is rejected or any non-fatal error occurs while
 *   building the value. [[Option]] is used to tell a valid zero value apart from null.
 */
def stringToTimestampNTZNanos(
    s: UTF8String,
    precision: Int,
    allowTimeZone: Boolean = true): Option[TimestampNanosVal] = {
  if (!(precision >= 7 && precision <= 9)) {
    throw SparkException.internalError(
      s"stringToTimestampNTZNanos: precision $precision is out of range [7, 9]")
  }
  try {
    val (segments, zoneIdOpt, justTime) = parseTimestampString(s)
    // Reject unparsable input, time-only input, and a zone when zones are not allowed.
    val zoneForbidden = !allowTimeZone && zoneIdOpt.isDefined
    val rejected = segments.isEmpty || justTime || zoneForbidden
    if (rejected) {
      None
    } else {
      // segments(6) holds microseconds; LocalTime expects nanoseconds of the second.
      val nanoOfSecond = MICROSECONDS.toNanos(segments(6)).toInt
      val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoOfSecond)
      val localDate = LocalDate.of(segments(0), segments(1), segments(2))
      val epochMicros = localDateTimeToMicros(LocalDateTime.of(localDate, localTime))
      // The sub-microsecond digits travel separately in segments(9).
      val remainder = truncateNanosWithinMicro(segments(9), precision)
      Some(TimestampNanosVal.fromParts(epochMicros, remainder))
    }
  } catch {
    // Out-of-range fields or an invalid zone are reported as "no value".
    case NonFatal(_) => None
  }
}

/**
 * ANSI variant of [[stringToTimestampNTZNanos]]. Instead of returning `None` for invalid input,
 * it throws the error built by `ExecutionErrors.invalidInputInCastToDatetimeError` for the
 * target type `TimestampNTZNanosType(precision)`.
 *
 * Parsing is delegated with the default `allowTimeZone = true`, so a time zone component in the
 * string is silently discarded rather than rejected. Callers that need strict NTZ rejection
 * should call [[stringToTimestampNTZNanos]] directly with `allowTimeZone = false`.
 *
 * @param s
 *   the string to parse
 * @param precision
 *   number of fractional-second digits to keep, forwarded to [[stringToTimestampNTZNanos]]
 * @param context
 *   query context passed to the error builder; defaults to `null`
 * @return
 *   the parsed value
 */
def stringToTimestampNTZNanosAnsi(
    s: UTF8String,
    precision: Int,
    context: QueryContext = null): TimestampNanosVal = {
  stringToTimestampNTZNanos(s, precision) match {
    case Some(parsed) => parsed
    case None =>
      throw ExecutionErrors.invalidInputInCastToDatetimeError(
        s,
        TimestampNTZNanosType(precision),
        context)
  }
}

/**
* Trims and parses a given UTF8 string to a corresponding [[Long]] value representing the
* number of microseconds since midnight. The result will be independent of time zones.
Expand Down
Loading