From 4c0eb3cb0174a7f03d7431b488376fb5720d303e Mon Sep 17 00:00:00 2001 From: Stevo Mitric Date: Fri, 29 May 2026 13:23:01 +0000 Subject: [PATCH 1/3] [WIP][SPARK-57103][SQL] Wire ordering for nanosecond timestamp types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Implement `Ordering` for `TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)`, both in the interpreted path (`PhysicalDataType.ordering`) and the codegen path (`CodeGenerator.genComp`). This is the second of three PRs for SPARK-57103. The first (`[SPARK-57103][SQL] Add Comparable to TimestampNanosVal`, commit 0b0ffb711b2) made the value class `Comparable`. The remaining PR will extend `hash`, `xxhash64`, and `murmur3` for the two nanos types. Changes: - `PhysicalDataType.scala`: replace the two `orderedOperationUnsupportedByDataTypeError` throws with `implicitly[Ordering[InternalType]]`, following the `PhysicalGeographyType` / `PhysicalGeometryType` precedent. Resolves via `scala.math.Ordering.ordered[T <: Comparable[T]]` now that `TimestampNanosVal` is `Comparable`. - `CodeGenerator.scala`: add an explicit `genComp` arm that calls `compareTo`. Required because the existing AtomicType fallback would emit `c1.compare(c2)`, which fails to compile on `TimestampNanosVal` (it has `compareTo`, not `compare`). - Updated the scaladoc on both physical types to note that only hash remains as future work. ### Why are the changes needed? Without ordering, SQL operators that need a total order on the type (`ORDER BY`, sort-merge join, sort-based `GROUP BY`, `DISTINCT`) cannot execute against nanos-precision columns. The two physical types previously threw at runtime. ### Does this PR introduce _any_ user-facing change? No. The nanos types remain gated behind `spark.sql.timestampNanosTypes.enabled`; this PR only fills in the ordering hole their `PhysicalDataType` had. ### How was this patch tested? 
Added 10 unit tests in `OrderingSuite` (5 cases × 2 types): equal values, `epochMicros` primary key, `nanosWithinMicro` tie-breaker, `Long.MinValue` / `Long.MaxValue` boundary, and pre-epoch (negative `epochMicros`). Each case verifies both `InterpretedOrdering` and `LazilyGeneratedOrdering` agree on ASC and DESC, and that `compare(a, a) == 0`. ``` build/mvn test -pl sql/catalyst \ -DwildcardSuites=org.apache.spark.sql.catalyst.expressions.OrderingSuite ``` Tests: 66/66 passing (10 new). Also ran `DataTypeSuite` and the catalyst-side `TimestampNanos*Suite` set: 344/344 passing. Not adding nanos types to `DataTypeTestUtils.atomicTypes` yet because the generic `GenerateOrdering with $dataType` test there uses `RandomDataGenerator`, which does not yet support nanos types (tracked in SPARK-57034). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.7) --- .../expressions/codegen/CodeGenerator.scala | 4 ++ .../sql/catalyst/types/PhysicalDataType.scala | 12 ++-- .../catalyst/expressions/OrderingSuite.scala | 56 +++++++++++++++++++ 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 330116e592482..5cb339f7eb4f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -667,6 +667,10 @@ class CodegenContext extends Logging { case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? 
-1 : 0)" case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)" case CalendarIntervalType => s"$c1.compareTo($c2)" + // Nanos timestamps are AtomicTypes but their internal value is TimestampNanosVal, which + // exposes compareTo (not compare); call it explicitly instead of falling through to the + // AtomicType arm below. + case _: TimestampNTZNanosType | _: TimestampLTZNanosType => s"$c1.compareTo($c2)" case NullType => "0" case array: ArrayType => val elementType = array.elementType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala index d89feb7e0dfde..806ec1342c492 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala @@ -177,13 +177,11 @@ case object PhysicalCalendarIntervalType extends PhysicalCalendarIntervalType * Storage layout is identical to [[PhysicalTimestampLTZNanosType]]; both types exist so the * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it. * - * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue. + * Hash is not implemented yet and will be added in a follow-up issue. 
*/ class PhysicalTimestampNTZNanosType() extends PhysicalDataType { - override private[sql] def ordering = - throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError( - "PhysicalTimestampNTZNanosType") override private[sql] type InternalType = TimestampNanosVal + override private[sql] val ordering = implicitly[Ordering[InternalType]] @transient private[sql] lazy val tag = typeTag[InternalType] } case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType @@ -197,13 +195,11 @@ case object PhysicalTimestampNTZNanosType extends PhysicalTimestampNTZNanosType * Storage layout is identical to [[PhysicalTimestampNTZNanosType]]; both types exist so the * NTZ/LTZ distinction propagates through the physical-type system to consumers that need it. * - * Ordering, compare, and hash are not implemented yet and will be added in a follow-up issue. + * Hash is not implemented yet and will be added in a follow-up issue. */ class PhysicalTimestampLTZNanosType() extends PhysicalDataType { - override private[sql] def ordering = - throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError( - "PhysicalTimestampLTZNanosType") override private[sql] type InternalType = TimestampNanosVal + override private[sql] val ordering = implicitly[Ordering[InternalType]] @transient private[sql] lazy val tag = typeTag[InternalType] } case object PhysicalTimestampLTZNanosType extends PhysicalTimestampLTZNanosType diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index 06c8b5ccef652..5f24e5065f1c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import 
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal import org.apache.spark.util.ArrayImplicits._ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -141,6 +142,61 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { GenerateOrdering.generate(Array.fill(5000)(sortOrder).toImmutableArraySeq) } + // SPARK-57103: ordering for nanosecond timestamp types. Not driven by the generic + // `atomicTypes` loop above because `RandomDataGenerator` does not yet support the new + // types (tracked separately in SPARK-57034); we hand-roll edge cases here instead. + private def compareNanos( + dataType: AtomicType, + a: TimestampNanosVal, + b: TimestampNanosVal, + expected: Int): Unit = { + test(s"compare two $dataType values: a = $a, b = $b") { + val rowA = InternalRow(a) + val rowB = InternalRow(b) + Seq(Ascending, Descending).foreach { direction => + val sortOrder = direction match { + case Ascending => BoundReference(0, dataType, nullable = true).asc + case Descending => BoundReference(0, dataType, nullable = true).desc + } + val expectedCompareResult = direction match { + case Ascending => signum(expected) + case Descending => -1 * signum(expected) + } + val intOrdering = new InterpretedOrdering(sortOrder :: Nil) + val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil) + Seq(intOrdering, genOrdering).foreach { ordering => + assert(ordering.compare(rowA, rowA) === 0) + assert(ordering.compare(rowB, rowB) === 0) + assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult) + assert(signum(ordering.compare(rowB, rowA)) === -1 * expectedCompareResult) + } + } + } + } + + Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9)).foreach { dt => + // equal values + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 100.toShort), 
+ TimestampNanosVal.fromParts(1000L, 100.toShort), 0) + // primary key (epochMicros) decides + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 999.toShort), + TimestampNanosVal.fromParts(1001L, 0.toShort), -1) + // tie-breaker (nanosWithinMicro) within the same micro + compareNanos(dt, + TimestampNanosVal.fromParts(1000L, 100.toShort), + TimestampNanosVal.fromParts(1000L, 101.toShort), -1) + // Long boundary: plain subtraction would overflow; Ordering must use Long.compare. + compareNanos(dt, + TimestampNanosVal.fromParts(Long.MinValue, 0.toShort), + TimestampNanosVal.fromParts(Long.MaxValue, 0.toShort), -1) + // pre-epoch sorts before epoch regardless of nanos + compareNanos(dt, + TimestampNanosVal.fromParts(-1L, 999.toShort), + TimestampNanosVal.fromParts(0L, 0.toShort), -1) + } + test("SPARK-21344: BinaryType comparison does signed byte array comparison") { val data = Seq( (Array[Byte](1), Array[Byte](-1)), From 66cba813ac1e7549825699d7903c59fd7f421c95 Mon Sep 17 00:00:00 2001 From: Stevo Mitric Date: Fri, 29 May 2026 16:27:36 +0000 Subject: [PATCH 2/3] [SPARK-57103][SQL] Drop comment on genComp nanos arm --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 5cb339f7eb4f7..8cf38951dc9ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -667,9 +667,6 @@ class CodegenContext extends Logging { case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? 
-1 : 0)" case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)" case CalendarIntervalType => s"$c1.compareTo($c2)" - // Nanos timestamps are AtomicTypes but their internal value is TimestampNanosVal, which - // exposes compareTo (not compare); call it explicitly instead of falling through to the - // AtomicType arm below. case _: TimestampNTZNanosType | _: TimestampLTZNanosType => s"$c1.compareTo($c2)" case NullType => "0" case array: ArrayType => From fcb00def137bfe7b28a3a104445482bedc854e59 Mon Sep 17 00:00:00 2001 From: Stevo Mitric Date: Sun, 31 May 2026 21:38:10 +0000 Subject: [PATCH 3/3] resolve comments --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 ++ .../spark/sql/catalyst/expressions/OrderingSuite.scala | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 8cf38951dc9ae..75c080a85077c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -667,6 +667,8 @@ class CodegenContext extends Logging { case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)" case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)" case CalendarIntervalType => s"$c1.compareTo($c2)" + // TimestampNanosVal exposes only `compareTo`; the AtomicType fallback below emits + // `$c1.compare($c2)`, which would not resolve as a Java method call. 
case _: TimestampNTZNanosType | _: TimestampLTZNanosType => s"$c1.compareTo($c2)" case NullType => "0" case array: ArrayType => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index 5f24e5065f1c8..8505a1c8a2c18 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -195,8 +195,15 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { compareNanos(dt, TimestampNanosVal.fromParts(-1L, 999.toShort), TimestampNanosVal.fromParts(0L, 0.toShort), -1) + // null sorts first under ASC (default NullsFirst) and last under DESC (default NullsLast) + compareNanos(dt, null, TimestampNanosVal.fromParts(0L, 0.toShort), -1) } + // Ordering is precision-independent. One case at p = 7 documents that intent. + compareNanos(TimestampNTZNanosType(7), + TimestampNanosVal.fromParts(0L, 0.toShort), + TimestampNanosVal.fromParts(0L, 1.toShort), -1) + test("SPARK-21344: BinaryType comparison does signed byte array comparison") { val data = Seq( (Array[Byte](1), Array[Byte](-1)),