diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala new file mode 100644 index 0000000000000..025d7a3385338 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimestampNanosTypeApiOps.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types.ops + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.errors.DataTypeErrorsBase +import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType} +import org.apache.spark.unsafe.types.TimestampNanosVal + +/** + * Client-side (spark-api) operations shared by the nanosecond timestamp types + * (TimestampNTZNanosType and TimestampLTZNanosType). + * + * Internal values are [[org.apache.spark.unsafe.types.TimestampNanosVal]] (epoch micros + nanos + * within the micro). The two concrete subclasses differ only in their DataType and SQL-literal + * prefix; storage and formatting are identical. 
+ * + * SCOPE (SPARK-57101): this issue wires physical representation, literals, row accessors, and + * codegen class selection. String formatting here is an interim implementation until dedicated + * fractional-second formatters land in a follow-up issue; Dataset encoders are out of scope + * (SPARK-57033 and related), so getEncoder reports the type as unsupported, matching the legacy + * RowEncoder behavior. + * + * @since 4.3.0 + */ +abstract class TimestampNanosTypeApiOps extends TypeApiOps with DataTypeErrorsBase { + + /** SQL literal prefix for this type, e.g. "TIMESTAMP_NTZ" or "TIMESTAMP_LTZ". */ + protected def sqlTypeName: String + + // ==================== String Formatting (interim) ==================== + + override def format(v: Any): String = v.asInstanceOf[TimestampNanosVal].toString + + override def toSQLValue(v: Any): String = s"$sqlTypeName '${format(v)}'" + + // ==================== Row Encoding ==================== + + // Encoders are handled in a follow-up issue (SPARK-57033). Until then, report the type as + // unsupported with the same error as the legacy RowEncoder fallback to preserve parity. + override def getEncoder: AgnosticEncoder[_] = + throw new AnalysisException( + errorClass = "UNSUPPORTED_DATA_TYPE_FOR_ENCODER", + messageParameters = Map("dataType" -> toSQLType(dataType))) +} + +/** + * Client-side operations for [[org.apache.spark.sql.types.TimestampNTZNanosType]]. + * + * @param t + * The TimestampNTZNanosType with precision information + * @since 4.3.0 + */ +class TimestampNTZNanosTypeApiOps(val t: TimestampNTZNanosType) extends TimestampNanosTypeApiOps { + override def dataType: TimestampNTZNanosType = t + override protected def sqlTypeName: String = "TIMESTAMP_NTZ" +} + +/** + * Client-side operations for [[org.apache.spark.sql.types.TimestampLTZNanosType]]. 
+ * + * @param t + * The TimestampLTZNanosType with precision information + * @since 4.3.0 + */ +class TimestampLTZNanosTypeApiOps(val t: TimestampLTZNanosType) extends TimestampNanosTypeApiOps { + override def dataType: TimestampLTZNanosType = t + override protected def sqlTypeName: String = "TIMESTAMP_LTZ" +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala index fff5b8b6a022e..ae690119f1485 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala @@ -21,7 +21,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.internal.SqlApiConf -import org.apache.spark.sql.types.{DataType, TimeType} +import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType, TimeType} import org.apache.spark.unsafe.types.UTF8String /** @@ -159,6 +159,8 @@ object TypeApiOps { if (!SqlApiConf.get.typesFrameworkEnabled) return None dt match { case tt: TimeType => Some(new TimeTypeApiOps(tt)) + case t: TimestampNTZNanosType => Some(new TimestampNTZNanosTypeApiOps(t)) + case t: TimestampLTZNanosType => Some(new TimestampLTZNanosTypeApiOps(t)) // Add new types here - single registration point case _ => None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala index 1323f3737cff8..68bc7fb74b3ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala @@ -145,28 +145,32 @@ object InternalRow { def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = { val getValueNullSafe: (SpecializedGetters, Int) => Any = 
dt match { case u: UserDefinedType[_] => getAccessor(u.sqlType, nullable) - case _ => PhysicalDataType(dt) match { - case PhysicalBooleanType => (input, ordinal) => input.getBoolean(ordinal) - case PhysicalByteType => (input, ordinal) => input.getByte(ordinal) - case PhysicalShortType => (input, ordinal) => input.getShort(ordinal) - case PhysicalIntegerType => (input, ordinal) => input.getInt(ordinal) - case PhysicalLongType => (input, ordinal) => input.getLong(ordinal) - case PhysicalFloatType => (input, ordinal) => input.getFloat(ordinal) - case PhysicalDoubleType => (input, ordinal) => input.getDouble(ordinal) - case _: PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal) - case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal) - case PhysicalCalendarIntervalType => (input, ordinal) => input.getInterval(ordinal) - case PhysicalTimestampNTZNanosType => (input, ordinal) => - input.getTimestampNTZNanos(ordinal) - case PhysicalTimestampLTZNanosType => (input, ordinal) => - input.getTimestampLTZNanos(ordinal) - case t: PhysicalDecimalType => (input, ordinal) => - input.getDecimal(ordinal, t.precision, t.scale) - case t: PhysicalStructType => (input, ordinal) => input.getStruct(ordinal, t.fields.length) - case _: PhysicalArrayType => (input, ordinal) => input.getArray(ordinal) - case _: PhysicalMapType => (input, ordinal) => input.getMap(ordinal) - case _ => (input, ordinal) => input.get(ordinal, dt) - } + case _ => + TypeOps(dt).map(_.getScalaAccessor).getOrElse { + PhysicalDataType(dt) match { + case PhysicalBooleanType => (input, ordinal) => input.getBoolean(ordinal) + case PhysicalByteType => (input, ordinal) => input.getByte(ordinal) + case PhysicalShortType => (input, ordinal) => input.getShort(ordinal) + case PhysicalIntegerType => (input, ordinal) => input.getInt(ordinal) + case PhysicalLongType => (input, ordinal) => input.getLong(ordinal) + case PhysicalFloatType => (input, ordinal) => input.getFloat(ordinal) + case 
PhysicalDoubleType => (input, ordinal) => input.getDouble(ordinal) + case _: PhysicalStringType => (input, ordinal) => input.getUTF8String(ordinal) + case PhysicalBinaryType => (input, ordinal) => input.getBinary(ordinal) + case PhysicalCalendarIntervalType => (input, ordinal) => input.getInterval(ordinal) + case PhysicalTimestampNTZNanosType => (input, ordinal) => + input.getTimestampNTZNanos(ordinal) + case PhysicalTimestampLTZNanosType => (input, ordinal) => + input.getTimestampLTZNanos(ordinal) + case t: PhysicalDecimalType => (input, ordinal) => + input.getDecimal(ordinal, t.precision, t.scale) + case t: PhysicalStructType => + (input, ordinal) => input.getStruct(ordinal, t.fields.length) + case _: PhysicalArrayType => (input, ordinal) => input.getArray(ordinal) + case _: PhysicalMapType => (input, ordinal) => input.getMap(ordinal) + case _ => (input, ordinal) => input.get(ordinal, dt) + } + } } if (nullable) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala index a5b8d0857c998..88c3e181e645b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.types.ops.TypeOps import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal /** * A parent class for mutable container objects that are reused when the values are changed, @@ -186,6 +187,21 @@ final class MutableAny extends MutableValue { } } +final class MutableTimestampNanos extends MutableValue { + var value: TimestampNanosVal = _ + override def boxed: Any = if (isNull) null else value + override def update(v: Any): Unit = { + isNull = false + value = 
v.asInstanceOf[TimestampNanosVal] + } + override def copy(): MutableTimestampNanos = { + val newCopy = new MutableTimestampNanos + newCopy.isNull = isNull + newCopy.value = value + newCopy + } +} + /** * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen * based on the dataTypes of each column. The intent is to decrease garbage when modifying the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 330116e592482..252bfd6e49c26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1689,23 +1689,27 @@ object CodeGenerator extends Logging { val jt = javaType(dataType) dataType match { case udt: UserDefinedType[_] => getValue(input, udt.sqlType, ordinal) - case _ if isPrimitiveType(jt) => s"$input.get${primitiveTypeName(jt)}($ordinal)" - case _ => PhysicalDataType(dataType) match { - case _: PhysicalArrayType => s"$input.getArray($ordinal)" - case PhysicalBinaryType => s"$input.getBinary($ordinal)" - case _: PhysicalGeographyType => s"$input.getGeography($ordinal)" - case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)" - case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)" - case PhysicalTimestampNTZNanosType => s"$input.getTimestampNTZNanos($ordinal)" - case PhysicalTimestampLTZNanosType => s"$input.getTimestampLTZNanos($ordinal)" - case t: PhysicalDecimalType => s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})" - case _: PhysicalMapType => s"$input.getMap($ordinal)" - case PhysicalNullType => "null" - case _: PhysicalStringType => s"$input.getUTF8String($ordinal)" - case t: PhysicalStructType => s"$input.getStruct($ordinal, ${t.fields.length})" - case 
PhysicalVariantType => s"$input.getVariant($ordinal)" - case _ => s"($jt)$input.get($ordinal, null)" - } + case _ => + TypeOps(dataType).map(_.getCodegenGetter(input, ordinal)).getOrElse { + if (isPrimitiveType(jt)) s"$input.get${primitiveTypeName(jt)}($ordinal)" + else PhysicalDataType(dataType) match { + case _: PhysicalArrayType => s"$input.getArray($ordinal)" + case PhysicalBinaryType => s"$input.getBinary($ordinal)" + case _: PhysicalGeographyType => s"$input.getGeography($ordinal)" + case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)" + case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)" + case PhysicalTimestampNTZNanosType => s"$input.getTimestampNTZNanos($ordinal)" + case PhysicalTimestampLTZNanosType => s"$input.getTimestampLTZNanos($ordinal)" + case t: PhysicalDecimalType => + s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})" + case _: PhysicalMapType => s"$input.getMap($ordinal)" + case PhysicalNullType => "null" + case _: PhysicalStringType => s"$input.getUTF8String($ordinal)" + case t: PhysicalStructType => s"$input.getStruct($ordinal, ${t.fields.length})" + case PhysicalVariantType => s"$input.getVariant($ordinal)" + case _ => s"($jt)$input.get($ordinal, null)" + } + } } } @@ -1769,20 +1773,28 @@ object CodeGenerator extends Logging { def setColumn(row: String, dataType: DataType, ordinal: Int, value: String): String = { val jt = javaType(dataType) dataType match { - case _ if isPrimitiveType(jt) => s"$row.set${primitiveTypeName(jt)}($ordinal, $value)" + case udt: UserDefinedType[_] => setColumn(row, udt.sqlType, ordinal, value) case CalendarIntervalType => s"$row.setInterval($ordinal, $value)" - case _: TimestampNTZNanosType => s"$row.setTimestampNTZNanos($ordinal, $value)" - case _: TimestampLTZNanosType => s"$row.setTimestampLTZNanos($ordinal, $value)" case t: DecimalType => s"$row.setDecimal($ordinal, $value, ${t.precision})" - case udt: UserDefinedType[_] => setColumn(row, udt.sqlType, ordinal, value) // The 
UTF8String, InternalRow, ArrayData and MapData may came from UnsafeRow, we should copy // it to avoid keeping a "pointer" to a memory region which may get updated afterwards. case _: StringType | _: StructType | _: ArrayType | _: MapType => s"$row.update($ordinal, $value.copy())" - case _ => s"$row.update($ordinal, $value)" + case _ => + TypeOps(dataType).map(_.getCodegenSetter(row, ordinal, value)).getOrElse { + if (isPrimitiveType(jt)) s"$row.set${primitiveTypeName(jt)}($ordinal, $value)" + else setColumnDefault(row, dataType, ordinal, value) + } } } + private def setColumnDefault( + row: String, dataType: DataType, ordinal: Int, value: String): String = dataType match { + case _: TimestampNTZNanosType => s"$row.setTimestampNTZNanos($ordinal, $value)" + case _: TimestampLTZNanosType => s"$row.setTimestampLTZNanos($ordinal, $value)" + case _ => s"$row.update($ordinal, $value)" + } + /** * Update a column in MutableRow from ExprCode. * @@ -1982,26 +1994,29 @@ object CodeGenerator extends Logging { case udt: UserDefinedType[_] => javaType(udt.sqlType) case ObjectType(cls) if cls.isArray => s"${javaType(ObjectType(cls.getComponentType))}[]" case ObjectType(cls) => cls.getName - case _ => PhysicalDataType(dt) match { - case _: PhysicalArrayType => "ArrayData" - case PhysicalBinaryType => "byte[]" - case PhysicalBooleanType => JAVA_BOOLEAN - case PhysicalByteType => JAVA_BYTE - case PhysicalCalendarIntervalType => "CalendarInterval" - case PhysicalTimestampNTZNanosType => "TimestampNanosVal" - case PhysicalTimestampLTZNanosType => "TimestampNanosVal" - case PhysicalIntegerType => JAVA_INT - case _: PhysicalDecimalType => "Decimal" - case PhysicalDoubleType => JAVA_DOUBLE - case PhysicalFloatType => JAVA_FLOAT - case PhysicalLongType => JAVA_LONG - case _: PhysicalMapType => "MapData" - case PhysicalShortType => JAVA_SHORT - case _: PhysicalStringType => "UTF8String" - case _: PhysicalStructType => "InternalRow" - case _: PhysicalVariantType => "VariantVal" - case _ => 
"Object" - } + case _ => + TypeOps(dt).map(_.getJavaClass.getSimpleName).getOrElse { + PhysicalDataType(dt) match { + case _: PhysicalArrayType => "ArrayData" + case PhysicalBinaryType => "byte[]" + case PhysicalBooleanType => JAVA_BOOLEAN + case PhysicalByteType => JAVA_BYTE + case PhysicalCalendarIntervalType => "CalendarInterval" + case PhysicalTimestampNTZNanosType => "TimestampNanosVal" + case PhysicalTimestampLTZNanosType => "TimestampNanosVal" + case PhysicalIntegerType => JAVA_INT + case _: PhysicalDecimalType => "Decimal" + case PhysicalDoubleType => JAVA_DOUBLE + case PhysicalFloatType => JAVA_FLOAT + case PhysicalLongType => JAVA_LONG + case _: PhysicalMapType => "MapData" + case PhysicalShortType => JAVA_SHORT + case _: PhysicalStringType => "UTF8String" + case _: PhysicalStructType => "InternalRow" + case _: PhysicalVariantType => "VariantVal" + case _ => "Object" + } + } } def javaClass(dt: DataType): Class[_] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 0f64eefe1a060..188b3ab8f9568 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.types.ops.TypeOps import org.apache.spark.sql.types._ /** @@ -113,9 +114,16 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro // Can't call setNullAt() for DecimalType with precision larger than 18. 
s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
         case CalendarIntervalType => s"$rowWriter.write($index, (CalendarInterval) null);"
-        case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
-          s"$rowWriter.write($index, (TimestampNanosVal) null);"
-        case _ => s"$rowWriter.setNullAt($index);"
+        case _ =>
+          TypeOps(dt)
+            .flatMap(_.getCodegenNullWrite(rowWriter, index.toString))
+            .getOrElse {
+              dt match {
+                case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+                  s"$rowWriter.write($index, (TimestampNanosVal) null);"
+                case _ => s"$rowWriter.setNullAt($index);"
+              }
+            }
       }
 
       val writeField = writeElement(ctx, input.value, index.toString, dt, rowWriter)
@@ -183,9 +191,16 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
         s"$arrayWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
       case CalendarIntervalType => s"$arrayWriter.write($index, (CalendarInterval) null);"
-      case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
-        s"$arrayWriter.write($index, (TimestampNanosVal) null);"
-      case _ => s"$arrayWriter.setNull${elementOrOffsetSize}Bytes($index);"
+      case _ =>
+        TypeOps(et)
+          .flatMap(_.getCodegenNullWrite(arrayWriter, index))
+          .getOrElse {
+            et match {
+              case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+                s"$arrayWriter.write($index, (TimestampNanosVal) null);"
+              case _ => s"$arrayWriter.setNull${elementOrOffsetSize}Bytes($index);"
+            }
+          }
     }
 
     val elementAssignment = if (containsNull) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 0b1402b9103c8..0a77b005ee647 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -540,11 +540,22 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
           }
         case ByteType | ShortType =>
           ExprCode.forNonNullValue(JavaCode.expression(s"($javaType)$value", dataType))
-        case TimestampType | TimestampNTZType | LongType | _: DayTimeIntervalType | _: TimeType =>
+        case TimestampType | TimestampNTZType | LongType | _: DayTimeIntervalType =>
           toExprCode(s"${value}L")
         case _ =>
-          val constRef = ctx.addReferenceObj("literal", value, javaType)
-          ExprCode.forNonNullValue(JavaCode.global(constRef, dataType))
+          TypeOps(dataType)
+            .map(ops => toExprCode(ops.getJavaLiteral(value)))
+            .getOrElse {
+              dataType match {
+                // TypeOps is None when the framework is disabled. TimeType is long-backed and
+                // previously shared the inline long-literal case above, so keep that legacy
+                // behavior instead of falling through to the reference-object path.
+                case _: TimeType => toExprCode(s"${value}L")
+                case _ =>
+                  val constRef = ctx.addReferenceObj("literal", value, javaType)
+                  ExprCode.forNonNullValue(JavaCode.global(constRef, dataType))
+              }
+            }
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
index 0cf152079c520..411a2bd410e24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimeTypeOps.scala
@@ -22,7 +22,7 @@ import java.time.LocalTime
 import org.apache.arrow.vector.{TimeNanoVector, ValueVector}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableLong, MutableValue}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableLong, MutableValue, SpecializedGetters}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalLongType}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -67,6 +67,9 @@ case class TimeTypeOps(override val t: TimeType) extends TimeTypeApiOps(t) with
   override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit =
     (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
 
+  override def getScalaAccessor: 
(SpecializedGetters, Int) => Any = + (input, ordinal) => input.getLong(ordinal) + // ==================== Literal Creation ==================== override def getDefaultLiteral: Literal = Literal.create(0L, t) @@ -88,6 +91,14 @@ case class TimeTypeOps(override val t: TimeType) extends TimeTypeApiOps(t) with DateTimeUtils.nanosToLocalTime(row.getLong(column)) } + // ==================== Code Generation ==================== + + override def getCodegenGetter(input: String, ordinal: String): String = + s"$input.getLong($ordinal)" + + override def getCodegenSetter(row: String, ordinal: Int, value: String): String = + s"$row.setLong($ordinal, $value)" + // ==================== Optional Operations ==================== override def createSerializer(input: Expression): Option[Expression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala new file mode 100644 index 0000000000000..26c0ff09e2baf --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOps.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.types.ops + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Literal, MutableTimestampNanos, MutableValue, SpecializedGetters} +import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType} +import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType} +import org.apache.spark.sql.types.ops.{TimestampLTZNanosTypeApiOps, TimestampNTZNanosTypeApiOps} +import org.apache.spark.unsafe.types.TimestampNanosVal + +/** + * Server-side (catalyst) operations shared by the nanosecond timestamp types + * (TimestampNTZNanosType and TimestampLTZNanosType). + * + * Internal values are [[TimestampNanosVal]] (epoch micros + nanos within the micro), stored in + * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] via a 16-byte variable-length payload; + * see [[org.apache.spark.sql.catalyst.expressions.TimestampNanosRowValues]]. + * + * SCOPE (SPARK-57101): this issue covers physical representation, literals, row accessors, and + * codegen class selection. java.time conversion is out of scope (SPARK-57033), so the external + * conversion methods are identity over [[TimestampNanosVal]], matching the legacy identity + * converter behavior. Concrete subclasses supply the NTZ/LTZ-specific physical type and row + * accessors. 
+ * + * @since 4.3.0 + */ +trait TimestampNanosTypeOps extends TypeOps { + + // ==================== Physical Type Representation ==================== + + override def getJavaClass: Class[_] = classOf[TimestampNanosVal] + + override def getMutableValue: MutableValue = new MutableTimestampNanos + + // ==================== Literal Creation ==================== + + override def getDefaultLiteral: Literal = Literal.create(TimestampNanosVal.ZERO, dataType) + + override def getJavaLiteral(v: Any): String = { + val tn = v.asInstanceOf[TimestampNanosVal] + "org.apache.spark.unsafe.types.TimestampNanosVal.fromParts(" + + s"${tn.epochMicros}L, (short) ${tn.nanosWithinMicro})" + } + + // ==================== Code Generation ==================== + + // Both NTZ and LTZ store a variable-length 16-byte payload in UnsafeRow, so a typed null must + // use the write(i, (TimestampNanosVal) null) overload rather than setNullAt. + override def getCodegenNullWrite(writer: String, index: String): Option[String] = + Some(s"$writer.write($index, (TimestampNanosVal) null);") + + // ==================== External Type Conversion ==================== + + // java.time conversion is handled in a follow-up issue (SPARK-57033); until then values pass + // through unchanged as TimestampNanosVal, matching the legacy identity converter. + override def toCatalystImpl(scalaValue: Any): Any = scalaValue + + override def toScala(catalystValue: Any): Any = catalystValue +} + +/** + * Server-side operations for [[TimestampNTZNanosType]]. 
+ * + * @param t + * The TimestampNTZNanosType with precision information + * @since 4.3.0 + */ +case class TimestampNTZNanosTypeOps(override val t: TimestampNTZNanosType) + extends TimestampNTZNanosTypeApiOps(t) with TimestampNanosTypeOps { + + override def getPhysicalType: PhysicalDataType = PhysicalTimestampNTZNanosType + + override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit = + (input, v) => input.setTimestampNTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal]) + + override def getScalaAccessor: (SpecializedGetters, Int) => Any = + (input, ordinal) => input.getTimestampNTZNanos(ordinal) + + override def getCodegenGetter(input: String, ordinal: String): String = + s"$input.getTimestampNTZNanos($ordinal)" + + override def getCodegenSetter(row: String, ordinal: Int, value: String): String = + s"$row.setTimestampNTZNanos($ordinal, $value)" + + override def toScalaImpl(row: InternalRow, column: Int): Any = + row.getTimestampNTZNanos(column) +} + +/** + * Server-side operations for [[TimestampLTZNanosType]]. 
+ * + * @param t + * The TimestampLTZNanosType with precision information + * @since 4.3.0 + */ +case class TimestampLTZNanosTypeOps(override val t: TimestampLTZNanosType) + extends TimestampLTZNanosTypeApiOps(t) with TimestampNanosTypeOps { + + override def getPhysicalType: PhysicalDataType = PhysicalTimestampLTZNanosType + + override def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit = + (input, v) => input.setTimestampLTZNanos(ordinal, v.asInstanceOf[TimestampNanosVal]) + + override def getScalaAccessor: (SpecializedGetters, Int) => Any = + (input, ordinal) => input.getTimestampLTZNanos(ordinal) + + override def getCodegenGetter(input: String, ordinal: String): String = + s"$input.getTimestampLTZNanos($ordinal)" + + override def getCodegenSetter(row: String, ordinal: Int, value: String): String = + s"$row.setTimestampLTZNanos($ordinal, $value)" + + override def toScalaImpl(row: InternalRow, column: Int): Any = + row.getTimestampLTZNanos(column) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala index 7240f0533aa35..c7987fc6921b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/TypeOps.scala @@ -23,11 +23,11 @@ import org.apache.arrow.vector.ValueVector import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.WalkedTypePath -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableValue} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, MutableValue, SpecializedGetters} import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.execution.arrow.ArrowFieldWriter import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, TimeType} +import org.apache.spark.sql.types.{DataType, 
TimestampLTZNanosType, TimestampNTZNanosType, TimeType} /** * Server-side (catalyst) type operations for the Types Framework. @@ -100,6 +100,17 @@ trait TypeOps extends Serializable { */ def getRowWriter(ordinal: Int): (InternalRow, Any) => Unit + /** + * Returns an accessor function for reading this type's raw internal value from a row. + * + * Used by InternalRow.getAccessor. Returns the internal Catalyst representation (e.g., Long for + * TimeType, TimestampNanosVal for nanosecond timestamps), not the external Scala type. + * + * @return + * accessor function (SpecializedGetters, Int) => Any + */ + def getScalaAccessor: (SpecializedGetters, Int) => Any + // ==================== Literal Creation ==================== /** @@ -181,6 +192,54 @@ trait TypeOps extends Serializable { if (row.isNullAt(column)) null else toScalaImpl(row, column) } + // ==================== Code Generation ==================== + + /** + * Returns the Java expression string that reads a value of this type from a row in generated + * code. + * + * @param input + * Java variable name of the SpecializedGetters / InternalRow + * @param ordinal + * Java expression for the column ordinal + * @return + * Java expression string, e.g. "input.getTimestampNTZNanos(ordinal)" + */ + def getCodegenGetter(input: String, ordinal: String): String + + /** + * Returns the Java statement string that writes a value of this type into a row in generated + * code. + * + * @param row + * Java variable name of the InternalRow + * @param ordinal + * column ordinal + * @param value + * Java expression for the value to write + * @return + * Java statement string, e.g. "row.setTimestampNTZNanos(ordinal, value)" + */ + def getCodegenSetter(row: String, ordinal: Int, value: String): String + + /** + * Returns the Java statement string that writes a typed null into an UnsafeWriter in generated + * code, or None to delegate to the caller's default. 
+ * + * Fixed-size (primitive-backed) types return None: the caller knows the right default for its + * context (setNullAt for row writers, setNullXBytes for array writers). Variable-length types + * (e.g. nanosecond timestamps) return Some(...) with the typed write(i, (Type) null) overload + * that preserves the variable-length offset. + * + * @param writer + * Java variable name of the UnsafeRowWriter / UnsafeArrayWriter + * @param index + * Java expression for the column index + * @return + * Some(Java statement string) for variable-length types; None for fixed-size types + */ + def getCodegenNullWrite(writer: String, index: String): Option[String] = None + // ==================== Serialization (optional) ==================== /** Creates a serializer expression (external -> internal). */ @@ -231,6 +290,8 @@ object TypeOps { if (!SQLConf.get.typesFrameworkEnabled) return None dt match { case tt: TimeType => Some(TimeTypeOps(tt)) + case t: TimestampNTZNanosType => Some(TimestampNTZNanosTypeOps(t)) + case t: TimestampLTZNanosType => Some(TimestampLTZNanosTypeOps(t)) // Add new types here - single registration point case _ => None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0aed28e92558f..cc291315312ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -637,7 +637,8 @@ object SQLConf { val TYPES_FRAMEWORK_ENABLED = buildConf("spark.sql.types.framework.enabled") .internal() - .doc("When true, use the Types Framework for supported types (currently TimeType). " + + .doc("When true, use the Types Framework for supported types (currently TimeType and the " + + "nanosecond timestamp types TimestampNTZNanosType and TimestampLTZNanosType). 
" + "The framework centralizes type-specific operations in Ops classes instead of " + "scattered pattern matching. When false, use legacy scattered implementation.") .version("4.2.0") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala new file mode 100644 index 0000000000000..23b1d9950a5a8 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/types/ops/TimestampNanosTypeOpsSuite.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.catalyst.types.ops
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal, MutableTimestampNanos, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalTimestampLTZNanosType, PhysicalTimestampNTZNanosType}
+import org.apache.spark.sql.catalyst.types.ops.{TimestampLTZNanosTypeOps, TimestampNTZNanosTypeOps}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType}
+import org.apache.spark.sql.types.ops.TypeApiOps
+import org.apache.spark.unsafe.types.TimestampNanosVal
+
+/**
+ * Tests for the Types Framework wiring of the nanosecond timestamp types (SPARK-57101).
+ *
+ * Verifies that TimestampNTZNanosType and TimestampLTZNanosType route physical representation,
+ * literals, row accessors, mutable values, and codegen class selection through TypeOps/TypeApiOps
+ * when spark.sql.types.framework.enabled is true, and that disabling the flag falls back to the
+ * legacy paths with identical results.
+ *
+ * Every test that exercises the framework path pins the flag to "true" explicitly instead of
+ * relying on its ambient default, mirroring how the fallback test pins it to "false".
+ */
+class TimestampNanosTypeOpsSuite extends SparkFunSuite with SQLHelper {
+
+  private val precisions = Seq(7, 8, 9)
+
+  private val ntzVal = TimestampNanosVal.fromParts(1234567890123L, 42.toShort)
+  private val ltzVal = TimestampNanosVal.fromParts(-98765L, 999.toShort)
+
+  // (dataType, expected physical type, sample value) tuples covering NTZ and LTZ for p in {7,8,9}.
+  private def ntzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+    precisions.map(p => (TimestampNTZNanosType(p), PhysicalTimestampNTZNanosType, ntzVal))
+
+  private def ltzCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] =
+    precisions.map(p => (TimestampLTZNanosType(p), PhysicalTimestampLTZNanosType, ltzVal))
+
+  private def allCases: Seq[(DataType, PhysicalDataType, TimestampNanosVal)] = ntzCases ++ ltzCases
+
+  /** Runs `body` with spark.sql.types.framework.enabled pinned to true. */
+  private def withFrameworkEnabled(body: => Unit): Unit =
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "true")(body)
+
+  /** Looks up the server-side ops for `dt`, failing the test with context if none is registered. */
+  private def typeOpsFor(dt: DataType): TypeOps =
+    TypeOps(dt).getOrElse(fail(s"TypeOps should be defined for $dt"))
+
+  /** Looks up the client-side ops for `dt`, failing the test with context if none is registered. */
+  private def apiOpsFor(dt: DataType): TypeApiOps =
+    TypeApiOps(dt).getOrElse(fail(s"TypeApiOps should be defined for $dt"))
+
+  /**
+   * Asserts the physical type, the default literal (type and ZERO value), and the Java class
+   * selected for codegen. Called under both flag settings, so it must not touch TypeOps directly.
+   */
+  private def checkPhysicalAndLiteralAndCodegen(
+      dt: DataType,
+      physical: PhysicalDataType): Unit = {
+    assert(PhysicalDataType(dt) === physical, s"physical type for $dt")
+    val default = Literal.default(dt)
+    assert(default.dataType === dt, s"default literal type for $dt")
+    assert(default.value === TimestampNanosVal.ZERO, s"default literal value for $dt")
+    assert(CodeGenerator.javaClass(dt) === classOf[TimestampNanosVal], s"javaClass for $dt")
+  }
+
+  /**
+   * Writes `value` at ordinal 0 and reads it back through InternalRow.getWriter/getAccessor, for
+   * both GenericInternalRow and SpecificInternalRow, and checks that a null slot reads as null.
+   */
+  private def checkRowRoundtrip(dt: DataType, value: TimestampNanosVal): Unit = {
+    val accessor = InternalRow.getAccessor(dt)
+    val writer = InternalRow.getWriter(0, dt)
+
+    val genericRow = new GenericInternalRow(Array[Any](null, null))
+    writer(genericRow, value)
+    assert(accessor(genericRow, 0) === value, s"GenericInternalRow roundtrip for $dt")
+    val emptyRow = new GenericInternalRow(Array[Any](null, null))
+    assert(accessor(emptyRow, 0) === null, s"GenericInternalRow null read for $dt")
+
+    val specificRow = new SpecificInternalRow(Seq(dt))
+    specificRow.update(0, value)
+    assert(accessor(specificRow, 0) === value, s"SpecificInternalRow roundtrip for $dt")
+    specificRow.update(0, null)
+    assert(accessor(specificRow, 0) === null, s"SpecificInternalRow null read for $dt")
+  }
+
+  test("TypeOps and TypeApiOps are registered when the framework is enabled") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, _) =>
+        assert(TypeOps(dt).isDefined, s"TypeOps should be defined for $dt")
+        assert(TypeApiOps(dt).isDefined, s"TypeApiOps should be defined for $dt")
+      }
+    }
+  }
+
+  test("physical type, default literal, and codegen class (framework enabled)") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, physical, _) =>
+        checkPhysicalAndLiteralAndCodegen(dt, physical)
+      }
+    }
+  }
+
+  test("InternalRow and SpecificInternalRow roundtrip (framework enabled)") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, value) =>
+        checkRowRoundtrip(dt, value)
+      }
+    }
+  }
+
+  test("SpecificInternalRow uses a dedicated MutableTimestampNanos holder") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, _) =>
+        val row = new SpecificInternalRow(Seq(dt))
+        assert(row.values(0).isInstanceOf[MutableTimestampNanos],
+          s"expected MutableTimestampNanos for $dt")
+      }
+    }
+  }
+
+  test("getEncoder is unsupported until encoders are wired (SPARK-57033)") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, _) =>
+        // Resolve the ops outside intercept so a missing registration is reported as such.
+        val ops = apiOpsFor(dt)
+        val e = intercept[AnalysisException](ops.getEncoder)
+        assert(e.getCondition === "UNSUPPORTED_DATA_TYPE_FOR_ENCODER")
+      }
+    }
+  }
+
+  test("toSQLValue uses the NTZ/LTZ literal prefix (interim format until formatters land)") {
+    // format() currently delegates to TimestampNanosVal.toString() as an interim implementation.
+    // These assertions pin the full output so that the switch to a real timestamp formatter is
+    // visible as a test failure rather than silently passing a prefix-only check.
+    withFrameworkEnabled {
+      val ntzOps = apiOpsFor(TimestampNTZNanosType(9))
+      assert(ntzOps.toSQLValue(ntzVal) === s"TIMESTAMP_NTZ '${ntzVal.toString}'")
+      val ltzOps = apiOpsFor(TimestampLTZNanosType(9))
+      assert(ltzOps.toSQLValue(ltzVal) === s"TIMESTAMP_LTZ '${ltzVal.toString}'")
+    }
+  }
+
+  test("getScalaAccessor reads the correct NTZ/LTZ value via InternalRow.getAccessor") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, value) =>
+        val accessor = InternalRow.getAccessor(dt)
+        val row = new GenericInternalRow(Array[Any](value))
+        assert(accessor(row, 0) === value, s"getScalaAccessor roundtrip for $dt")
+        val nullRow = new GenericInternalRow(Array[Any](null))
+        assert(accessor(nullRow, 0) === null, s"getScalaAccessor null for $dt")
+      }
+    }
+  }
+
+  test("CodeGenerator.javaType routes through getJavaClass.getSimpleName") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, _) =>
+        assert(CodeGenerator.javaType(dt) === "TimestampNanosVal",
+          s"javaType for $dt")
+      }
+    }
+  }
+
+  test("getCodegenGetter and getCodegenSetter produce correct method-call strings") {
+    // The ops are constructed directly, so this test does not depend on the framework flag.
+    val ntzOps = TimestampNTZNanosTypeOps(TimestampNTZNanosType(9))
+    assert(ntzOps.getCodegenGetter("row", "i") === "row.getTimestampNTZNanos(i)")
+    assert(ntzOps.getCodegenSetter("row", 0, "v") === "row.setTimestampNTZNanos(0, v)")
+
+    val ltzOps = TimestampLTZNanosTypeOps(TimestampLTZNanosType(7))
+    assert(ltzOps.getCodegenGetter("row", "i") === "row.getTimestampLTZNanos(i)")
+    assert(ltzOps.getCodegenSetter("row", 0, "v") === "row.setTimestampLTZNanos(0, v)")
+  }
+
+  test("getCodegenNullWrite returns the typed-null write statement for variable-length nanos") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, _) =>
+        val ops = typeOpsFor(dt)
+        assert(ops.getCodegenNullWrite("w", "3") ===
+          Some("w.write(3, (TimestampNanosVal) null);"),
+          s"getCodegenNullWrite for $dt")
+      }
+    }
+  }
+
+  test("MutableTimestampNanos.copy preserves isNull and value, independent of original") {
+    // null state: freshly allocated holder must copy as null
+    val holder = new MutableTimestampNanos
+    assert(holder.isNull)
+    val blankCopy = holder.copy()
+    assert(blankCopy.isNull)
+    assert(blankCopy.value === null)
+
+    // value state: copy must carry the same TimestampNanosVal
+    holder.update(ntzVal)
+    val valueCopy = holder.copy()
+    assert(!valueCopy.isNull)
+    assert(valueCopy.value === ntzVal)
+
+    // independence: mutating the original must not affect the copy
+    holder.update(ltzVal)
+    assert(valueCopy.value === ntzVal)
+
+    // null-after-value: the isNull field is set directly here; copy must reflect that
+    holder.isNull = true
+    val nullCopy = holder.copy()
+    assert(nullCopy.isNull)
+  }
+
+  test("Literal.doGenCode emits getJavaLiteral inline, not a reference object") {
+    withFrameworkEnabled {
+      allCases.foreach { case (dt, _, value) =>
+        val literal = Literal.create(value, dt)
+        val ctx = new CodegenContext
+        val ev = literal.genCode(ctx)
+        // The generated value expression must contain the inline fromParts(...) call produced
+        // by getJavaLiteral, rather than a global field reference (addReferenceObj path).
+        assert(ev.value.code.contains("fromParts"),
+          s"expected inline fromParts call for $dt, got: ${ev.value.code}")
+        assert(!ctx.references.exists(_.isInstanceOf[TimestampNanosVal]),
+          s"TimestampNanosVal should not be added as a reference object for $dt")
+      }
+    }
+  }
+
+  test("framework disabled falls back to identical legacy behavior") {
+    withSQLConf(SQLConf.TYPES_FRAMEWORK_ENABLED.key -> "false") {
+      allCases.foreach { case (dt, physical, value) =>
+        assert(TypeOps(dt).isEmpty, s"TypeOps should be empty for $dt when disabled")
+        assert(TypeApiOps(dt).isEmpty, s"TypeApiOps should be empty for $dt when disabled")
+        checkPhysicalAndLiteralAndCodegen(dt, physical)
+        checkRowRoundtrip(dt, value)
+      }
+    }
+  }
+}