
Commit af9c8b3

yyanyy authored and cloud-fan committed
[SPARK-56273][SQL] Simplify extracting fields from DataSourceV2ScanRelation
### What changes were proposed in this pull request?

`DataSourceV2ScanRelation` is a case class with 5 fields. Many pattern-match sites only need the `scan` field or a subset of `(relation, scan, output)`, using wildcards for the rest. This couples every match site to the constructor arity, so adding or removing fields requires updating all of them.

This PR introduces two extractors following the `ExtractV2Table` precedent (SPARK-53720):

- `ExtractV2Scan`: returns `Scan`
- `ExtractV2ScanInfo`: returns `(DataSourceV2Relation, Scan, Seq[AttributeReference])`

Updated 14 pattern-match sites across 10 files. Type-based matches and constructor calls are unchanged.

### Why are the changes needed?

Code simplification, similar to 6bae835.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR relies on existing tests.

### Was this patch authored or co-authored using generative AI tooling?

Yes - Opus 4.6

Closes #55070 from yyanyy/extract-v2-scan-relation-extractors.

Authored-by: Yan Yan <yyanyyyy@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 975b299 commit af9c8b3

11 files changed

Lines changed: 34 additions & 23 deletions
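For readers less familiar with Scala extractors, below is a minimal, self-contained sketch of the mechanism the new objects rely on. The names (`ScanRelation`, `ScanOf`, `ScanInfoOf`, `ExtractorDemo`) are hypothetical stand-ins for illustration, not the actual Spark classes: an object with an `unapply` method lets a match site bind only the fields it needs, so the underlying case class can gain or lose fields without touching that site.

// Toy five-field case class, standing in for DataSourceV2ScanRelation.
case class ScanRelation(
    relation: String,
    scan: String,
    output: Seq[String],
    keyGroupedPartitioning: Option[Seq[Int]],
    ordering: Option[Seq[Int]])

// Like ExtractV2Scan: exposes only the scan.
object ScanOf {
  def unapply(r: ScanRelation): Option[String] = Some(r.scan)
}

// Like ExtractV2ScanInfo: exposes (relation, scan, output).
object ScanInfoOf {
  def unapply(r: ScanRelation): Option[(String, String, Seq[String])] =
    Some((r.relation, r.scan, r.output))
}

object ExtractorDemo extends App {
  val r = ScanRelation("t1", "parquet-scan", Seq("a", "b"), None, None)

  // Before: positional match, coupled to all five constructor fields.
  val before = r match { case ScanRelation(_, scan, _, _, _) => scan }

  // After: the match site names only what it actually uses.
  val after = r match { case ScanOf(scan) => scan }

  assert(before == after)
  println(r match { case ScanInfoOf(rel, scan, out) => s"$rel/$scan/${out.mkString(",")}" })
}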


sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 11 additions & 0 deletions
@@ -285,6 +285,17 @@ object ExtractV2CatalogAndIdentifier {
   }
 }
 
+object ExtractV2Scan {
+  def unapply(scanRelation: DataSourceV2ScanRelation): Option[Scan] =
+    Some(scanRelation.scan)
+}
+
+object ExtractV2ScanInfo {
+  def unapply(scanRelation: DataSourceV2ScanRelation)
+      : Option[(DataSourceV2Relation, Scan, Seq[AttributeReference])] =
+    Some((scanRelation.relation, scanRelation.scan, scanRelation.output))
+}
+
 object DataSourceV2Relation {
   def create(
       table: Table,

sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, ExtractV2Table, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2ScanInfo, ExtractV2Table, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -1732,7 +1732,7 @@
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(ExtractV2Table(table: FileTable), _, _, _, _) =>
+      case ExtractV2ScanInfo(ExtractV2Table(table: FileTable), _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
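As the hunk above shows, extractors also compose with existing ones: the `relation` position exposed by `ExtractV2ScanInfo` is itself matched by the `ExtractV2Table` extractor from SPARK-53720. A minimal sketch of such nesting, again with hypothetical toy types rather than the Spark classes:

// Hypothetical toy types; only the nesting of extractors is the point.
case class Table(name: String)
case class Relation(table: Table)
case class ScanRel(relation: Relation, scan: String)

// Like ExtractV2ScanInfo: exposes (relation, scan).
object InfoOf {
  def unapply(r: ScanRel): Option[(Relation, String)] = Some((r.relation, r.scan))
}

// Like ExtractV2Table: exposes the table behind a relation.
object TableOf {
  def unapply(r: Relation): Option[Table] = Some(r.table)
}

// One case clause drills through both extractors.
def tableName(r: ScanRel): String = r match {
  case InfoOf(TableOf(t), _) => t.name
}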

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 4 deletions
@@ -119,8 +119,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
-        v2Relation, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _, _)) =>
+    case PhysicalOperation(project, filters, ExtractV2ScanInfo(
+        v2Relation, V1ScanWrapper(scan, pushed, pushedDownOperators), output)) =>
       val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
       if (v1Relation.schema != scan.readSchema()) {
         throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
@@ -146,7 +146,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         project, filters, dsScan, needsUnsafeConversion = false) :: Nil
 
     case PhysicalOperation(project, filters,
-        DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
+        ExtractV2ScanInfo(_, scan: LocalScan, output)) =>
       val localScanExec = LocalTableScanExec(output, scan.rows().toImmutableArraySeq, None)
       DataSourceV2Strategy.withProjectAndFilter(
         project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
@@ -346,7 +346,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case DeleteFromTable(relation, condition) =>
      relation match {
-        case DataSourceV2ScanRelation(r, _, output, _, _) =>
+        case ExtractV2ScanInfo(r, _, output) =>
          val table = r.table
          if (SubqueryExpression.hasSubquery(condition)) {
            throw QueryCompilationErrors.unsupportedDeleteByConditionWithSubqueryError(condition)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala

Lines changed: 3 additions & 2 deletions
@@ -41,7 +41,8 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with Logging {
   }
 
   private def partitioning(plan: LogicalPlan) = plan.transformDown {
-    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None, _) =>
+    case d @ ExtractV2ScanInfo(relation, scan: SupportsReportPartitioning, _)
+        if d.keyGroupedPartitioning.isEmpty =>
       val catalystPartitioning = scan.outputPartitioning() match {
         case kgp: KeyGroupedPartitioning =>
           val partitioning = sequenceToOption(
@@ -68,7 +69,7 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with Logging {
   }
 
   private def ordering(plan: LogicalPlan) = plan.transformDown {
-    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportOrdering, _, _, _) =>
+    case d @ ExtractV2ScanInfo(relation, scan: SupportsReportOrdering, _) =>
       val ordering =
         V2ExpressionUtils.toCatalystOrdering(scan.outputOrdering(), relation, relation.funCatalog)
       d.copy(ordering = Some(ordering))
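Note that the `partitioning` hunk above is the one call site that is not a purely mechanical rewrite: the old pattern matched the `keyGroupedPartitioning` field positionally against `None`, which the three-field `ExtractV2ScanInfo` cannot express, so the condition moves into a pattern guard on the bound value `d`. A minimal sketch of the equivalence, with a hypothetical toy type in place of the Spark classes:

// Hypothetical toy type with an optional field, mirroring keyGroupedPartitioning.
case class Rel(scan: String, keyGroupedPartitioning: Option[Seq[Int]])

object RelScan {
  def unapply(r: Rel): Option[String] = Some(r.scan)
}

// Positional match: the Option field must appear in the pattern.
def label(r: Rel): String = r match {
  case Rel(scan, None) => s"ungrouped:$scan"
  case _ => "grouped"
}

// Equivalent via the extractor: bind the whole value and guard on the field.
def labelViaExtractor(r: Rel): String = r match {
  case d @ RelScan(scan) if d.keyGroupedPartitioning.isEmpty => s"ungrouped:$scan"
  case _ => "grouped"
}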

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2Scan
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -79,7 +79,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
         } else {
           None
         }
-      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>
+      case (resExp, r @ ExtractV2Scan(scan: SupportsRuntimeV2Filtering)) =>
         val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](
           scan.filterAttributes.toImmutableArraySeq, r)
         if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation, ExtractV2Scan}
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -50,7 +50,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     // apply special dynamic filtering only for group-based row-level operations
     case GroupBasedRowLevelOperation(replaceData, _, Some(cond),
-        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
+        ExtractV2Scan(scan: SupportsRuntimeV2Filtering))
         if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral
           && scan.filterAttributes().nonEmpty =>
 

sql/core/src/test/scala/org/apache/spark/sql/collation/CollatedFilterPushDownToParquetSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2Scan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}
@@ -257,8 +257,7 @@ class CollatedFilterPushDownToParquetV2Suite extends CollatedFilterPushDownToPar
 
   override def getPushedDownFilters(query: DataFrame): Seq[Filter] = {
     query.queryExecution.optimizedPlan.collectFirst {
-      case PhysicalOperation(_, _,
-          DataSourceV2ScanRelation(_, scan: ParquetScan, _, _, _)) =>
+      case PhysicalOperation(_, _, ExtractV2Scan(scan: ParquetScan)) =>
         scan.pushedFilters.toSeq
     }.getOrElse(Seq.empty)
   }

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
 import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, ExtractV2Scan, V1ScanWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -88,7 +88,7 @@ trait DataSourcePushdownTestUtils extends ExplainSuiteHelper {
 
   protected def checkAggregatePushed(df: DataFrame, funcName: String): Unit = {
     df.queryExecution.optimizedPlan.collect {
-      case DataSourceV2ScanRelation(_, scan, _, _, _) =>
+      case ExtractV2Scan(scan) =>
         assert(scan.isInstanceOf[V1ScanWrapper])
         val wrapper = scan.asInstanceOf[V1ScanWrapper]
         assert(wrapper.pushedDownOperators.aggregation.isDefined)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2Scan
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -63,7 +63,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       .where(Column(predicate))
 
     query.queryExecution.optimizedPlan match {
-      case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _, _)) =>
+      case PhysicalOperation(_, filters, ExtractV2Scan(o: OrcScan)) =>
         assert(filters.nonEmpty, "No filter is analyzed from the given query")
         assert(o.pushedFilters.nonEmpty, "No filter is pushed down")
         val maybeFilter = OrcFilters.createFilter(query.schema, o.pushedFilters.toImmutableArraySeq)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2Scan
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
@@ -120,7 +120,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest {
       .where(Column(predicate))
 
     query.queryExecution.optimizedPlan match {
-      case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _, _)) =>
+      case PhysicalOperation(_, filters, ExtractV2Scan(o: OrcScan)) =>
         assert(filters.nonEmpty, "No filter is analyzed from the given query")
         if (noneSupported) {
           assert(o.pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
