diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala index 2b0f8e293e76b..8284441e9e2b1 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala @@ -29,4 +29,15 @@ private[pipelines] object AutoCdcReservedNames { /** Common reserved-name prefix shared by AutoCDC internal columns and internal tables. */ val prefix: String = "__spark_autocdc_" + + /** + * Reserved name of the operational metadata column that AutoCDC projects onto every AutoCDC + * microbatch, auxiliary table, and target table. + * + * Shared across all SCD strategies and across the flow resolution, batch-processor, and + * streaming-write layers. + * + * Note, however, that the schema of the CDC metadata column can and does differ by SCD type. 
+ */ + val cdcMetadataColName: String = s"${prefix}metadata" } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 0035f442fb00a..fe651985250ef 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -146,7 +146,7 @@ case class Scd1BatchProcessor( F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) validatedMicrobatch.withColumn( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.constructCdcMetadataCol( deleteSequence = rowDeleteSequence, upsertSequence = rowUpsertSequence, @@ -178,7 +178,7 @@ case class Scd1BatchProcessor( schema = microbatchWithCdcMetadataDf.schema, columnSelection = Some( ColumnSelection.ExcludeColumns( - Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) + Seq(UnqualifiedColumnName(AutoCdcReservedNames.cdcMetadataColName)) ) ), caseSensitive = caseSensitiveColumnComparison @@ -200,7 +200,7 @@ case class Scd1BatchProcessor( // select. Identifiers could have special characters such as '.'. 
F.col(QuotingUtils.quoteIdentifier(colName)) }) :+ F.col( - Scd1BatchProcessor.cdcMetadataColName + AutoCdcReservedNames.cdcMetadataColName ) microbatchWithCdcMetadataDf.select( @@ -226,7 +226,7 @@ case class Scd1BatchProcessor( val aliasedMicrobatchDf = microbatchDf.alias("microbatch") val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable") - val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName + val cdcMetadata = AutoCdcReservedNames.cdcMetadataColName val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata") val effectiveSeq = F.greatest( @@ -270,7 +270,7 @@ case class Scd1BatchProcessor( auxiliaryTableIdentifier: TableIdentifier ): Unit = { val auxIdentQuoted = auxiliaryTableIdentifier.quotedString - val meta = Scd1BatchProcessor.cdcMetadataColName + val meta = AutoCdcReservedNames.cdcMetadataColName // Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are // irrelevant for the auxiliary table and should not be persisted. @@ -333,7 +333,7 @@ case class Scd1BatchProcessor( reconciledMicrobatchDf: DataFrame, targetTableIdentifier: TableIdentifier ): Unit = { - val meta = Scd1BatchProcessor.cdcMetadataColName + val meta = AutoCdcReservedNames.cdcMetadataColName val destinationTableStr = targetTableIdentifier.quotedString // (Re-)alias the reconciled microbatch DF for easy reference for the remainder of the merge. 
@@ -415,7 +415,7 @@ case class Scd1BatchProcessor( val resolver = microbatchSqlConf.resolver microbatch.schema.fieldNames - .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) + .find(resolver(_, AutoCdcReservedNames.cdcMetadataColName)) .foreach { conflictingColumnName => throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", @@ -423,7 +423,7 @@ case class Scd1BatchProcessor( "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", - "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + "reservedColumnName" -> AutoCdcReservedNames.cdcMetadataColName ) ) } @@ -437,7 +437,6 @@ object Scd1BatchProcessor { * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction. */ private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row" - private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata" private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence" private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala new file mode 100644 index 0000000000000..4566cfad0894e --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructType} + +/** + * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying with the specified + * [[changeArgs]] configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. + */ +case class Scd2BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + + /** + * Reconcile a CDC microbatch into the canonical form the auxiliary- and target-table merges + * consume. + * + * Step ordering is load-bearing: the row-extension steps reference user data columns that + * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key + * deduplication step is needed - SCD2 preserves every event as part of the row's history. + * + * Requires the microbatch to have been validated upstream so that the sequencing column is + * non-null and orderable. 
+ */ + private[autocdc] def preprocessMicrobatch(validatedBatchDf: DataFrame): DataFrame = { + validatedBatchDf + .transform(extendMicrobatchRowsWithStartAt) + .transform(extendMicrobatchRowsWithEndAt) + .transform(extendMicrobatchRowsWithCdcMetadata) + .transform(projectTargetColumnsOntoMicrobatch) + } + + /** + * Stamp each microbatch row with its currently known start-at (i.e. active-from) using its + * sequencing. + */ + private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.startAtColName, + col = changeArgs.sequencing.cast(resolvedSequencingType) + ) + } + + /** + * Stamp each microbatch delete event row with its end time sequence, as they are instantaneous + * events. + * + * Non-deletes leave a null end: we do not yet know if the row represents an active upsert or a + * closed upsert. This will become clear in later reconciliation against the aux/target tables. + */ + private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.endAtColName, + col = ( + changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(null) + case None => + F.lit(null) + } + ).cast(resolvedSequencingType) + ) + } + + /** + * Project the operational CDC metadata column carrying the literal event sequence. Downstream + * merges rely on it to preserve original event lineage regardless of how rows start/end-at are + * coalesced. 
+ */ + private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = AutoCdcReservedNames.cdcMetadataColName, + col = Scd2BatchProcessor.constructCdcMetadataStruct( + recordStartAt = changeArgs.sequencing, + sequencingType = resolvedSequencingType + ) + ) + } + + /** + * Apply the user's target column selection while preserving the SCD2 framework columns; the + * latter are required by downstream merges and persisted to both the auxiliary and target + * tables, so users cannot deselect them. + * + * Requires the framework columns to already be present on the input. + */ + private def projectTargetColumnsOntoMicrobatch( + microbatch: DataFrame + ): DataFrame = { + val dataSchema = StructType( + microbatch.schema.fields.filterNot(f => + Scd2BatchProcessor.reservedFrameworkColNames.contains(f.name) + ) + ) + val userSelectedDataSchema = + ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = dataSchema, + columnSelection = changeArgs.columnSelection, + caseSensitive = + microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis + ) + val finalColumnsToSelect: Seq[Column] = + userSelectedDataSchema.fieldNames.toSeq.map(colName => { + // Spark drops backticks in the schema, quote all identifiers for safety before executing + // select. Identifiers could have special characters such as '.'. + F.col(QuotingUtils.quoteIdentifier(colName)) + }) ++ Seq( + F.col(Scd2BatchProcessor.startAtColName), + F.col(Scd2BatchProcessor.endAtColName), + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + microbatch.select(finalColumnsToSelect: _*) + } + +} + +/** + * Concept: run of upsert events. + * + * A run is a maximal sequence of consecutive upsert events (in sorted order by sequencing) + * for the same key whose tracked-history-column values are all identical. 
The transition + * from a previous run's tail to a new run's head represents a real state change; every + * subsequent event in the run is a no-op continuation that logically coalesces with the head. + * + * Runs matter because SCD2 only emits a new visible historical row when a + * tracked-history column actually changes. By convention we choose that only the tail of a + * run produces a visible row in the target table; the rest become hidden rows in the aux + * table. Selecting the tail means the latest no-op upsert is reflected in the target table. + * + * Example, with trackHistoryCols = [name], events for some key: + * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table. + * (S=10, name=Alice) -> no-op, adds to run at S=5. Row lives in aux table. + * (S=15, name=Alice) -> no-op and tail of run at S=5. Row lives in target table with + * START_AT=5. + * (S=20, name=Charlie) -> new run head/tail (run size=1) at S=20. Row lives in target + * table. + * + * Now if a new late-arriving event (S=12, name=Bob) arrives for the same key, we have: + * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table. + * (S=10, name=Alice) -> no-op but now tail of run at S=5. Row now lives in target + * table with START_AT=5. + * (S=12, name=Bob) -> new run head/tail (run size=1) at S=12. Row lives in target + * table. + * (S=15, name=Alice) -> previously-visible tail converts to a new run head at S=15. Row + * remains in target table, but now with START_AT=15. + * (S=20, name=Charlie) -> new run head at S=20. Row lives in target table. + * + * Note that if we did not track the no-op events in the aux table for the run at S=5 before the + * event (S=12, name=Bob) arrived, then we would not have correctly reconciled that the event + * (S=10, name=Alice) is now the visible tail of the Alice run before Bob. + * + * ------------- + * Concept: target table. + * + * The user-consumable output table of the CDC transformation. 
Every row in the target table + * represents the visible tail of a run (maybe size 1), carrying the run head's START_AT and the + * latest row values for that run. The target table in its entirety represents the SCD2 + * representation of the CDC flow's source table. + * + * ------------- + * Concept: aux table. + * + * The side state table used to track out-of-order events from the CDC source. Two classes + * of events are represented as rows in this table: + * 1. Early-arriving deletes, with no matching upsert; this is considered a tombstone, + * and may match with a late-arriving upsert in a future microbatch. + * 2. No-op upserts (i.e. non-tail run members); hidden no-op rows that may reconcile as + * state-changing run heads in a future microbatch. + * + * The aux table is considered an internal table that users should neither tamper with nor treat + * as a public contract. + * + * ------------- + * Concept: same-sequence tie-break between an upsert and a delete. + * + * When an upsert event and a delete event share the same `__RECORD_START_AT`, the delete wins: + * the visible upsert is dropped (as a zero-width interval) and only the tombstone is written + * to the aux table. The reverse pair (delete arriving first, then an upsert at the same + * sequence) is symmetric: the tombstone closes the upsert at the same instant, again leaving + * a zero-width visible interval that is dropped, and only the tombstone survives. + * + * This tie-break is an internal contract only - we do not publicly guarantee deterministic + * resolution when two events for the same key share a sequence value. Users who care about + * ordering should ensure their sequencing column is unique per (key, event). + */ +object Scd2BatchProcessor { + /** + * Metadata field that represents the exact time (sequence) of the CDC event that produced + * this row. Null only for synthetic decomposition tails. 
+ */ + private[autocdc] val recordStartAtFieldName: String = "__RECORD_START_AT" + + /** + * What this column represents depends on which AutoCDC artifact table it is read from. + * + * In the target table: + * The user-visible column representing when this row is considered active from, i.e. + * this upsert run's head [[recordStartAtFieldName]]. + * In the aux table: + * If this row represents a tombstone, then the same value as [[recordStartAtFieldName]]. + * Else this row represents a coalesced no-op row that is part of an upsert run. + * Inherit the [[recordStartAtFieldName]] of the head of this upsert's run. + * + * The invariant in both tables is: startAtColName <= recordStartAtFieldName. If an event was + * generated at time X, it is active by time X, or earlier if it is not a run head. + */ + private[autocdc] val startAtColName: String = "__START_AT" + + /** + * What this column represents depends on which AutoCDC artifact table it is read from. + * + * In the target table: + * The user-visible column representing when this row became inactive. Null IFF the row + * is active: neither superseded by a state-changing upsert nor affected by a delete. + * In the aux table: + * If this row is a tombstone, then by convention the sequence of the delete event that + * produced it. Delete events are considered instantaneous in time. + * Else this row is a coalesced no-op row that is part of an upsert run, and by + * convention the value will always be null. + */ + private[autocdc] val endAtColName: String = "__END_AT" + + /** + * Column names reserved by AutoCDC, that will be projected onto the microbatch and target + * tables. If the user's source dataframe contains any of these columns, SCD2 reconciliation + * will fail. 
+ */ + private val reservedFrameworkColNames: Set[String] = Set( + startAtColName, + endAtColName, + AutoCdcReservedNames.cdcMetadataColName + ) + + /** + * Construct the CDC metadata struct column for SCD2: a struct whose single field, + * [[recordStartAtFieldName]], is `recordStartAt` cast to `sequencingType`. + */ + def constructCdcMetadataStruct( + recordStartAt: Column, + sequencingType: DataType + ): Column = { + F.struct( + recordStartAt.cast(sequencingType).as(recordStartAtFieldName) + ) + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 740533d7504ec..95fbbf1601f9e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -289,7 +289,7 @@ class AutoCdcMergeFlow( // CDC operational metadata column at the end. StructType( userSelectedSchema.fields :+ StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(sequencingType), nullable = false ) @@ -335,7 +335,7 @@ class AutoCdcMergeFlow( deleteSequence = F.lit(null), upsertSequence = F.lit(null), sequencingType = sequencingType - ).as(Scd1BatchProcessor.cdcMetadataColName) + ).as(AutoCdcReservedNames.cdcMetadataColName) df.select(userSelectedCols :+ emptyCdcMetadataCol: _*) case ScdType.Type2 => diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala index 0d1c33be21727..3841495f01c8f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala @@ -688,11 +688,12 @@ class Scd1MergeStreamingWrite( /** CDC metadata field resolved out of the flow's 
augmented schema. */ private lazy val cdcMetadataField: StructField = { val resolver = updateContext.spark.sessionState.conf.resolver + val cdcMetadataColName = AutoCdcReservedNames.cdcMetadataColName flow.schema.fields - .find(field => resolver(field.name, Scd1BatchProcessor.cdcMetadataColName)) + .find(field => resolver(field.name, cdcMetadataColName)) .getOrElse( throw SparkException.internalError( - s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was not found in the " + + s"CDC metadata column '$cdcMetadataColName' was not found in the " + s"AutoCDC flow's target table schema." ) ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala index 0dc0a90276600..8688df071113b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala @@ -82,7 +82,7 @@ trait AutoCdcCatalogExecutionTestBase { } /** - * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for a given + * Schema of the [[AutoCdcReservedNames.cdcMetadataColName]] struct column for a given * sequencing column type. Defaults to [[LongType]] because all current SCD1 tests use * `Long` sequencing. 
*/ @@ -92,7 +92,7 @@ trait AutoCdcCatalogExecutionTestBase { .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType) /** - * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]] struct's two fields, + * Build a [[Row]] matching the [[AutoCdcReservedNames.cdcMetadataColName]] struct's two fields, * in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]: */ protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]): Row = diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala index 932110b94afd1..c3dfa36109dcc 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala @@ -192,7 +192,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. 
*/ private def cdcMetadataStruct(schema: StructType): StructType = - schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType] test( "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " + @@ -206,7 +206,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -229,7 +229,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -250,7 +250,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -276,7 +276,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") { val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) - val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName) + val metaField = resolvedFlow.schema(AutoCdcReservedNames.cdcMetadataColName) assert(!metaField.nullable, "_cdc_metadata column itself must be non-null") val metaStruct = metaField.dataType.asInstanceOf[StructType] @@ -336,7 +336,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // The user-selected portion drops `name`; the trailing column is the SCD1 metadata. 
assert( loadedDf.schema.fieldNames.toSeq == - Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName) ) } @@ -351,7 +351,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { assert(loadedDf.schema == resolvedFlow.schema) assert( loadedDf.schema.fieldNames.toSeq == - Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName) ) } @@ -448,7 +448,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // Locks in the previous engine-level guard (Scd1BatchProcessor.extendMicrobatchRowsWith // CdcMetadata) at flow-construction time. Any future regression where a user-supplied // CDC stream carries the reserved metadata column name should fail eagerly here. - val sourceDf = sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType) + val sourceDf = sourceDfWithExtraColumns(AutoCdcReservedNames.cdcMetadataColName -> StringType) checkError( exception = intercept[AnalysisException] { @@ -458,7 +458,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { sqlState = "42710", parameters = Map( "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, - "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "columnName" -> AutoCdcReservedNames.cdcMetadataColName, "schemaName" -> "changeDataFeed", "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala index 475d25f5aa2cf..1aa2cbcd5417b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala @@ -47,13 +47,13 @@ class Scd1BatchProcessorMergeSuite */ private 
val minimalSchema: StructType = new StructType() .add("id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) /** Minimal target-table shape: one key, one data column, and CDC metadata. */ private val targetSchema: StructType = new StructType() .add("id", IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) /** * A processor with a single key column `id`. `sequencing` is irrelevant for @@ -85,7 +85,7 @@ class Scd1BatchProcessorMergeSuite val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name, dt)) => s.add(name, dt) } - withKeys.add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + withKeys.add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) } /** @@ -116,7 +116,7 @@ class Scd1BatchProcessorMergeSuite .add("id", IntegerType) .add("value", StringType) .add( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, new StructType() .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) @@ -446,7 +446,7 @@ class Scd1BatchProcessorMergeSuite // The schema always stores the backtick consumed column name, so unticked the raw name here. 
.add(rawKeyName, IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) createTable( defaultTargetIdent, diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 9432150c40167..d2c78442c4762 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -33,7 +33,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { .add("name", StringType) .add("age", IntegerType) .add( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, new StructType() .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) @@ -596,7 +596,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Original columns are preserved in their original order, with CDC metadata appended at // the very end. 
assert(result.schema.fieldNames.toSeq == - schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + schema.fieldNames.toSeq :+ AutoCdcReservedNames.cdcMetadataColName) } test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + @@ -624,7 +624,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) val cdcMetadataDataType = - resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + resultDf.schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType] assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) @@ -723,7 +723,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val result = processor.projectTargetColumnsOntoMicrobatch(batch) assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, 30, Row(null, 10L)) @@ -753,7 +753,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { assert( result.schema.fieldNames.toSeq == - Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "name", AutoCdcReservedNames.cdcMetadataColName) ) checkAnswer( df = result, @@ -785,7 +785,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // in which the user listed columns in IncludeColumns. The CDC metadata column is appended // last as always. 
assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, @@ -800,7 +800,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Even if a column is created with backticks via DDL, those backticks are consumed by Spark // before resolving the schema; they won't show up in the schema field. .add("user.id", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val batch = microbatchOf(schema)( Row(1, "u-100", Row(null, 10L)) @@ -826,7 +826,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val result = processor.projectTargetColumnsOntoMicrobatch(batch) assert(result.schema.fieldNames.toSeq == - Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "user.id", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, "u-100", Row(null, 10L)) @@ -860,7 +860,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Output column names follow the microbatch schema's casing, not the casing in the user's // columnSelection. The CDC metadata column is appended last as always. assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, 30, Row(null, 10L)) @@ -880,7 +880,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Data column. .add("value", StringType) // CDC metadata column. - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) /** * Schema for the auxiliary input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests. 
@@ -893,7 +893,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Key column. .add("id", IntegerType) // CDC metadata column. - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts when a matching " + "tombstone exists for the same key") { @@ -1015,7 +1015,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val schema = new StructType() .add("region", StringType) .add("customer_id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val microbatch = microbatchOf(schema)( Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), @@ -1051,7 +1051,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { test("applyTombstonesToMicrobatch supports backticked key names containing a literal dot") { val schema = new StructType() .add("user.id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val microbatch = microbatchOf(schema)( Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala index 76790847ede5c..bb8043e720c65 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala @@ -41,12 +41,12 @@ class Scd1ForeachBatchHandlerSuite private val auxiliarySchema = new StructType() .add("id", IntegerType) - 
.add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) private val targetSchema = new StructType() .add("id", IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) private val processor = Scd1BatchProcessor( changeArgs = ChangeArgs( @@ -155,11 +155,11 @@ class Scd1ForeachBatchHandlerSuite val compositeAuxSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeTargetSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeProcessor = Scd1BatchProcessor( changeArgs = ChangeArgs( @@ -492,12 +492,12 @@ class Scd1ForeachBatchHandlerSuite val compositeAuxSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeTargetSchema = new StructType() .add("country", StringType) .add("city", StringType) .add("population", LongType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeProcessor = Scd1BatchProcessor( changeArgs = ChangeArgs( diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala new file mode 100644 index 0000000000000..80dd4975b7304 
--- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F, QueryTest, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd2BatchProcessorSuite extends QueryTest with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. 
*/ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + // =============== preprocessMicrobatch tests =============== + + test("preprocessMicrobatch appends framework columns __START_AT, __END_AT, " + + "_cdc_metadata at the end of the schema in that order") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)(Row(1, 10L, "a")) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "seq", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch returns an empty DataFrame with the full preprocessed schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.collect().isEmpty) + assert(result.schema.fieldNames.toSeq == Seq( + "id", "seq", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch stamps __START_AT, __END_AT, and __RECORD_START_AT correctly " + + "across delete and upsert events for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", 
StringType) + .add("is_delete", BooleanType) + + // All three events target the same key. SCD2 must preserve every event in the output - + // unlike SCD1, no per-key deduplication is performed; this also implicitly pins the + // no-dedup contract of preprocessMicrobatch. + val batch = microbatchOf(schema)( + Row(1, 10L, "first-upsert", false), + Row(1, 20L, "second-upsert", false), + Row(1, 30L, null, true) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + // Per-row contract for the framework columns: + // - __START_AT = sequencing for every row (the active-from time) + // - __END_AT = sequencing for delete rows; null for upserts (mutual exclusion) + // - __RECORD_START_AT = sequencing for every row, regardless of delete vs upsert + // (lineage preserved into the merge step) + checkAnswer( + df = processor.preprocessMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 10L, "first-upsert", false, 10L, null, Row(10L)), + Row(1, 20L, "second-upsert", false, 20L, null, Row(20L)), + Row(1, 30L, null, true, 30L, 30L, Row(30L)) + ) + ) + } + + test("preprocessMicrobatch leaves __END_AT null on every row when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.preprocessMicrobatch(batch).select( + F.col(Scd2BatchProcessor.endAtColName) + ), + expectedAnswer = Seq(Row(null), Row(null)) + ) + } + + 
test("preprocessMicrobatch treats null deleteCondition results as upsert " + + "(__END_AT stays null)") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + // is_delete is null - the delete condition evaluates to null, which Spark treats as the + // otherwise branch, so the row is classified as an upsert. + Row(1, 10L, null) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.preprocessMicrobatch(batch).select( + F.col(Scd2BatchProcessor.endAtColName) + ), + expectedAnswer = Row(null) + ) + } + + test("preprocessMicrobatch evaluates an arbitrary sequencing expression per-row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("alt_seq", LongType) + .add("value", StringType) + + // Sequencing is a function call referencing multiple columns, not a bare identifier. Locks + // in that the framework columns evaluate the full expression per-row rather than treating + // `sequencing` as a single column reference. + val batch = microbatchOf(schema)( + // greatest(10, 30) = 30 + Row(1, 10L, 30L, "row1"), + // greatest(40, 20) = 40 + Row(2, 40L, 20L, "row2") + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + checkAnswer( + df = result.select( + F.col(Scd2BatchProcessor.startAtColName), + F.col(s"${AutoCdcReservedNames.cdcMetadataColName}." 
+ + s"${Scd2BatchProcessor.recordStartAtFieldName}") + ), + expectedAnswer = Seq( + Row(30L, 30L), + Row(40L, 40L) + ) + ) + } + + /** Schema reused by columnSelection tests: id (key), name, age, seq (sequencing). */ + private val multiUserColSchema: StructType = new StructType() + .add("id", IntegerType) + .add("name", StringType) + .add("age", IntegerType) + .add("seq", LongType) + + test("preprocessMicrobatch keeps every user column when columnSelection is None") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = None + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "name", "age", "seq", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch retains framework columns even when IncludeColumns omits them") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch drops user columns listed in ExcludeColumns; " + + "framework 
columns survive") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("name")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", "seq", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, 10L, 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch preserves the microbatch schema's user-column order, " + + "ignoring the order of IncludeColumns") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + // User specifies (age, id) - intentionally different from the schema order (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + // Output column order follows the microbatch schema (id before age), not the user's listing + // order in IncludeColumns. Framework columns are always appended last. 
+ assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch resolves columnSelection case-insensitively " + + "when SQLConf.CASE_SENSITIVE=false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + // User columns intentionally use a different case than the schema (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + // Output column names follow the microbatch schema's casing, not the user's casing. + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + } + + test("preprocessMicrobatch handles backticked column names containing a literal dot") { + val schema = new StructType() + .add("id", IntegerType) + // Even if a column is created with backticks via DDL, those backticks are consumed by Spark + // before resolving the schema; they won't show up in the schema field. 
+ .add("user.id", StringType) + .add("seq", LongType) + + val batch = microbatchOf(schema)( + Row(1, "u-100", 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq( + UnqualifiedColumnName("id"), + UnqualifiedColumnName("`user.id`") + ) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "user.id", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, "u-100", 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch correctly populates framework columns even when ExcludeColumns " + + "drops the columns referenced by sequencing and deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("value", StringType) + // Both seq and is_delete are referenced by the flow's sequencing / deleteCondition + // expressions, but the user wants them excluded from the target table. + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, "alice", 10L, false), + Row(1, null, 20L, true) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("seq"), UnqualifiedColumnName("is_delete")) + )) + ), + resolvedSequencingType = LongType + ) + + // The orchestrator runs row-extension steps before column selection, so the framework + // columns reference seq / is_delete fully even though the final projection drops them. 
+ val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "alice", 10L, null, Row(10L)), + Row(1, null, 20L, 20L, Row(20L)) + ) + ) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala index 5ebdb4b4c86d2..8538ef92a588b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, ChangeArgs, ColumnSelection, Scd1BatchProcessor, @@ -145,7 +146,7 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach { * Assumes sequence type is BIGINT (Long). 
*/ protected val cdcMetadataDdl: String = { - val col = Scd1BatchProcessor.cdcMetadataColName + val col = AutoCdcReservedNames.cdcMetadataColName val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL" diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala index 5a9f6cb6710be..78dbb70027b45 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, ColumnSelection, - Scd1BatchProcessor, UnqualifiedColumnName } import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} @@ -157,7 +157,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // The auxiliary table only contains keys and the metadata column, hence "name" should not be // included. 
- assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName)) + assert(auxSchema.fieldNames.toSeq == Seq("id", AutoCdcReservedNames.cdcMetadataColName)) assert(getAuxTableKeyColumnNames(target = "target") == Seq("id")) } @@ -195,7 +195,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite val auxSchema = spark.table(auxTableNameFor("target")).schema assert(auxSchema.fieldNames.toSeq == - Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName)) + Seq("region", "id", AutoCdcReservedNames.cdcMetadataColName)) assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id")) } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala index 46f8ee47db02f..a5f3a13a012a6 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.pipelines.graph import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions -import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor +import org.apache.spark.sql.pipelines.autocdc.AutoCdcReservedNames import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession @@ -147,8 +147,8 @@ class AutoCdcScd1TargetTableDurabilitySuite val schema = spark.table(s"$catalog.$namespace.target").schema assert( - schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName), - s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " + + schema.fieldNames.contains(AutoCdcReservedNames.cdcMetadataColName), + s"Target must have 
${AutoCdcReservedNames.cdcMetadataColName} after first AutoCDC run; " + s"got ${schema.fieldNames.toSeq}" ) checkAnswer(