From 8e7346fa04b7ffdfbe75c57cc6b4a1a8c6d6c340 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 29 May 2026 18:14:21 +0000 Subject: [PATCH 1/2] cleanup --- ...utoCdcScd1OutOfOrderConvergenceSuite.scala | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala new file mode 100644 index 000000000000..53ab244833d2 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions +import org.apache.spark.sql.pipelines.autocdc.{ + ColumnSelection, + Scd1BatchProcessor, + UnqualifiedColumnName +} +import org.apache.spark.sql.pipelines.graph.AutoCdcScd1OutOfOrderConvergenceSuite.SourceRow +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +object AutoCdcScd1OutOfOrderConvergenceSuite { + /** + * A single CDC event in the source stream. + * + * @param key Identity column (the AutoCDC `keys`). + * @param name Data column (nullable string). + * @param amount Data column (nullable int). + * @param active Data column (nullable boolean). + * @param sequence Sequencing value (the AutoCDC `sequencing` expression). + * @param isDelete Drives the AutoCDC `deleteCondition`; `true` marks the event as a delete, + * `false` as an upsert. Excluded from the target via `columnSelection`. + */ + case class SourceRow( + key: Int, + name: Option[String], + amount: Option[Int], + active: Option[Boolean], + sequence: Long, + isDelete: Boolean) +} + +/** + * Differential test for the SCD1 AutoCDC merge's order-invariance property: feeding the same + * randomly-generated CDC event stream as a single sorted micro-batch and as several shuffled + * micro-batches must converge to the same target table contents. 
+ */ +class AutoCdcScd1OutOfOrderConvergenceSuite + extends ExecutionTest + with SharedSparkSession + with AutoCdcGraphExecutionTestMixin { + + private val numDistinctKeys: Int = 3 + private val maxEventsPerKey: Int = 25 + private val deleteEventProbability: Double = 0.20 + private val triplicateEventProbability: Double = 0.15 + private val numOutOfOrderBatches: Int = 4 + + private val keyColumn: String = "key" + private val nameColumn: String = "name" + private val amountColumn: String = "amount" + private val activeColumn: String = "active" + private val sequenceColumn: String = "sequence" + private val isDeleteColumn: String = "is_delete" + + private val sourceColumnNames: Seq[String] = + Seq(keyColumn, nameColumn, amountColumn, activeColumn, sequenceColumn, isDeleteColumn) + + private def randomOptional[T](rand: Random, value: => T, nullProbability: Double = 0.25) + : Option[T] = + if (rand.nextDouble() < nullProbability) None else Some(value) + + private def randomUpsertOrDelete( + rand: Random, key: Int, sequence: Long, isDelete: Boolean): SourceRow = { + val colorPalette = Seq("red", "blue", "green", "yellow") + SourceRow( + key = key, + name = randomOptional(rand, colorPalette(rand.nextInt(colorPalette.length))), + amount = randomOptional(rand, rand.nextInt(200) - 100), + active = randomOptional(rand, rand.nextBoolean()), + sequence = sequence, + isDelete = isDelete + ) + } + + private def generateRandomCdcEventStream(rand: Random): Seq[SourceRow] = { + var nextSequence: Long = 0L + val events = ArrayBuffer.empty[SourceRow] + (0 until numDistinctKeys).foreach { _ => + val key = rand.nextInt(50) + val numEventsForKey = 1 + rand.nextInt(maxEventsPerKey) + var keyHasLiveRow = false + (0 until numEventsForKey).foreach { _ => + val isDelete = keyHasLiveRow && rand.nextDouble() < deleteEventProbability + keyHasLiveRow = !isDelete + val event = randomUpsertOrDelete(rand, key, nextSequence, isDelete) + nextSequence += 1 + if (rand.nextDouble() < 
triplicateEventProbability) { + events += event + events += event + events += event + } else { + events += event + } + } + } + events.sortBy(_.sequence).toSeq + } + + /** Build a pipeline context with a single SCD1 AutoCDC flow reading from `stream`. */ + private def buildPipelineContext( + targetTable: String, + stream: MemoryStream[SourceRow]): TestGraphRegistrationContext = { + new TestGraphRegistrationContext(spark) { + registerTable(targetTable, catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = s"${targetTable}_flow", + target = targetTable, + query = dfFlowFunc(stream.toDF().toDF(sourceColumnNames: _*)), + keys = Seq(keyColumn), + sequencing = functions.col(sequenceColumn), + deleteCondition = Some(functions.col(isDeleteColumn) === true), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName(isDeleteColumn)) + )) + )) + } + } + + private def createTargetTable(targetTable: String): Unit = { + spark.sql( + s"CREATE TABLE $catalog.$namespace.$targetTable (" + + s"$keyColumn INT NOT NULL, " + + s"$nameColumn STRING, " + + s"$amountColumn INT, " + + s"$activeColumn BOOLEAN, " + + s"$sequenceColumn BIGINT NOT NULL, " + + s"$cdcMetadataDdl)" + ) + } + + // The two pipelines see the same logical events but not the same per-row sequence values + // along the way, so `_cdc_metadata` (which holds the per-row delete/upsert sequence) can + // legitimately differ even when the visible row content matches. Drop it before comparing. 
+ private def assertTargetsConverge(inOrderTable: String, outOfOrderTable: String): Unit = { + val cdcMetadataColumn = Scd1BatchProcessor.cdcMetadataColName + checkAnswer( + spark.table(s"$catalog.$namespace.$outOfOrderTable").drop(cdcMetadataColumn), + spark.table(s"$catalog.$namespace.$inOrderTable").drop(cdcMetadataColumn) + ) + } + + private def runConvergenceTest(seed: Long): Unit = { + val session = spark + import session.implicits._ + + val rand = new Random(seed) + val sortedEventStream = generateRandomCdcEventStream(rand) + val shuffledEventStream = rand.shuffle(sortedEventStream) + + withClue( + s"\nseed=$seed\n" + + s"events (${sortedEventStream.size} total, sorted by sequence):\n" + + sortedEventStream.map(r => s" $r").mkString("\n") + "\n" + ) { + val inOrderTable = "inorder_target" + val outOfOrderTable = "outoforder_target" + createTargetTable(inOrderTable) + createTargetTable(outOfOrderTable) + + val inOrderStream = MemoryStream[SourceRow] + val inOrderCtx = buildPipelineContext(inOrderTable, inOrderStream) + inOrderStream.addData(sortedEventStream: _*) + runPipeline(inOrderCtx) + + val outOfOrderStream = MemoryStream[SourceRow] + val outOfOrderCtx = buildPipelineContext(outOfOrderTable, outOfOrderStream) + val totalEvents = shuffledEventStream.size + (0 until numOutOfOrderBatches).foreach { batchIndex => + val batchStart = batchIndex * totalEvents / numOutOfOrderBatches + val batchEnd = (batchIndex + 1) * totalEvents / numOutOfOrderBatches + outOfOrderStream.addData(shuffledEventStream.slice(batchStart, batchEnd): _*) + runPipeline(outOfOrderCtx) + } + + assertTargetsConverge(inOrderTable, outOfOrderTable) + } + } + + test("SCD1 merge converges across micro-batch shuffling for randomly generated " + + "CDC events (seed 1)") { + runConvergenceTest(seed = 1L) + } + + test("SCD1 merge converges across micro-batch shuffling for randomly generated " + + "CDC events (seed 2)") { + runConvergenceTest(seed = 2L) + } + + test("SCD1 merge converges across 
micro-batch shuffling for randomly generated " + + "CDC events (seed 3)") { + runConvergenceTest(seed = 3L) + } +} From 999f4346871ea9d3bda379395e8e97b03e663e8a Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 29 May 2026 20:26:14 +0000 Subject: [PATCH 2/2] more cleanup --- ...utoCdcScd1OutOfOrderConvergenceSuite.scala | 81 +++++++------------ 1 file changed, 31 insertions(+), 50 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala index 53ab244833d2..a63126f61849 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1OutOfOrderConvergenceSuite.scala @@ -22,11 +22,7 @@ import scala.util.Random import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions -import org.apache.spark.sql.pipelines.autocdc.{ - ColumnSelection, - Scd1BatchProcessor, - UnqualifiedColumnName -} +import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.graph.AutoCdcScd1OutOfOrderConvergenceSuite.SourceRow import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession @@ -62,10 +58,18 @@ class AutoCdcScd1OutOfOrderConvergenceSuite with SharedSparkSession with AutoCdcGraphExecutionTestMixin { + // Distinct keys in the generated event stream. private val numDistinctKeys: Int = 3 - private val maxEventsPerKey: Int = 25 + // Upper bound on unique events (one per sequence) generated per key, before intentionally + // duplicating some events. + private val maxUniqueEventsPerKey: Int = 25 + // Probability an event is a delete; (1 - this) is the upsert probability. 
private val deleteEventProbability: Double = 0.20 - private val triplicateEventProbability: Double = 0.15 + // Probability an event is immediately re-emitted with the same sequence and payload. + private val duplicateEventProbability: Double = 0.15 + // Probability an optional payload column is non-null; (1 - this) is the null probability. + private val nonNullProbability: Double = 0.75 + // Number of micro-batches the out-of-order pipeline splits the shuffled events across. private val numOutOfOrderBatches: Int = 4 private val keyColumn: String = "key" @@ -78,18 +82,15 @@ class AutoCdcScd1OutOfOrderConvergenceSuite private val sourceColumnNames: Seq[String] = Seq(keyColumn, nameColumn, amountColumn, activeColumn, sequenceColumn, isDeleteColumn) - private def randomOptional[T](rand: Random, value: => T, nullProbability: Double = 0.25) - : Option[T] = - if (rand.nextDouble() < nullProbability) None else Some(value) - private def randomUpsertOrDelete( rand: Random, key: Int, sequence: Long, isDelete: Boolean): SourceRow = { val colorPalette = Seq("red", "blue", "green", "yellow") SourceRow( key = key, - name = randomOptional(rand, colorPalette(rand.nextInt(colorPalette.length))), - amount = randomOptional(rand, rand.nextInt(200) - 100), - active = randomOptional(rand, rand.nextBoolean()), + name = Option.when(rand.nextDouble() < nonNullProbability)( + colorPalette(rand.nextInt(colorPalette.length))), + amount = Option.when(rand.nextDouble() < nonNullProbability)(rand.nextInt(100)), + active = Option.when(rand.nextDouble() < nonNullProbability)(rand.nextBoolean()), sequence = sequence, isDelete = isDelete ) @@ -98,20 +99,14 @@ class AutoCdcScd1OutOfOrderConvergenceSuite private def generateRandomCdcEventStream(rand: Random): Seq[SourceRow] = { var nextSequence: Long = 0L val events = ArrayBuffer.empty[SourceRow] - (0 until numDistinctKeys).foreach { _ => - val key = rand.nextInt(50) - val numEventsForKey = 1 + rand.nextInt(maxEventsPerKey) - var keyHasLiveRow = false -
(0 until numEventsForKey).foreach { _ => - val isDelete = keyHasLiveRow && rand.nextDouble() < deleteEventProbability - keyHasLiveRow = !isDelete + (0 until numDistinctKeys).foreach { key => + val numUniqueEventsForKey = rand.between(1, maxUniqueEventsPerKey + 1) + (0 until numUniqueEventsForKey).foreach { _ => + val isDelete = rand.nextDouble() < deleteEventProbability val event = randomUpsertOrDelete(rand, key, nextSequence, isDelete) nextSequence += 1 - if (rand.nextDouble() < triplicateEventProbability) { - events += event - events += event - events += event - } else { + events += event + if (rand.nextDouble() < duplicateEventProbability) { events += event } } @@ -142,23 +137,19 @@ class AutoCdcScd1OutOfOrderConvergenceSuite private def createTargetTable(targetTable: String): Unit = { spark.sql( s"CREATE TABLE $catalog.$namespace.$targetTable (" + - s"$keyColumn INT NOT NULL, " + - s"$nameColumn STRING, " + - s"$amountColumn INT, " + - s"$activeColumn BOOLEAN, " + - s"$sequenceColumn BIGINT NOT NULL, " + + s"`$keyColumn` INT NOT NULL, " + + s"`$nameColumn` STRING, " + + s"`$amountColumn` INT, " + + s"`$activeColumn` BOOLEAN, " + + s"`$sequenceColumn` BIGINT NOT NULL, " + s"$cdcMetadataDdl)" ) } - // The two pipelines see the same logical events but not the same per-row sequence values - // along the way, so `_cdc_metadata` (which holds the per-row delete/upsert sequence) can - // legitimately differ even when the visible row content matches. Drop it before comparing. 
private def assertTargetsConverge(inOrderTable: String, outOfOrderTable: String): Unit = { - val cdcMetadataColumn = Scd1BatchProcessor.cdcMetadataColName checkAnswer( - spark.table(s"$catalog.$namespace.$outOfOrderTable").drop(cdcMetadataColumn), - spark.table(s"$catalog.$namespace.$inOrderTable").drop(cdcMetadataColumn) + spark.table(s"$catalog.$namespace.$outOfOrderTable"), + spark.table(s"$catalog.$namespace.$inOrderTable") ) } @@ -199,18 +190,8 @@ class AutoCdcScd1OutOfOrderConvergenceSuite } } - test("SCD1 merge converges across micro-batch shuffling for randomly generated " + - "CDC events (seed 1)") { - runConvergenceTest(seed = 1L) - } - - test("SCD1 merge converges across micro-batch shuffling for randomly generated " + - "CDC events (seed 2)") { - runConvergenceTest(seed = 2L) - } - - test("SCD1 merge converges across micro-batch shuffling for randomly generated " + - "CDC events (seed 3)") { - runConvergenceTest(seed = 3L) + gridTest("SCD1 merge converges across micro-batch shuffling for randomly generated " + + "CDC events, seed")(Seq(1L, 2L, 3L)) { seed => + runConvergenceTest(seed) } }