Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class FlussAppendPartitionReader(
tablePath: TablePath,
projection: Array[Int],
pushedPredicate: Option[Predicate],
limit: Option[Int],
flussPartition: FlussAppendInputPartition,
flussConfig: Configuration)
extends FlussPartitionReader(tablePath, flussConfig) {
extends FlussPartitionReader(tablePath, flussConfig, limit) {

override protected lazy val projectedRowType: RowType = rowType.project(projection)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ abstract class FlussBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
limit: Option[Int],
flussConfig: Configuration)
extends Batch
with AutoCloseable {
Expand Down Expand Up @@ -114,9 +115,10 @@ class FlussAppendBatch(
readSchema: StructType,
pushedPredicate: Option[Predicate],
partitionPredicate: Option[Predicate],
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {

override val startOffsetsInitializer: OffsetsInitializer = {
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
Expand Down Expand Up @@ -199,6 +201,7 @@ class FlussAppendBatch(
tablePath,
projection,
pushedPredicate,
limit,
options,
flussConfig)
}
Expand All @@ -211,9 +214,10 @@ class FlussUpsertBatch(
tableInfo: TableInfo,
readSchema: StructType,
partitionPredicate: Option[Predicate],
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {

override val startOffsetsInitializer: OffsetsInitializer = {
val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
Expand Down Expand Up @@ -247,6 +251,6 @@ class FlussUpsertBatch(
}

override def createReaderFactory(): PartitionReaderFactory = {
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
new FlussUpsertPartitionReaderFactory(tablePath, projection, limit, options, flussConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class FlussAppendMicroBatchStream(
checkpointLocation) {

override def createReaderFactory(): PartitionReaderFactory = {
new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
new FlussAppendPartitionReaderFactory(tablePath, projection, None, None, options, flussConfig)
}

override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
Expand Down Expand Up @@ -352,6 +352,6 @@ class FlussUpsertMicroBatchStream(
}

override def createReaderFactory(): PartitionReaderFactory = {
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
new FlussUpsertPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ import org.apache.spark.sql.connector.read.PartitionReader

import java.time.Duration

abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration)
abstract class FlussPartitionReader(
tablePath: TablePath,
flussConfig: Configuration,
limit: Option[Int])
extends PartitionReader[InternalRow]
with Logging {

Expand All @@ -57,6 +60,9 @@ abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configura
def next0(): Boolean

override def next(): Boolean = {
if (limit.isDefined && numRowsRead >= limit.get) {
return false
}
val hasNext = next0()
if (hasNext) {
numRowsRead += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class FlussAppendPartitionReaderFactory(
tablePath: TablePath,
projection: Array[Int],
pushedPredicate: Option[Predicate],
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends PartitionReaderFactory {
Expand All @@ -40,6 +41,7 @@ class FlussAppendPartitionReaderFactory(
tablePath,
projection,
pushedPredicate,
limit,
flussPartition,
flussConfig
)
Expand All @@ -50,6 +52,7 @@ class FlussAppendPartitionReaderFactory(
class FlussUpsertPartitionReaderFactory(
tablePath: TablePath,
projection: Array[Int],
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends PartitionReaderFactory {
Expand All @@ -59,6 +62,7 @@ class FlussUpsertPartitionReaderFactory(
new FlussUpsertPartitionReader(
tablePath,
projection,
limit,
upsertPartition,
flussConfig
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ trait FlussScan extends Scan {

def partitionPredicate: Option[FlussPredicate] = None

def limit: Option[Int] = None

protected def scanType: String

override def readSchema(): StructType = {
Expand All @@ -54,10 +56,14 @@ trait FlussScan extends Scan {
val withPushed =
if (pushedSparkPredicates.isEmpty) base
else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
partitionPredicate match {
val withPartition = partitionPredicate match {
case Some(p) => s"$withPushed [PartitionFilter: $p]"
case None => withPushed
}
limit match {
case Some(l) => s"$withPartition [Limit: $l]"
case None => withPartition
}
}

override def supportedCustomMetrics(): Array[CustomMetric] =
Expand All @@ -72,6 +78,7 @@ case class FlussAppendScan(
pushedPredicate: Option[FlussPredicate],
override val partitionPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
override val limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {
Expand All @@ -85,6 +92,7 @@ case class FlussAppendScan(
readSchema,
pushedPredicate,
partitionPredicate,
limit,
options,
flussConfig)
}
Expand All @@ -107,6 +115,7 @@ case class FlussLakeAppendScan(
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
override val limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {
Expand All @@ -119,6 +128,7 @@ case class FlussLakeAppendScan(
tableInfo,
readSchema,
pushedPredicate,
limit,
options,
flussConfig)
}
Expand All @@ -140,14 +150,22 @@ case class FlussUpsertScan(
tableInfo: TableInfo,
requiredSchema: Option[StructType],
override val partitionPredicate: Option[FlussPredicate],
override val limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {

override protected val scanType: String = "Upsert"

override def toBatch: Batch = {
new FlussUpsertBatch(tablePath, tableInfo, readSchema, partitionPredicate, options, flussConfig)
new FlussUpsertBatch(
tablePath,
tableInfo,
readSchema,
partitionPredicate,
limit,
options,
flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
Expand All @@ -168,6 +186,7 @@ case class FlussLakeUpsertScan(
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
override val limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {
Expand All @@ -180,6 +199,7 @@ case class FlussLakeUpsertScan(
tableInfo,
readSchema,
pushedPredicate,
limit,
options,
flussConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils}
import org.apache.fluss.spark.utils.{SparkPartitionPredicate, SparkPredicateConverter}

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

Expand All @@ -33,13 +33,22 @@ import java.util.{Collections, IdentityHashMap, Set => JSet}
import scala.collection.JavaConverters._

/** An interface that extends from Spark [[ScanBuilder]]. */
trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
trait FlussScanBuilder
extends ScanBuilder
with SupportsPushDownRequiredColumns
with SupportsPushDownLimit {

protected var requiredSchema: Option[StructType] = None
protected var limit: Option[Int] = None

override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = Some(requiredSchema)
}

override def pushLimit(limit: Int): Boolean = {
this.limit = Some(limit)
true
}
}

/** Extracts a partition-key predicate so the scan can skip partitions that can't match. */
Expand Down Expand Up @@ -130,6 +139,7 @@ class FlussAppendScanBuilder(
pushedPredicate,
partitionPredicate,
acceptedPredicates.toSeq,
limit,
options,
flussConfig)
}
Expand All @@ -150,6 +160,7 @@ class FlussLakeAppendScanBuilder(
requiredSchema,
pushedPredicate,
acceptedPredicates.toSeq,
limit,
options,
flussConfig)
}
Expand All @@ -164,7 +175,14 @@ class FlussUpsertScanBuilder(
extends FlussSupportsPushDownPartitionFilters {

override def build(): Scan = {
FlussUpsertScan(tablePath, tableInfo, requiredSchema, partitionPredicate, options, flussConfig)
FlussUpsertScan(
tablePath,
tableInfo,
requiredSchema,
partitionPredicate,
limit,
options,
flussConfig)
}
}

Expand All @@ -183,6 +201,7 @@ class FlussLakeUpsertScanBuilder(
requiredSchema,
pushedPredicate,
acceptedPredicates.toSeq,
limit,
options,
flussConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ import scala.collection.mutable
class FlussUpsertPartitionReader(
tablePath: TablePath,
projection: Array[Int],
limit: Option[Int],
flussPartition: FlussUpsertInputPartition,
flussConfig: Configuration)
extends FlussPartitionReader(tablePath, flussConfig)
extends FlussPartitionReader(tablePath, flussConfig, limit)
with Logging {

override protected lazy val projectedRowType: RowType = rowType.project(projectionWithPks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class FlussLakeAppendBatch(
tableInfo: TableInfo,
readSchema: StructType,
pushedPredicate: Option[FlussPredicate],
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {
extends FlussLakeBatch(tablePath, tableInfo, readSchema, limit, options, flussConfig) {

// Required by FlussLakeBatch but unused — lake snapshot determines start offsets.
override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()
Expand All @@ -57,6 +58,7 @@ class FlussLakeAppendBatch(
tablePath,
projection,
logTailPredicate,
limit,
options,
flussConfig)
} else {
Expand All @@ -66,6 +68,7 @@ class FlussLakeAppendBatch(
projection,
pushedPredicate,
logTailPredicate,
limit,
flussConfig)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class FlussLakeAppendPartitionReader(
partition: FlussLakeInputPartition,
lakeSource: LakeSource[LakeSplit],
projection: Array[Int],
limit: Option[Int],
flussConfig: Configuration)
extends FlussPartitionReader(tablePath, flussConfig)
extends FlussPartitionReader(tablePath, flussConfig, limit)
with Logging {

private var recordIterator: CloseableIterator[LogRecord] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ abstract class FlussLakeBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
limit: Option[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
extends FlussBatch(tablePath, tableInfo, readSchema, limit, flussConfig) {

override val stoppingOffsetsInitializer: OffsetsInitializer = {
FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FlussLakePartitionReaderFactory(
projection: Array[Int],
flussPredicate: Option[FlussPredicate],
logTailPredicate: Option[FlussPredicate],
limit: Option[Int],
flussConfig: Configuration)
extends PartitionReaderFactory {

Expand All @@ -53,19 +54,22 @@ class FlussLakePartitionReaderFactory(
lakeOnlySplit,
lakeSource,
projection,
limit,
flussConfig)
case logSplit: FlussAppendInputPartition =>
new FlussAppendPartitionReader(
tablePath,
projection,
logTailPredicate,
limit,
logSplit,
flussConfig)
case mixedSplit: FlussLakeUpsertInputPartition =>
new FlussLakeUpsertPartitionReader(
tablePath,
lakeSource,
projection,
limit,
mixedSplit,
flussConfig)
case _ =>
Expand Down
Loading