diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala index 2aae08012b..354c9b8f2a 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala @@ -28,9 +28,10 @@ class FlussAppendPartitionReader( tablePath: TablePath, projection: Array[Int], pushedPredicate: Option[Predicate], + limit: Option[Int], flussPartition: FlussAppendInputPartition, flussConfig: Configuration) - extends FlussPartitionReader(tablePath, flussConfig) { + extends FlussPartitionReader(tablePath, flussConfig, limit) { override protected lazy val projectedRowType: RowType = rowType.project(projection) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 87f2fdad0f..5e1dbf9eaf 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -39,6 +39,7 @@ abstract class FlussBatch( tablePath: TablePath, tableInfo: TableInfo, readSchema: StructType, + limit: Option[Int], flussConfig: Configuration) extends Batch with AutoCloseable { @@ -114,9 +115,10 @@ class FlussAppendBatch( readSchema: StructType, pushedPredicate: Option[Predicate], partitionPredicate: Option[Predicate], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) { override val startOffsetsInitializer: OffsetsInitializer = { FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) @@ -199,6 +201,7 @@ class FlussAppendBatch( tablePath, projection, pushedPredicate, + limit, options, flussConfig) } @@ -211,9 +214,10 @@ class FlussUpsertBatch( tableInfo: TableInfo, readSchema: StructType, partitionPredicate: Option[Predicate], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) { override val startOffsetsInitializer: OffsetsInitializer = { val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) @@ -247,6 +251,6 @@ class FlussUpsertBatch( } override def createReaderFactory(): PartitionReaderFactory = { - new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) + new FlussUpsertPartitionReaderFactory(tablePath, projection, limit, options, flussConfig) } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala index f17dbd4608..f7bf6722e1 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala @@ -271,7 +271,7 @@ class FlussAppendMicroBatchStream( checkpointLocation) { override def createReaderFactory(): PartitionReaderFactory = { - new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig) + new FlussAppendPartitionReaderFactory(tablePath, projection, None, None, options, flussConfig) } override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = { @@ -352,6 +352,6 @@ class FlussUpsertMicroBatchStream( } override def createReaderFactory(): PartitionReaderFactory = { - new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) + new FlussUpsertPartitionReaderFactory(tablePath, projection, None, options, flussConfig) } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala index 5d43303857..23a9caa4f4 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala @@ -34,7 +34,10 @@ import org.apache.spark.sql.connector.read.PartitionReader import java.time.Duration -abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration) +abstract class FlussPartitionReader( + tablePath: TablePath, + flussConfig: Configuration, + limit: Option[Int]) extends PartitionReader[InternalRow] with Logging { @@ -57,6 +60,9 @@ abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configura def next0(): Boolean override def next(): Boolean = { + if (limit.isDefined && numRowsRead >= limit.get) { + return false + } val hasNext = next0() if (hasNext) { numRowsRead += 1 diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala index 2204bda365..13361532e9 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala @@ -30,6 +30,7 @@ class FlussAppendPartitionReaderFactory( tablePath: TablePath, projection: Array[Int], pushedPredicate: Option[Predicate], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends PartitionReaderFactory { @@ -40,6 +41,7 @@ class FlussAppendPartitionReaderFactory( tablePath, projection, pushedPredicate, + limit, flussPartition, flussConfig ) @@ -50,6 +52,7 @@ class FlussAppendPartitionReaderFactory( class FlussUpsertPartitionReaderFactory( tablePath: TablePath, projection: Array[Int], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends PartitionReaderFactory { @@ -59,6 +62,7 @@ class FlussUpsertPartitionReaderFactory( new FlussUpsertPartitionReader( tablePath, projection, + limit, upsertPartition, flussConfig ) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala index c5379cfd8c..e4965c814f 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala @@ -43,6 +43,8 @@ trait FlussScan extends Scan { def partitionPredicate: Option[FlussPredicate] = None + def limit: Option[Int] = None + protected def scanType: String override def readSchema(): StructType = { @@ -54,10 +56,14 @@ trait FlussScan extends Scan { val withPushed = if (pushedSparkPredicates.isEmpty) base else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]" - partitionPredicate match { + val withPartition = partitionPredicate match { case Some(p) => s"$withPushed [PartitionFilter: $p]" case None => withPushed } + limit match { + case Some(l) => s"$withPartition [Limit: $l]" + case None => withPartition + } } override def supportedCustomMetrics(): Array[CustomMetric] = @@ -72,6 +78,7 @@ case class FlussAppendScan( pushedPredicate: Option[FlussPredicate], override val partitionPredicate: Option[FlussPredicate], override val pushedSparkPredicates: Seq[Predicate], + override val limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -85,6 +92,7 @@ case class FlussAppendScan( readSchema, pushedPredicate, partitionPredicate, + limit, options, flussConfig) } @@ -107,6 +115,7 @@ case class FlussLakeAppendScan( requiredSchema: Option[StructType], pushedPredicate: Option[FlussPredicate], override val pushedSparkPredicates: Seq[Predicate], + override val limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -119,6 +128,7 @@ case class FlussLakeAppendScan( tableInfo, readSchema, pushedPredicate, + limit, options, flussConfig) } @@ -140,6 +150,7 @@ case class FlussUpsertScan( tableInfo: TableInfo, requiredSchema: Option[StructType], override val partitionPredicate: Option[FlussPredicate], + override val limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -147,7 +158,14 @@ case class FlussUpsertScan( override protected val scanType: String = "Upsert" override def toBatch: Batch = { - new FlussUpsertBatch(tablePath, tableInfo, readSchema, partitionPredicate, options, flussConfig) + new FlussUpsertBatch( + tablePath, + tableInfo, + readSchema, + partitionPredicate, + limit, + options, + flussConfig) } override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { @@ -168,6 +186,7 @@ case class FlussLakeUpsertScan( requiredSchema: Option[StructType], pushedPredicate: Option[FlussPredicate], override val pushedSparkPredicates: Seq[Predicate], + override val limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -180,6 +199,7 @@ case class FlussLakeUpsertScan( tableInfo, readSchema, pushedPredicate, + limit, options, flussConfig) } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index 9e1c0d4545..64720dc2e9 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -24,7 +24,7 @@ import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils} import org.apache.fluss.spark.utils.{SparkPartitionPredicate, SparkPredicateConverter} import org.apache.spark.sql.connector.expressions.filter.Predicate -import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters} +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -33,13 +33,22 @@ import java.util.{Collections, IdentityHashMap, Set => JSet} import scala.collection.JavaConverters._ /** An interface that extends from Spark [[ScanBuilder]]. */ -trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns { +trait FlussScanBuilder + extends ScanBuilder + with SupportsPushDownRequiredColumns + with SupportsPushDownLimit { protected var requiredSchema: Option[StructType] = None + protected var limit: Option[Int] = None override def pruneColumns(requiredSchema: StructType): Unit = { this.requiredSchema = Some(requiredSchema) } + + override def pushLimit(limit: Int): Boolean = { + this.limit = Some(limit) + true + } } /** Extracts a partition-key predicate so the scan can skip partitions that can't match. */ @@ -130,6 +139,7 @@ class FlussAppendScanBuilder( pushedPredicate, partitionPredicate, acceptedPredicates.toSeq, + limit, options, flussConfig) } @@ -150,6 +160,7 @@ class FlussLakeAppendScanBuilder( requiredSchema, pushedPredicate, acceptedPredicates.toSeq, + limit, options, flussConfig) } @@ -164,7 +175,14 @@ class FlussUpsertScanBuilder( extends FlussSupportsPushDownPartitionFilters { override def build(): Scan = { - FlussUpsertScan(tablePath, tableInfo, requiredSchema, partitionPredicate, options, flussConfig) + FlussUpsertScan( + tablePath, + tableInfo, + requiredSchema, + partitionPredicate, + limit, + options, + flussConfig) } } @@ -183,6 +201,7 @@ class FlussLakeUpsertScanBuilder( requiredSchema, pushedPredicate, acceptedPredicates.toSeq, + limit, options, flussConfig) } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala index 65050f45e5..7a74ee171c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala @@ -46,9 +46,10 @@ import scala.collection.mutable class FlussUpsertPartitionReader( tablePath: TablePath, projection: Array[Int], + limit: Option[Int], flussPartition: FlussUpsertInputPartition, flussConfig: Configuration) - extends FlussPartitionReader(tablePath, flussConfig) + extends FlussPartitionReader(tablePath, flussConfig, limit) with Logging { override protected lazy val projectedRowType: RowType = rowType.project(projectionWithPks) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala index d451039299..6fa1671dcd 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -40,9 +40,10 @@ class FlussLakeAppendBatch( tableInfo: TableInfo, readSchema: StructType, pushedPredicate: Option[FlussPredicate], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { + extends FlussLakeBatch(tablePath, tableInfo, readSchema, limit, options, flussConfig) { // Required by FlussLakeBatch but unused — lake snapshot determines start offsets. override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() @@ -57,6 +58,7 @@ class FlussLakeAppendBatch( tablePath, projection, logTailPredicate, + limit, options, flussConfig) } else { @@ -66,6 +68,7 @@ class FlussLakeAppendBatch( projection, pushedPredicate, logTailPredicate, + limit, flussConfig) } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala index 6c64ed902e..a1ffeac4cf 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala @@ -33,8 +33,9 @@ class FlussLakeAppendPartitionReader( partition: FlussLakeInputPartition, lakeSource: LakeSource[LakeSplit], projection: Array[Int], + limit: Option[Int], flussConfig: Configuration) - extends FlussPartitionReader(tablePath, flussConfig) + extends FlussPartitionReader(tablePath, flussConfig, limit) with Logging { private var recordIterator: CloseableIterator[LogRecord] = _ diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala index d61aea9d1c..bed1017b87 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala @@ -37,9 +37,10 @@ abstract class FlussLakeBatch( tablePath: TablePath, tableInfo: TableInfo, readSchema: StructType, + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) { override val stoppingOffsetsInitializer: OffsetsInitializer = { FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala index 369c45f748..3d0046b74e 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala @@ -35,6 +35,7 @@ class FlussLakePartitionReaderFactory( projection: Array[Int], flussPredicate: Option[FlussPredicate], logTailPredicate: Option[FlussPredicate], + limit: Option[Int], flussConfig: Configuration) extends PartitionReaderFactory { @@ -53,12 +54,14 @@ class FlussLakePartitionReaderFactory( lakeOnlySplit, lakeSource, projection, + limit, flussConfig) case logSplit: FlussAppendInputPartition => new FlussAppendPartitionReader( tablePath, projection, logTailPredicate, + limit, logSplit, flussConfig) case mixedSplit: FlussLakeUpsertInputPartition => @@ -66,6 +69,7 @@ class FlussLakePartitionReaderFactory( tablePath, lakeSource, projection, + limit, mixedSplit, flussConfig) case _ => diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala index 2ef6ab7701..1a6efd0301 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -43,9 +43,10 @@ class FlussLakeUpsertBatch( tableInfo: TableInfo, readSchema: StructType, pushedPredicate: Option[FlussPredicate], + limit: Option[Int], options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { + extends FlussLakeBatch(tablePath, tableInfo, readSchema, limit, options, flussConfig) { override val startOffsetsInitializer: OffsetsInitializer = { val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) @@ -57,7 +58,7 @@ class FlussLakeUpsertBatch( override def createReaderFactory(): PartitionReaderFactory = { if (isFallback) { - new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) + new FlussUpsertPartitionReaderFactory(tablePath, projection, limit, options, flussConfig) } else { // PK kv-tail reader does not consume server-side log filters. new FlussLakePartitionReaderFactory( @@ -66,6 +67,7 @@ class FlussLakeUpsertBatch( projection, pushedPredicate, None, + limit, flussConfig) } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala index ae259dc9b9..449fbc0626 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala @@ -39,9 +39,10 @@ class FlussLakeUpsertPartitionReader( tablePath: TablePath, lakeSource: LakeSource[LakeSplit], projection: Array[Int], + limit: Option[Int], flussPartition: FlussLakeUpsertInputPartition, flussConfig: Configuration) - extends FlussPartitionReader(tablePath, flussConfig) + extends FlussPartitionReader(tablePath, flussConfig, limit) with Logging { private val lakeSplits = flussPartition.lakeSplits diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 42b0aa62d0..2a05b2dcf1 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -603,4 +603,23 @@ class SparkLogTableReadTest extends FlussSparkTestBase { assert(numRowsRead == 5L, s"Expected 5 rows read, got $numRowsRead") } } + + test("Spark Read: limit pushdown") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t VALUES + |(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e') + |""".stripMargin) + + val dfNoLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t") + assert(flussAppendScans(dfNoLimit).flatMap(_.limit).isEmpty) + + val dfLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t LIMIT 2") + assert(flussAppendScans(dfLimit).flatMap(_.limit).distinct == Seq(2)) + } + } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala index cddca21722..e25479b238 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala @@ -448,6 +448,16 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { body } + test("Spark Read: primary key table limit pushdown") { + withPkPartitionedTable { + val dfNoLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t") + assert(flussUpsertScan(dfNoLimit).flatMap(_.limit).isEmpty) + + val dfLimit = sql(s"SELECT * FROM $DEFAULT_DATABASE.t LIMIT 2") + assert(flussUpsertScan(dfLimit).flatMap(_.limit).contains(2)) + } + } + private def partitionPredicate(df: DataFrame): Option[org.apache.fluss.predicate.Predicate] = { flussUpsertScan(df).flatMap(_.partitionPredicate) } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala index 210b23e127..593da775a5 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala @@ -523,6 +523,33 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { } } + test("Spark Lake Read: log table union read with limit pushdown") { + withTable("t_union_limit") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_union_limit (id INT, name STRING) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_union_limit VALUES + |(1, "alpha"), (2, "beta"), (3, "gamma") + |""".stripMargin) + + tierToLake("t_union_limit") + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_union_limit VALUES + |(4, "delta"), (5, "epsilon") + |""".stripMargin) + + val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union_limit LIMIT 2") + assert(flussScan(df).flatMap(_.limit).distinct == Seq(2)) + } + } + test("Spark Lake Read: non-FULL startup mode skips lake path") { withTable("t_earliest") { sql(s""" diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala index 0de25f100e..e8af70c5c4 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -458,6 +458,32 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTe } } + test("Spark Lake Read: union with limit pushdown") { + withTable("t_pk_union_limit") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pk_union_limit (id INT, name STRING, score INT) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pk_union_limit VALUES + |(1, 'alice', 90), (2, 'bob', 85), (3, 'charlie', 95) + |""".stripMargin) + tierToLake("t_pk_union_limit") + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pk_union_limit VALUES + |(4, 'dave', 88), (5, 'eve', 92) + |""".stripMargin) + + val query = + sql(s"SELECT id, score FROM $DEFAULT_DATABASE.t_pk_union_limit LIMIT 2") + assert(flussScan(query).flatMap(_.limit).distinct == Seq(2)) + } + } + test("Spark Lake Read: primary key table projection with type-dependent columns") { withTable("t") { val tablePath = createTablePath("t") diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala index 526e898899..ac2d09c1a4 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala @@ -132,14 +132,18 @@ abstract class SparkLakeTableReadTestBase extends FlussSparkTestBase { } } - protected def pushedPredicates(df: DataFrame): Array[Predicate] = { + protected def flussScan(df: DataFrame): Seq[FlussScan] = { val scans = df.queryExecution.executedPlan.collect { case b: BatchScanExec => b.scan } ++ df.queryExecution.optimizedPlan.collect { case DataSourceV2ScanRelation(_, scan, _, _, _) => scan } - scans + scans.collect { case s: FlussScan => s } + } + + protected def pushedPredicates(df: DataFrame): Array[Predicate] = { + flussScan(df) .collect { case f: FlussScan => f.pushedSparkPredicates } .flatten .toArray