diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 952d084ac1901..014216777df22 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -18,7 +18,6 @@
import scala.PartialFunction;
-import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -328,12 +327,28 @@ public CalendarInterval getInterval(int rowId) {
return new CalendarInterval(months, days, microseconds);
}
+ /**
+ * Returns the nanosecond NTZ timestamp value for {@code rowId}, or null if the slot is null.
+ *
+ * To support this type, implementations must implement {@link #getChild(int)} and define 2 child
+ * vectors: child 0 is a long vector holding {@code epochMicros}; child 1 is a short vector
+ * holding {@code nanosWithinMicro} (values in [0, 999]).
+ */
public TimestampNanosVal getTimestampNTZNanos(int rowId) {
- throw SparkUnsupportedOperationException.apply();
+ if (isNullAt(rowId)) return null;
+ return TimestampNanosVal.fromTrustedRowBytes(
+ getChild(0).getLong(rowId), getChild(1).getShort(rowId));
}
+ /**
+ * Returns the nanosecond LTZ timestamp value for {@code rowId}, or null if the slot is null.
+ *
+ * Storage layout is identical to {@link #getTimestampNTZNanos(int)}.
+ */
public TimestampNanosVal getTimestampLTZNanos(int rowId) {
- throw SparkUnsupportedOperationException.apply();
+ if (isNullAt(rowId)) return null;
+ return TimestampNanosVal.fromTrustedRowBytes(
+ getChild(0).getLong(rowId), getChild(1).getShort(rowId));
}
/**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 1ca9290e3b7c2..d07a345f7dfbd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -40,6 +40,7 @@
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
@@ -106,6 +107,9 @@ public static void populate(
} else if (pdt instanceof PhysicalCalendarIntervalType) {
// The value of `numRows` is irrelevant.
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
+ } else if (pdt instanceof PhysicalTimestampNTZNanosType ||
+ pdt instanceof PhysicalTimestampLTZNanosType) {
+ col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalVariantType) {
col.setVariant((VariantVal)row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalStructType) {
@@ -171,7 +175,8 @@ public static Map toJavaIntMap(ColumnarMap map) {
private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
if (o == null) {
- if (t instanceof CalendarIntervalType || t instanceof VariantType) {
+ if (t instanceof CalendarIntervalType || t instanceof VariantType ||
+ t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) {
dst.appendStruct(true);
} else {
dst.appendNull();
@@ -219,6 +224,11 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o)
dst.appendStruct(false);
dst.getChild(0).appendByteArray(v.getValue(), 0, v.getValue().length);
dst.getChild(1).appendByteArray(v.getMetadata(), 0, v.getMetadata().length);
+ } else if (t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) {
+ TimestampNanosVal v = (TimestampNanosVal) o;
+ dst.appendStruct(false);
+ dst.getChild(0).appendLong(v.epochMicros);
+ dst.getChild(1).appendShort(v.nanosWithinMicro);
} else if (t instanceof DateType) {
dst.appendInt(DateTimeUtils.fromJavaDate((Date) o));
} else if (t instanceof TimestampType) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index 094d6edb6d259..fd99df0fb9f0f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -24,6 +24,7 @@
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
@@ -73,6 +74,11 @@ public ConstantColumnVector(int numRows, DataType type) {
this.childData[0] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[1] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[2] = new ConstantColumnVector(1, DataTypes.LongType);
+ } else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) {
+ // Two columns. EpochMicros as Long. NanosWithinMicro as Short.
+ this.childData = new ConstantColumnVector[2];
+ this.childData[0] = new ConstantColumnVector(1, DataTypes.LongType);
+ this.childData[1] = new ConstantColumnVector(1, DataTypes.ShortType);
} else if (type instanceof VariantType) {
this.childData = new ConstantColumnVector[2];
this.childData[0] = new ConstantColumnVector(1, DataTypes.BinaryType);
@@ -359,6 +365,14 @@ public void setCalendarInterval(CalendarInterval value) {
this.childData[2].setLong(value.microseconds);
}
+ /**
+ * Sets the nanosecond timestamp `value` for all rows
+ */
+ public void setTimestampNanosVal(TimestampNanosVal value) {
+ this.childData[0].setLong(value.epochMicros);
+ this.childData[1].setShort(value.nanosWithinMicro);
+ }
+
/**
* Sets the Variant `value` for all rows
*/
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index b06dc50d9018b..3495d660bb7d5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -99,6 +99,10 @@ public InternalRow copy() {
row.update(i, getMap(i).copy());
} else if (dt instanceof VariantType) {
row.update(i, getVariant(i));
+ } else if (dt instanceof TimestampNTZNanosType) {
+ row.update(i, getTimestampNTZNanos(i));
+ } else if (dt instanceof TimestampLTZNanosType) {
+ row.update(i, getTimestampLTZNanos(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
@@ -232,6 +236,10 @@ public Object get(int ordinal, DataType dataType) {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
+ } else if (dataType instanceof TimestampNTZNanosType) {
+ return getTimestampNTZNanos(ordinal);
+ } else if (dataType instanceof TimestampLTZNanosType) {
+ return getTimestampLTZNanos(ordinal);
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
@@ -261,6 +269,10 @@ public void update(int ordinal, Object value) {
setDecimal(ordinal, d, t.precision());
} else if (dt instanceof CalendarIntervalType) {
setInterval(ordinal, (CalendarInterval) value);
+ } else if (dt instanceof TimestampNTZNanosType) {
+ setTimestampNTZNanos(ordinal, (TimestampNanosVal) value);
+ } else if (dt instanceof TimestampLTZNanosType) {
+ setTimestampLTZNanos(ordinal, (TimestampNanosVal) value);
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3192", Map.of("dt", dt.toString()));
@@ -326,4 +338,14 @@ public void setInterval(int ordinal, CalendarInterval value) {
columns[ordinal].putNotNull(rowId);
columns[ordinal].putInterval(rowId, value);
}
+
+ public void setTimestampNTZNanos(int ordinal, TimestampNanosVal value) {
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putTimestampNTZNanos(rowId, value);
+ }
+
+ public void setTimestampLTZNanos(int ordinal, TimestampNanosVal value) {
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putTimestampLTZNanos(rowId, value);
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 0f5b23ad85390..302840dbe2f3b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -33,6 +33,7 @@
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -493,6 +494,16 @@ public void putInterval(int rowId, CalendarInterval value) {
getChild(2).putLong(rowId, value.microseconds);
}
+ public void putTimestampNTZNanos(int rowId, TimestampNanosVal value) {
+ getChild(0).putLong(rowId, value.epochMicros);
+ getChild(1).putShort(rowId, value.nanosWithinMicro);
+ }
+
+ public void putTimestampLTZNanos(int rowId, TimestampNanosVal value) {
+ getChild(0).putLong(rowId, value.epochMicros);
+ getChild(1).putShort(rowId, value.nanosWithinMicro);
+ }
+
@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) return null;
@@ -751,7 +762,9 @@ public final int appendStruct(boolean isNull) {
putNull(elementsAppended);
elementsAppended++;
for (WritableColumnVector c: childColumns) {
- if (c.type instanceof StructType || c.type instanceof VariantType) {
+ if (c.type instanceof StructType || c.type instanceof VariantType
+ || c.type instanceof TimestampNTZNanosType
+ || c.type instanceof TimestampLTZNanosType) {
c.appendStruct(true);
} else {
c.appendNull();
@@ -1056,6 +1069,11 @@ protected WritableColumnVector(int capacity, DataType dataType) {
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
+ } else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) {
+ // Two columns. EpochMicros as Long. NanosWithinMicro as Short.
+ this.childColumns = new WritableColumnVector[2];
+ this.childColumns[0] = reserveNewColumn(capacity, DataTypes.LongType);
+ this.childColumns[1] = reserveNewColumn(capacity, DataTypes.ShortType);
} else if (type instanceof VariantType) {
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.BinaryType);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 00b1f0248cd3e..4cfb258704563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -272,6 +272,8 @@ private object RowToColumnConverter {
case _: GeometryType => GeometryConverter
case CalendarIntervalType => CalendarConverter
case VariantType => VariantConverter
+ case _: TimestampNTZNanosType => TimestampNTZNanosConverter
+ case _: TimestampLTZNanosType => TimestampLTZNanosConverter
case at: ArrayType => ArrayConverter(getConverterForType(at.elementType, at.containsNull))
case st: StructType => new StructConverter(st.fields.map(
(f) => getConverterForType(f.dataType, f.nullable)))
@@ -284,6 +286,8 @@ private object RowToColumnConverter {
if (nullable) {
dataType match {
case CalendarIntervalType | VariantType => new StructNullableTypeConverter(core)
+ case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+ new StructNullableTypeConverter(core)
case st: StructType => new StructNullableTypeConverter(core)
case _ => new BasicNullableTypeConverter(core)
}
@@ -374,6 +378,24 @@ private object RowToColumnConverter {
}
}
+ private object TimestampNTZNanosConverter extends TypeConverter {
+ override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
+ val v = row.getTimestampNTZNanos(column)
+ cv.appendStruct(false)
+ cv.getChild(0).appendLong(v.epochMicros)
+ cv.getChild(1).appendShort(v.nanosWithinMicro)
+ }
+ }
+
+ private object TimestampLTZNanosConverter extends TypeConverter {
+ override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
+ val v = row.getTimestampLTZNanos(column)
+ cv.appendStruct(false)
+ cv.getChild(0).appendLong(v.epochMicros)
+ cv.getChild(1).appendShort(v.nanosWithinMicro)
+ }
+ }
+
private case class ArrayConverter(childConverter: TypeConverter) extends TypeConverter {
override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
val values = row.getArray(column)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala
index 621228fabf875..96d7fb6fbd094 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowToColumnConverterSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
import org.apache.spark.util.ArrayImplicits._
class RowToColumnConverterSuite extends SparkFunSuite {
@@ -130,6 +130,60 @@ class RowToColumnConverterSuite extends SparkFunSuite {
}
}
+ test("TimestampNTZNanosType column roundtrip") {
+ val t = TimestampNTZNanosType(9)
+ val schema = StructType(Seq(StructField("ts", t)))
+ val values = Seq(
+ TimestampNanosVal.fromParts(0L, 0.toShort),
+ TimestampNanosVal.fromParts(1_000_000L, 999.toShort),
+ TimestampNanosVal.fromParts(-1L, 123.toShort))
+ val rows = values.map(v => InternalRow(v))
+ val vectors = convertRows(rows, schema)
+ values.zipWithIndex.foreach { case (v, i) =>
+ assert(vectors.head.getTimestampNTZNanos(i) === v)
+ }
+ }
+
+ test("TimestampNTZNanosType column with nulls") {
+ val t = TimestampNTZNanosType(9)
+ val schema = StructType(Seq(StructField("ts", t, nullable = true)))
+ val rows = Seq(
+ InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)),
+ InternalRow(null),
+ InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort)))
+ val vectors = convertRows(rows, schema)
+ assert(vectors.head.getTimestampNTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort))
+ assert(vectors.head.isNullAt(1))
+ assert(vectors.head.getTimestampNTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort))
+ }
+
+ test("TimestampLTZNanosType column roundtrip") {
+ val t = TimestampLTZNanosType(9)
+ val schema = StructType(Seq(StructField("ts", t)))
+ val values = Seq(
+ TimestampNanosVal.fromParts(0L, 0.toShort),
+ TimestampNanosVal.fromParts(1_000_000L, 999.toShort),
+ TimestampNanosVal.fromParts(-1L, 123.toShort))
+ val rows = values.map(v => InternalRow(v))
+ val vectors = convertRows(rows, schema)
+ values.zipWithIndex.foreach { case (v, i) =>
+ assert(vectors.head.getTimestampLTZNanos(i) === v)
+ }
+ }
+
+ test("TimestampLTZNanosType column with nulls") {
+ val t = TimestampLTZNanosType(9)
+ val schema = StructType(Seq(StructField("ts", t, nullable = true)))
+ val rows = Seq(
+ InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)),
+ InternalRow(null),
+ InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort)))
+ val vectors = convertRows(rows, schema)
+ assert(vectors.head.getTimestampLTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort))
+ assert(vectors.head.isNullAt(1))
+ assert(vectors.head.getTimestampLTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort))
+ }
+
test("multiple columns") {
val schema = StructType(
Seq(StructField("s", ShortType), StructField("i", IntegerType), StructField("l", LongType)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 8c84742c16efb..e55a793dee767 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
-import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
@@ -379,6 +379,66 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
}
}
+ testVectors("timestamp_ntz_nanos", 10, TimestampNTZNanosType(9)) { testVector =>
+ val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort))
+ values.foreach { v =>
+ testVector.putNotNull(testVector.elementsAppended)
+ testVector.putTimestampNTZNanos(testVector.elementsAppended, v)
+ testVector.elementsAppended += 1
+ }
+ values.zipWithIndex.foreach { case (v, i) =>
+ assert(testVector.getTimestampNTZNanos(i) === v)
+ }
+ testVector.putNull(0)
+ assert(testVector.isNullAt(0))
+ }
+
+ testVectors("timestamp_ltz_nanos", 10, TimestampLTZNanosType(9)) { testVector =>
+ val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort))
+ values.foreach { v =>
+ testVector.putNotNull(testVector.elementsAppended)
+ testVector.putTimestampLTZNanos(testVector.elementsAppended, v)
+ testVector.elementsAppended += 1
+ }
+ values.zipWithIndex.foreach { case (v, i) =>
+ assert(testVector.getTimestampLTZNanos(i) === v)
+ }
+ testVector.putNull(0)
+ assert(testVector.isNullAt(0))
+ }
+
+ testVectors("mutable ColumnarRow with TimestampNTZNanosType", 5,
+ TimestampNTZNanosType(9)) { testVector =>
+ val mutableRow = new MutableColumnarRow(Array(testVector))
+ val values = (0 until 5).map(i => TimestampNanosVal.fromParts(i * 100L, i.toShort))
+ values.zipWithIndex.foreach { case (v, i) =>
+ mutableRow.rowId = i
+ mutableRow.setTimestampNTZNanos(0, v)
+ }
+ values.zipWithIndex.foreach { case (v, i) =>
+ mutableRow.rowId = i
+ assert(mutableRow.getTimestampNTZNanos(0) === v)
+ assert(mutableRow.get(0, TimestampNTZNanosType(9)) === v)
+ assert(mutableRow.copy().get(0, TimestampNTZNanosType(9)) === v)
+ }
+ }
+
+ testVectors("mutable ColumnarRow with TimestampLTZNanosType", 5,
+ TimestampLTZNanosType(9)) { testVector =>
+ val mutableRow = new MutableColumnarRow(Array(testVector))
+ val values = (0 until 5).map(i => TimestampNanosVal.fromParts(i * 100L, i.toShort))
+ values.zipWithIndex.foreach { case (v, i) =>
+ mutableRow.rowId = i
+ mutableRow.setTimestampLTZNanos(0, v)
+ }
+ values.zipWithIndex.foreach { case (v, i) =>
+ mutableRow.rowId = i
+ assert(mutableRow.getTimestampLTZNanos(0) === v)
+ assert(mutableRow.get(0, TimestampLTZNanosType(9)) === v)
+ assert(mutableRow.copy().get(0, TimestampLTZNanosType(9)) === v)
+ }
+ }
+
testVectors("mutable ColumnarRow with TimestampNTZType", 10, TimestampNTZType) { testVector =>
val mutableRow = new MutableColumnarRow(Array(testVector))
(0 until 10).foreach { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
index b1c0d6c1d7d51..314eac92c195e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal}
import org.apache.spark.unsafe.types.UTF8String
class ColumnVectorUtilsSuite extends SparkFunSuite {
@@ -135,6 +135,24 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
}
}
+ testConstantColumnVector("fill TimestampNTZNanosType", 10,
+ TimestampNTZNanosType(9)) { vector =>
+ val v = TimestampNanosVal.fromParts(1_234_567L, 999.toShort)
+ ColumnVectorUtils.populate(vector, InternalRow(v), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getTimestampNTZNanos(i) === v)
+ }
+ }
+
+ testConstantColumnVector("fill TimestampLTZNanosType", 10,
+ TimestampLTZNanosType(9)) { vector =>
+ val v = TimestampNanosVal.fromParts(-999L, 42.toShort)
+ ColumnVectorUtils.populate(vector, InternalRow(v), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getTimestampLTZNanos(i) === v)
+ }
+ }
+
testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector =>
val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5))
ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 40f73450eb21d..3b5f05210dee3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnarBatchRow, ColumnVector}
import org.apache.spark.tags.ExtendedSQLTest
import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
/**
@@ -1694,6 +1694,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
StructField("binary", BinaryType) ::
StructField("ts_ntz", TimestampNTZType) ::
StructField("variant", VariantType) ::
+ StructField("ts_ntz_nanos", TimestampNTZNanosType(9)) ::
+ StructField("ts_ltz_nanos", TimestampLTZNanosType(9)) ::
Nil)
var mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
mapBuilder.put(1, 10)
@@ -1710,6 +1712,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
val variantVal1 = new VariantVal(Array[Byte](1, 2, 3), Array[Byte](4, 5))
val variantVal2 = new VariantVal(Array[Byte](6), Array[Byte](7, 8))
+ val tsNTZNanos1 = TimestampNanosVal.fromParts(1_000_000L, 123.toShort)
+ val tsNTZNanos2 = TimestampNanosVal.fromParts(-500L, 999.toShort)
+ val tsLTZNanos1 = TimestampNanosVal.fromParts(2_000_000L, 42.toShort)
+ val tsLTZNanos2 = TimestampNanosVal.fromParts(0L, 0.toShort)
val row1 = new GenericInternalRow(Array[Any](
UTF8String.fromString("a string"),
@@ -1729,7 +1735,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
mapBuilder.build(),
"Spark SQL".getBytes(),
tsNTZ1,
- variantVal1
+ variantVal1,
+ tsNTZNanos1,
+ tsLTZNanos1
))
mapBuilder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
@@ -1753,7 +1761,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
mapBuilder.build(),
"Parquet".getBytes(),
tsNTZ2,
- variantVal2
+ variantVal2,
+ tsNTZNanos2,
+ tsLTZNanos2
))
val row3 = new GenericInternalRow(Array[Any](
@@ -1774,6 +1784,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
null,
null,
null,
+ null,
+ null,
null
))
@@ -1909,6 +1921,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(columns(17).isNullAt(2))
assert(columns(17).getChild(0).isNullAt(2))
assert(columns(17).getChild(1).isNullAt(2))
+
+ assert(columns(18).dataType() == TimestampNTZNanosType(9))
+ assert(columns(18).getTimestampNTZNanos(0) == tsNTZNanos1)
+ assert(columns(18).getTimestampNTZNanos(1) == tsNTZNanos2)
+ assert(columns(18).isNullAt(2))
+ assert(columns(18).getChild(0).isNullAt(2))
+ assert(columns(18).getChild(1).isNullAt(2))
+
+ assert(columns(19).dataType() == TimestampLTZNanosType(9))
+ assert(columns(19).getTimestampLTZNanos(0) == tsLTZNanos1)
+ assert(columns(19).getTimestampLTZNanos(1) == tsLTZNanos2)
+ assert(columns(19).isNullAt(2))
+ assert(columns(19).getChild(0).isNullAt(2))
+ assert(columns(19).getChild(1).isNullAt(2))
} finally {
batch.close()
}