diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 330116e592482..75c080a85077c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -667,6 +667,9 @@ class CodegenContext extends Logging { case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)" case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)" case CalendarIntervalType => s"$c1.compareTo($c2)" + // TimestampNanosVal exposes only `compareTo`; the AtomicType fallback below emits + // `$c1.compare($c2)`, which would not resolve as a Java method call. + case _: TimestampNTZNanosType | _: TimestampLTZNanosType => s"$c1.compareTo($c2)" case NullType => "0" case array: ArrayType => val elementType = array.elementType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala index d89feb7e0dfde..806ec1342c492 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala @@ -177,13 +177,11 @@ case object PhysicalCalendarIntervalType extends PhysicalCalendarIntervalType * Storage layout is identical to [[PhysicalTimestampLTZNanosType]]; both types exist so the * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it. * - * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue. + * Hash is not implemented yet and will be added in a follow-up issue. */ class PhysicalTimestampNTZNanosType() extends PhysicalDataType { - override private[sql] def ordering = - throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError( - "PhysicalTimestampNTZNanosType") override private[sql] type InternalType = TimestampNanosVal + override private[sql] val ordering = implicitly[Ordering[InternalType]] @transient private[sql] lazy val tag = typeTag[InternalType] } case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType @@ -197,13 +195,11 @@ case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType * Storage layout is identical to [[PhysicalTimestampNTZNanosType]]; both types exist so the * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it. * - * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue. + * Hash is not implemented yet and will be added in a follow-up issue. */ class PhysicalTimestampLTZNanosType() extends PhysicalDataType { - override private[sql] def ordering = - throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError( - "PhysicalTimestampLTZNanosType") override private[sql] type InternalType = TimestampNanosVal + override private[sql] val ordering = implicitly[Ordering[InternalType]] @transient private[sql] lazy val tag = typeTag[InternalType] } case object PhysicalTimestampLTZNanosType extends PhysicalTimestampLTZNanosType diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index 06c8b5ccef652..8505a1c8a2c18 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal import org.apache.spark.util.ArrayImplicits._ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -141,6 +142,68 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { GenerateOrdering.generate(Array.fill(5000)(sortOrder).toImmutableArraySeq) } + // SPARK-57103: ordering for nanosecond timestamp types. Not driven by the generic + // `atomicTypes` loop above because `RandomDataGenerator` does not yet support the new + // types (tracked separately in SPARK-57034); we hand-roll edge cases here instead. + private def compareNanos( + dataType: AtomicType, + a: TimestampNanosVal, + b: TimestampNanosVal, + expected: Int): Unit = { + test(s"compare two $dataType values: a = $a, b = $b") { + val rowA = InternalRow(a) + val rowB = InternalRow(b) + Seq(Ascending, Descending).foreach { direction => + val sortOrder = direction match { + case Ascending => BoundReference(0, dataType, nullable = true).asc + case Descending => BoundReference(0, dataType, nullable = true).desc + } + val expectedCompareResult = direction match { + case Ascending => signum(expected) + case Descending => -1 * signum(expected) + } + val intOrdering = new InterpretedOrdering(sortOrder :: Nil) + val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil) + Seq(intOrdering, genOrdering).foreach { ordering => + assert(ordering.compare(rowA, rowA) === 0) + assert(ordering.compare(rowB, rowB) === 0) + assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult) + assert(signum(ordering.compare(rowB, rowA)) === -1 * expectedCompareResult) + } + } + } + } + + Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9)).foreach { dt => + // equal values + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 100.toShort), + TimestampNanosVal.fromParts(1000L, 100.toShort), 0) + // primary key (epochMicros) decides + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 999.toShort), + TimestampNanosVal.fromParts(1001L, 0.toShort), -1) + // tie-breaker (nanosWithinMicro) within the same micro + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 100.toShort), + TimestampNanosVal.fromParts(1000L, 101.toShort), -1) + // Long boundary: plain subtraction would overflow; Ordering must use Long.compare. + compareNanos(dt, + TimestampNanosVal.fromParts(Long.MinValue, 0.toShort), + TimestampNanosVal.fromParts(Long.MaxValue, 0.toShort), -1) + // pre-epoch sorts before epoch regardless of nanos + compareNanos(dt, + TimestampNanosVal.fromParts(-1L, 999.toShort), + TimestampNanosVal.fromParts(0L, 0.toShort), -1) + // null sorts before any value under default NullsFirst semantics + compareNanos(dt, null, TimestampNanosVal.fromParts(0L, 0.toShort), -1) + } + + // Ordering is precision-independent. One case at p = 7 documents that intent. + compareNanos(TimestampNTZNanosType(7), + TimestampNanosVal.fromParts(0L, 0.toShort), + TimestampNanosVal.fromParts(0L, 1.toShort), -1) + test("SPARK-21344: BinaryType comparison does signed byte array comparison") { val data = Seq( (Array[Byte](1), Array[Byte](-1)),