From 2912125c5885e9cab31f64dc37ab55ff65d9db4d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 29 May 2026 11:02:29 +0200 Subject: [PATCH 1/5] [SPARK-57100][SQL] Add columnar (ColumnVector) support for nanosecond timestamp types Implement read/write/append support for TimestampNTZNanosType and TimestampLTZNanosType in column vectors, following the child-vector pattern used for CalendarInterval, but with two child vectors (Long for epochMicros, Short for nanosWithinMicro). Co-authored-by: Max Gekk --- .../spark/sql/vectorized/ColumnVector.java | 20 ++++++- .../vectorized/ColumnVectorUtils.java | 13 ++++- .../vectorized/ConstantColumnVector.java | 14 +++++ .../vectorized/WritableColumnVector.java | 16 ++++++ .../apache/spark/sql/execution/Columnar.scala | 22 ++++++++ .../execution/RowToColumnConverterSuite.scala | 56 ++++++++++++++++++- 6 files changed, 137 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index 952d084ac1901..ec72d52dae91d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -328,12 +328,28 @@ public CalendarInterval getInterval(int rowId) { return new CalendarInterval(months, days, microseconds); } + /** + * Returns the nanosecond NTZ timestamp value for {@code rowId}, or null if the slot is null. + *

+ * To support this type, implementations must implement {@link #getChild(int)} and define 2 child + * vectors: child 0 is a long vector holding {@code epochMicros}; child 1 is an int vector + * holding {@code nanosWithinMicro} (values in [0, 999]). + */ public TimestampNanosVal getTimestampNTZNanos(int rowId) { - throw SparkUnsupportedOperationException.apply(); + if (isNullAt(rowId)) return null; + return TimestampNanosVal.fromTrustedRowBytes( + getChild(0).getLong(rowId), getChild(1).getShort(rowId)); } + /** + * Returns the nanosecond LTZ timestamp value for {@code rowId}, or null if the slot is null. + *

+ * Storage layout is identical to {@link #getTimestampNTZNanos(int)}. + */ public TimestampNanosVal getTimestampLTZNanos(int rowId) { - throw SparkUnsupportedOperationException.apply(); + if (isNullAt(rowId)) return null; + return TimestampNanosVal.fromTrustedRowBytes( + getChild(0).getLong(rowId), getChild(1).getShort(rowId)); } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 1ca9290e3b7c2..bc9b07713505b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -40,6 +40,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.TimestampNanosVal; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -106,6 +107,10 @@ public static void populate( } else if (pdt instanceof PhysicalCalendarIntervalType) { // The value of `numRows` is irrelevant. 
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t)); + } else if (pdt instanceof PhysicalTimestampNTZNanosType) { + col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t)); + } else if (pdt instanceof PhysicalTimestampLTZNanosType) { + col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t)); } else if (pdt instanceof PhysicalVariantType) { col.setVariant((VariantVal)row.get(fieldIdx, t)); } else if (pdt instanceof PhysicalStructType) { @@ -171,7 +176,8 @@ public static Map toJavaIntMap(ColumnarMap map) { private static void appendValue(WritableColumnVector dst, DataType t, Object o) { if (o == null) { - if (t instanceof CalendarIntervalType || t instanceof VariantType) { + if (t instanceof CalendarIntervalType || t instanceof VariantType || + t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) { dst.appendStruct(true); } else { dst.appendNull(); @@ -219,6 +225,11 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o) dst.appendStruct(false); dst.getChild(0).appendByteArray(v.getValue(), 0, v.getValue().length); dst.getChild(1).appendByteArray(v.getMetadata(), 0, v.getMetadata().length); + } else if (t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) { + TimestampNanosVal v = (TimestampNanosVal) o; + dst.appendStruct(false); + dst.getChild(0).appendLong(v.epochMicros); + dst.getChild(1).appendShort(v.nanosWithinMicro); } else if (t instanceof DateType) { dst.appendInt(DateTimeUtils.fromJavaDate((Date) o)); } else if (t instanceof TimestampType) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java index 094d6edb6d259..fd99df0fb9f0f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java @@ -24,6 +24,7 @@ import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.TimestampNanosVal; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -73,6 +74,11 @@ public ConstantColumnVector(int numRows, DataType type) { this.childData[0] = new ConstantColumnVector(1, DataTypes.IntegerType); this.childData[1] = new ConstantColumnVector(1, DataTypes.IntegerType); this.childData[2] = new ConstantColumnVector(1, DataTypes.LongType); + } else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) { + // Two columns. EpochMicros as Long. NanosWithinMicro as Short. + this.childData = new ConstantColumnVector[2]; + this.childData[0] = new ConstantColumnVector(1, DataTypes.LongType); + this.childData[1] = new ConstantColumnVector(1, DataTypes.ShortType); } else if (type instanceof VariantType) { this.childData = new ConstantColumnVector[2]; this.childData[0] = new ConstantColumnVector(1, DataTypes.BinaryType); @@ -359,6 +365,14 @@ public void setCalendarInterval(CalendarInterval value) { this.childData[2].setLong(value.microseconds); } + /** + * Sets the nanosecond timestamp `value` for all rows + */ + public void setTimestampNanosVal(TimestampNanosVal value) { + this.childData[0].setLong(value.epochMicros); + this.childData[1].setShort(value.nanosWithinMicro); + } + /** * Sets the Variant `value` for all rows */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 0f5b23ad85390..0276514e56f08 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.TimestampNanosVal; import org.apache.spark.unsafe.types.UTF8String; /** @@ -493,6 +494,16 @@ public void putInterval(int rowId, CalendarInterval value) { getChild(2).putLong(rowId, value.microseconds); } + public void putTimestampNTZNanos(int rowId, TimestampNanosVal value) { + getChild(0).putLong(rowId, value.epochMicros); + getChild(1).putShort(rowId, value.nanosWithinMicro); + } + + public void putTimestampLTZNanos(int rowId, TimestampNanosVal value) { + getChild(0).putLong(rowId, value.epochMicros); + getChild(1).putShort(rowId, value.nanosWithinMicro); + } + @Override public UTF8String getUTF8String(int rowId) { if (isNullAt(rowId)) return null; @@ -1056,6 +1067,11 @@ protected WritableColumnVector(int capacity, DataType dataType) { this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType); this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType); this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType); + } else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) { + // Two columns. EpochMicros as Long. NanosWithinMicro as Short. 
+ this.childColumns = new WritableColumnVector[2]; + this.childColumns[0] = reserveNewColumn(capacity, DataTypes.LongType); + this.childColumns[1] = reserveNewColumn(capacity, DataTypes.ShortType); } else if (type instanceof VariantType) { this.childColumns = new WritableColumnVector[2]; this.childColumns[0] = reserveNewColumn(capacity, DataTypes.BinaryType); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 00b1f0248cd3e..4cfb258704563 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -272,6 +272,8 @@ private object RowToColumnConverter { case _: GeometryType => GeometryConverter case CalendarIntervalType => CalendarConverter case VariantType => VariantConverter + case _: TimestampNTZNanosType => TimestampNTZNanosConverter + case _: TimestampLTZNanosType => TimestampLTZNanosConverter case at: ArrayType => ArrayConverter(getConverterForType(at.elementType, at.containsNull)) case st: StructType => new StructConverter(st.fields.map( (f) => getConverterForType(f.dataType, f.nullable))) @@ -284,6 +286,8 @@ private object RowToColumnConverter { if (nullable) { dataType match { case CalendarIntervalType | VariantType => new StructNullableTypeConverter(core) + case _: TimestampNTZNanosType | _: TimestampLTZNanosType => + new StructNullableTypeConverter(core) case st: StructType => new StructNullableTypeConverter(core) case _ => new BasicNullableTypeConverter(core) } @@ -374,6 +378,24 @@ private object RowToColumnConverter { } } + private object TimestampNTZNanosConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val v = row.getTimestampNTZNanos(column) + cv.appendStruct(false) + cv.getChild(0).appendLong(v.epochMicros) + cv.getChild(1).appendShort(v.nanosWithinMicro) + } + } 
+ + private object TimestampLTZNanosConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val v = row.getTimestampLTZNanos(column) + cv.appendStruct(false) + cv.getChild(0).appendLong(v.epochMicros) + cv.getChild(1).appendShort(v.nanosWithinMicro) + } + } + private case class ArrayConverter(childConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { val values = row.getArray(column) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala index 621228fabf875..96d7fb6fbd094 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData} import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String} import org.apache.spark.util.ArrayImplicits._ class RowToColumnConverterSuite extends SparkFunSuite { @@ -130,6 +130,60 @@ class RowToColumnConverterSuite extends SparkFunSuite { } } + test("TimestampNTZNanosType column roundtrip") { + val t = TimestampNTZNanosType(9) + val schema = StructType(Seq(StructField("ts", t))) + val values = Seq( + TimestampNanosVal.fromParts(0L, 0.toShort), + TimestampNanosVal.fromParts(1_000_000L, 999.toShort), + TimestampNanosVal.fromParts(-1L, 123.toShort)) + val rows = values.map(v => InternalRow(v)) + val vectors = convertRows(rows, schema) + values.zipWithIndex.foreach { case (v, i) 
=> + assert(vectors.head.getTimestampNTZNanos(i) === v) + } + } + + test("TimestampNTZNanosType column with nulls") { + val t = TimestampNTZNanosType(9) + val schema = StructType(Seq(StructField("ts", t, nullable = true))) + val rows = Seq( + InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)), + InternalRow(null), + InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort))) + val vectors = convertRows(rows, schema) + assert(vectors.head.getTimestampNTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort)) + assert(vectors.head.isNullAt(1)) + assert(vectors.head.getTimestampNTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort)) + } + + test("TimestampLTZNanosType column roundtrip") { + val t = TimestampLTZNanosType(9) + val schema = StructType(Seq(StructField("ts", t))) + val values = Seq( + TimestampNanosVal.fromParts(0L, 0.toShort), + TimestampNanosVal.fromParts(1_000_000L, 999.toShort), + TimestampNanosVal.fromParts(-1L, 123.toShort)) + val rows = values.map(v => InternalRow(v)) + val vectors = convertRows(rows, schema) + values.zipWithIndex.foreach { case (v, i) => + assert(vectors.head.getTimestampLTZNanos(i) === v) + } + } + + test("TimestampLTZNanosType column with nulls") { + val t = TimestampLTZNanosType(9) + val schema = StructType(Seq(StructField("ts", t, nullable = true))) + val rows = Seq( + InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)), + InternalRow(null), + InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort))) + val vectors = convertRows(rows, schema) + assert(vectors.head.getTimestampLTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort)) + assert(vectors.head.isNullAt(1)) + assert(vectors.head.getTimestampLTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort)) + } + test("multiple columns") { val schema = StructType( Seq(StructField("s", ShortType), StructField("i", IntegerType), StructField("l", LongType))) From 68fa77477ee412ccb4011a8fb333da3dd58f7574 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: 
Fri, 29 May 2026 11:27:44 +0200 Subject: [PATCH 2/5] [SPARK-57100][SQL] Fix review findings in nanosecond timestamp column-vector support Four issues found in code review: 1. appendStruct(true) null-propagation: extend the StructType|VariantType guard in WritableColumnVector to also recurse for CalendarIntervalType, TimestampNTZNanosType, and TimestampLTZNanosType children, so that a nullable struct field of these types correctly propagates nulls into their own child sub-columns, preventing index divergence. 2. MutableColumnarRow: add copy(), get(), and update() branches for TimestampNTZNanosType and TimestampLTZNanosType, plus setTimestampNTZNanos and setTimestampLTZNanos setters. 3. ColumnVector Javadoc: fix "int vector" -> "short vector" for child 1 of the nanosecond timestamp layout. 4. Test coverage: add testVectors (OnHeap + OffHeap) for both nanos types to ColumnVectorSuite; add populate tests to ColumnVectorUtilsSuite; add nanos columns to the ColumnarBatchSuite RowToColumnConverter end-to-end test. Co-authored-by: Max Gekk --- .../spark/sql/vectorized/ColumnVector.java | 2 +- .../vectorized/MutableColumnarRow.java | 22 +++++++++ .../vectorized/WritableColumnVector.java | 5 +- .../vectorized/ColumnVectorSuite.scala | 46 ++++++++++++++++++- .../vectorized/ColumnVectorUtilsSuite.scala | 20 +++++++- .../vectorized/ColumnarBatchSuite.scala | 31 ++++++++++++- 6 files changed, 120 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index ec72d52dae91d..483703d50169f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -332,7 +332,7 @@ public CalendarInterval getInterval(int rowId) { * Returns the nanosecond NTZ timestamp value for {@code rowId}, or null if the slot is null. *

* To support this type, implementations must implement {@link #getChild(int)} and define 2 child - * vectors: child 0 is a long vector holding {@code epochMicros}; child 1 is an int vector + * vectors: child 0 is a long vector holding {@code epochMicros}; child 1 is a short vector * holding {@code nanosWithinMicro} (values in [0, 999]). */ public TimestampNanosVal getTimestampNTZNanos(int rowId) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java index b06dc50d9018b..3495d660bb7d5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java @@ -99,6 +99,10 @@ public InternalRow copy() { row.update(i, getMap(i).copy()); } else if (dt instanceof VariantType) { row.update(i, getVariant(i)); + } else if (dt instanceof TimestampNTZNanosType) { + row.update(i, getTimestampNTZNanos(i)); + } else if (dt instanceof TimestampLTZNanosType) { + row.update(i, getTimestampLTZNanos(i)); } else { throw new RuntimeException("Not implemented. 
" + dt); } @@ -232,6 +236,10 @@ public Object get(int ordinal, DataType dataType) { return getMap(ordinal); } else if (dataType instanceof VariantType) { return getVariant(ordinal); + } else if (dataType instanceof TimestampNTZNanosType) { + return getTimestampNTZNanos(ordinal); + } else if (dataType instanceof TimestampLTZNanosType) { + return getTimestampLTZNanos(ordinal); } else { throw new SparkUnsupportedOperationException( "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString())); @@ -261,6 +269,10 @@ public void update(int ordinal, Object value) { setDecimal(ordinal, d, t.precision()); } else if (dt instanceof CalendarIntervalType) { setInterval(ordinal, (CalendarInterval) value); + } else if (dt instanceof TimestampNTZNanosType) { + setTimestampNTZNanos(ordinal, (TimestampNanosVal) value); + } else if (dt instanceof TimestampLTZNanosType) { + setTimestampLTZNanos(ordinal, (TimestampNanosVal) value); } else { throw new SparkUnsupportedOperationException( "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dt.toString())); @@ -326,4 +338,14 @@ public void setInterval(int ordinal, CalendarInterval value) { columns[ordinal].putNotNull(rowId); columns[ordinal].putInterval(rowId, value); } + + public void setTimestampNTZNanos(int ordinal, TimestampNanosVal value) { + columns[ordinal].putNotNull(rowId); + columns[ordinal].putTimestampNTZNanos(rowId, value); + } + + public void setTimestampLTZNanos(int ordinal, TimestampNanosVal value) { + columns[ordinal].putNotNull(rowId); + columns[ordinal].putTimestampLTZNanos(rowId, value); + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 0276514e56f08..708eb8b4146f2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -762,7 
+762,10 @@ public final int appendStruct(boolean isNull) { putNull(elementsAppended); elementsAppended++; for (WritableColumnVector c: childColumns) { - if (c.type instanceof StructType || c.type instanceof VariantType) { + if (c.type instanceof StructType || c.type instanceof VariantType + || c.type instanceof CalendarIntervalType + || c.type instanceof TimestampNTZNanosType + || c.type instanceof TimestampLTZNanosType) { c.appendStruct(true); } else { c.appendNull(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 8c84742c16efb..314eb07250b58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarArray -import org.apache.spark.unsafe.types.{UTF8String, VariantVal} +import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String, VariantVal} import org.apache.spark.util.ArrayImplicits._ class ColumnVectorSuite extends SparkFunSuite with SQLHelper { @@ -379,6 +379,50 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper { } } + testVectors("timestamp_ntz_nanos", 10, TimestampNTZNanosType(9)) { testVector => + val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort)) + values.foreach { v => + testVector.putNotNull(testVector.elementsAppended) + testVector.putTimestampNTZNanos(testVector.elementsAppended, v) + testVector.elementsAppended += 1 + } + values.zipWithIndex.foreach { case (v, i) => + assert(testVector.getTimestampNTZNanos(i) === v) + } + testVector.putNull(0) + assert(testVector.isNullAt(0)) + } + + 
testVectors("timestamp_ltz_nanos", 10, TimestampLTZNanosType(9)) { testVector => + val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort)) + values.foreach { v => + testVector.putNotNull(testVector.elementsAppended) + testVector.putTimestampLTZNanos(testVector.elementsAppended, v) + testVector.elementsAppended += 1 + } + values.zipWithIndex.foreach { case (v, i) => + assert(testVector.getTimestampLTZNanos(i) === v) + } + testVector.putNull(0) + assert(testVector.isNullAt(0)) + } + + testVectors("mutable ColumnarRow with TimestampNTZNanosType", 5, + TimestampNTZNanosType(9)) { testVector => + val mutableRow = new MutableColumnarRow(Array(testVector)) + val values = (0 until 5).map(i => TimestampNanosVal.fromParts(i * 100L, i.toShort)) + values.zipWithIndex.foreach { case (v, i) => + mutableRow.rowId = i + mutableRow.setTimestampNTZNanos(0, v) + } + values.zipWithIndex.foreach { case (v, i) => + mutableRow.rowId = i + assert(mutableRow.getTimestampNTZNanos(0) === v) + assert(mutableRow.get(0, TimestampNTZNanosType(9)) === v) + assert(mutableRow.copy().get(0, TimestampNTZNanosType(9)) === v) + } + } + testVectors("mutable ColumnarRow with TimestampNTZType", 10, TimestampNTZType) { testVector => val mutableRow = new MutableColumnarRow(Array(testVector)) (0 until 10).foreach { i => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala index b1c0d6c1d7d51..314eac92c195e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} import org.apache.spark.sql.types._ 
-import org.apache.spark.unsafe.types.CalendarInterval +import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal} import org.apache.spark.unsafe.types.UTF8String class ColumnVectorUtilsSuite extends SparkFunSuite { @@ -135,6 +135,24 @@ class ColumnVectorUtilsSuite extends SparkFunSuite { } } + testConstantColumnVector("fill TimestampNTZNanosType", 10, + TimestampNTZNanosType(9)) { vector => + val v = TimestampNanosVal.fromParts(1_234_567L, 999.toShort) + ColumnVectorUtils.populate(vector, InternalRow(v), 0) + (0 until 10).foreach { i => + assert(vector.getTimestampNTZNanos(i) === v) + } + } + + testConstantColumnVector("fill TimestampLTZNanosType", 10, + TimestampLTZNanosType(9)) { vector => + val v = TimestampNanosVal.fromParts(-999L, 42.toShort) + ColumnVectorUtils.populate(vector, InternalRow(v), 0) + (0 until 10).foreach { i => + assert(vector.getTimestampLTZNanos(i) === v) + } + } + testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector => val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5)) ColumnVectorUtils.populate(vector, InternalRow(arr), 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 40f73450eb21d..cd643aae9f148 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnarBatchRow, ColumnVector} import org.apache.spark.tags.ExtendedSQLTest import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.types.TimestampNanosVal import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal} import org.apache.spark.util.ArrayImplicits._ @@ -1694,6 
+1695,8 @@ class ColumnarBatchSuite extends SparkFunSuite { StructField("binary", BinaryType) :: StructField("ts_ntz", TimestampNTZType) :: StructField("variant", VariantType) :: + StructField("ts_ntz_nanos", TimestampNTZNanosType(9)) :: + StructField("ts_ltz_nanos", TimestampLTZNanosType(9)) :: Nil) var mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType) mapBuilder.put(1, 10) @@ -1710,6 +1713,10 @@ class ColumnarBatchSuite extends SparkFunSuite { val variantVal1 = new VariantVal(Array[Byte](1, 2, 3), Array[Byte](4, 5)) val variantVal2 = new VariantVal(Array[Byte](6), Array[Byte](7, 8)) + val tsNTZNanos1 = TimestampNanosVal.fromParts(1_000_000L, 123.toShort) + val tsNTZNanos2 = TimestampNanosVal.fromParts(-500L, 999.toShort) + val tsLTZNanos1 = TimestampNanosVal.fromParts(2_000_000L, 42.toShort) + val tsLTZNanos2 = TimestampNanosVal.fromParts(0L, 0.toShort) val row1 = new GenericInternalRow(Array[Any]( UTF8String.fromString("a string"), @@ -1729,7 +1736,9 @@ class ColumnarBatchSuite extends SparkFunSuite { mapBuilder.build(), "Spark SQL".getBytes(), tsNTZ1, - variantVal1 + variantVal1, + tsNTZNanos1, + tsLTZNanos1 )) mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType) @@ -1753,7 +1762,9 @@ class ColumnarBatchSuite extends SparkFunSuite { mapBuilder.build(), "Parquet".getBytes(), tsNTZ2, - variantVal2 + variantVal2, + tsNTZNanos2, + tsLTZNanos2 )) val row3 = new GenericInternalRow(Array[Any]( @@ -1774,6 +1785,8 @@ class ColumnarBatchSuite extends SparkFunSuite { null, null, null, + null, + null, null )) @@ -1909,6 +1922,20 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(columns(17).isNullAt(2)) assert(columns(17).getChild(0).isNullAt(2)) assert(columns(17).getChild(1).isNullAt(2)) + + assert(columns(18).dataType() == TimestampNTZNanosType(9)) + assert(columns(18).getTimestampNTZNanos(0) == tsNTZNanos1) + assert(columns(18).getTimestampNTZNanos(1) == tsNTZNanos2) + assert(columns(18).isNullAt(2)) + 
assert(columns(18).getChild(0).isNullAt(2)) + assert(columns(18).getChild(1).isNullAt(2)) + + assert(columns(19).dataType() == TimestampLTZNanosType(9)) + assert(columns(19).getTimestampLTZNanos(0) == tsLTZNanos1) + assert(columns(19).getTimestampLTZNanos(1) == tsLTZNanos2) + assert(columns(19).isNullAt(2)) + assert(columns(19).getChild(0).isNullAt(2)) + assert(columns(19).getChild(1).isNullAt(2)) } finally { batch.close() } From e46c1f25a81f99a6e5801f47f29646ec9f079b9b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 29 May 2026 13:58:25 +0200 Subject: [PATCH 3/5] [SPARK-57100][SQL] Fix Scalastyle import order in ColumnarBatchSuite Co-authored-by: Max Gekk --- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index cd643aae9f148..3b5f05210dee3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -45,8 +45,7 @@ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnarBatchRow, ColumnVector} import org.apache.spark.tags.ExtendedSQLTest import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.TimestampNanosVal -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal} +import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String, VariantVal} import org.apache.spark.util.ArrayImplicits._ /** From 7a10e70edd90350520d20daa71cea82fda45b501 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 29 May 2026 19:59:19 +0200 Subject: [PATCH 4/5] Remove unused import --- .../main/java/org/apache/spark/sql/vectorized/ColumnVector.java | 1 - 
1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index 483703d50169f..014216777df22 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -18,7 +18,6 @@ import scala.PartialFunction; -import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; From 51e74ca0c7998341d73de9fa059911fbc874ae36 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 31 May 2026 19:42:38 +0200 Subject: [PATCH 5/5] [SPARK-57100][SQL] Address review comments for nanosecond timestamp column-vector support Apply review feedback from the PR: 1. Extract the CalendarIntervalType fix out of appendStruct: WritableColumnVector no longer recurses for CalendarIntervalType children, keeping only the nanosecond timestamp types in this PR. The interval fix will be handled separately. 2. Collapse the two identical nanos branches in ColumnVectorUtils.populate into a single condition with ||, matching appendValue. 3. Add a MutableColumnarRow test for TimestampLTZNanosType, mirroring the existing TimestampNTZNanosType test. 
Co-authored-by: Max Gekk --- .../execution/vectorized/ColumnVectorUtils.java | 5 ++--- .../vectorized/WritableColumnVector.java | 1 - .../execution/vectorized/ColumnVectorSuite.scala | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index bc9b07713505b..d07a345f7dfbd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -107,9 +107,8 @@ public static void populate( } else if (pdt instanceof PhysicalCalendarIntervalType) { // The value of `numRows` is irrelevant. col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t)); - } else if (pdt instanceof PhysicalTimestampNTZNanosType) { - col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t)); - } else if (pdt instanceof PhysicalTimestampLTZNanosType) { + } else if (pdt instanceof PhysicalTimestampNTZNanosType || + pdt instanceof PhysicalTimestampLTZNanosType) { col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t)); } else if (pdt instanceof PhysicalVariantType) { col.setVariant((VariantVal)row.get(fieldIdx, t)); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 708eb8b4146f2..302840dbe2f3b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -763,7 +763,6 @@ public final int appendStruct(boolean isNull) { elementsAppended++; for (WritableColumnVector c: childColumns) { if (c.type instanceof StructType || c.type instanceof VariantType - || 
c.type instanceof CalendarIntervalType || c.type instanceof TimestampNTZNanosType || c.type instanceof TimestampLTZNanosType) { c.appendStruct(true); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 314eb07250b58..e55a793dee767 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -423,6 +423,22 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper { } } + testVectors("mutable ColumnarRow with TimestampLTZNanosType", 5, + TimestampLTZNanosType(9)) { testVector => + val mutableRow = new MutableColumnarRow(Array(testVector)) + val values = (0 until 5).map(i => TimestampNanosVal.fromParts(i * 100L, i.toShort)) + values.zipWithIndex.foreach { case (v, i) => + mutableRow.rowId = i + mutableRow.setTimestampLTZNanos(0, v) + } + values.zipWithIndex.foreach { case (v, i) => + mutableRow.rowId = i + assert(mutableRow.getTimestampLTZNanos(0) === v) + assert(mutableRow.get(0, TimestampLTZNanosType(9)) === v) + assert(mutableRow.copy().get(0, TimestampLTZNanosType(9)) === v) + } + } + testVectors("mutable ColumnarRow with TimestampNTZType", 10, TimestampNTZType) { testVector => val mutableRow = new MutableColumnarRow(Array(testVector)) (0 until 10).foreach { i =>