Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
// overwrites and to allow intentional overwriting of IDs generated in previous AQE iteration
val idMap = new IdentityHashMap[QueryPlan[_], Int]()
localIdMap.set(idMap)
// Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
// Exchanges as part of SPARK-42753
val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

var currentOperatorID = 0
currentOperatorID = generateOperatorIDs(plan, currentOperatorID, idMap, reusedExchanges,
true)

val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
getSubqueries(plan, subqueries)

currentOperatorID = subqueries.foldLeft(currentOperatorID) {
(curId, plan) => generateOperatorIDs(plan._3.child, curId, idMap, reusedExchanges,
true)
}

// SPARK-42753: Process subtree for a ReusedExchange with unknown child
val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
reusedExchanges.foreach{ reused =>
val child = reused.child
if (!idMap.containsKey(child)) {
optimizedOutExchanges.append(child)
currentOperatorID = generateOperatorIDs(child, currentOperatorID, idMap,
reusedExchanges, false)
}
}
val (subqueries, optimizedOutExchanges) = assignOperatorIds(plan, idMap)

val collectedOperators = BitSet.empty
processPlanSkippingSubqueries(plan, append, collectedOperators)
Expand Down Expand Up @@ -150,6 +125,41 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
}
}

/**
 * Fills `idMap` with operator IDs in three consecutive passes, each continuing the numbering
 * where the previous one stopped:
 *  1. the main `plan`, starting from ID 0;
 *  2. the child of every subquery reported by `getSubqueries`;
 *  3. (SPARK-42753) the child of every collected ReusedExchange whose child is still missing
 *     from `idMap` after the first two passes, i.e. an adaptively-optimized-out exchange.
 *
 * The subqueries and the exchanges handled in pass 3 are returned so that [[processPlan]] can
 * produce its text output without having to discover them a second time.
 */
private def assignOperatorIds(
    plan: QueryPlan[_],
    idMap: java.util.Map[QueryPlan[_], Int])
  : (ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)], ArrayBuffer[Exchange]) = {
  // Buffer of ReusedExchanges shared by every generateOperatorIDs call below; it is what
  // pass 3 inspects to find adaptively-optimized-out exchanges (SPARK-42753).
  val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

  // Pass 1: the main plan.
  val idAfterMainPlan = generateOperatorIDs(plan, 0, idMap, reusedExchanges, true)

  // Pass 2: every subquery, numbered after the main plan.
  val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
  getSubqueries(plan, subqueries)
  val idAfterSubqueries = subqueries.foldLeft(idAfterMainPlan) {
    case (nextId, (_, _, subquery)) =>
      generateOperatorIDs(subquery.child, nextId, idMap, reusedExchanges, true)
  }

  // Pass 3 (SPARK-42753): subtrees of ReusedExchanges whose child has no ID yet.
  val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
  var nextId = idAfterSubqueries
  for (reused <- reusedExchanges) {
    val exchange = reused.child
    if (idMap.containsKey(exchange)) {
      // Already numbered by an earlier pass or an earlier iteration; nothing to do.
    } else {
      optimizedOutExchanges.append(exchange)
      nextId = generateOperatorIDs(exchange, nextId, idMap, reusedExchanges, false)
    }
  }

  (subqueries, optimizedOutExchanges)
}

/**
* Traverses the supplied input plan in a bottom-up fashion and records the operator id via
* setting a tag in the operator.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.StringConcat
import org.apache.spark.sql.test.SharedSparkSession

/**
 * Tests for [[ExplainUtils.processPlan]]: operator ID assignment, WholeStageCodegen tag
 * propagation, thread-local lifecycle, and subquery handling.
 */
class ExplainUtilsSuite extends SharedSparkSession {

  /** Runs [[ExplainUtils.processPlan]] on `plan` and returns everything it appended. */
  private def explainOutput(plan: SparkPlan): String = {
    val concat = new StringConcat()
    ExplainUtils.processPlan(plan, concat.append)
    concat.toString
  }

  test("processPlan assigns unique operator IDs to all visible plan nodes") {
    val df = spark.range(100).filter("id > 10").select("id")
    val output = explainOutput(df.queryExecution.executedPlan)
    // Each visible operator is tagged "(N)" at the end of its line in the tree header.
    // The per-operator detail section after the tree repeats the same "(N)" at the start of
    // each entry, and argument strings (e.g. "splits=Some(2)") may contain "(N)" as well, so
    // matching anywhere in the output would always yield duplicates. Only markers that end a
    // line are collected.
    val treeHeaderId = """(?m) \((\d+)\)[ \t]*$""".r
    val ids = treeHeaderId.findAllMatchIn(output).map(_.group(1).toInt).toSeq
    assert(ids.nonEmpty, "processPlan should assign at least one operator ID")
    assert(ids == ids.distinct, s"processPlan operator IDs should be unique: $ids")
  }

  test("processPlan sets WholeStageCodegen tags on plan nodes") {
    withSQLConf("spark.sql.codegen.wholeStage" -> "true") {
      val df = spark.range(10).filter("id > 3")
      val plan = df.queryExecution.executedPlan
      // The output itself is irrelevant here; only the tags left on the plan are checked.
      ExplainUtils.processPlan(plan, _ => ())
      val codegenNodes = plan.collect {
        case p if p.getTagValue(QueryPlan.CODEGEN_ID_TAG).isDefined => p
      }
      assert(codegenNodes.nonEmpty,
        "processPlan should set CODEGEN_ID_TAG on nodes inside WholeStageCodegenExec")
    }
  }

  test("processPlan restores localIdMap to its prior value after completion") {
    val prev = ExplainUtils.localIdMap.get()
    ExplainUtils.processPlan(spark.range(10).filter("id > 3").queryExecution.executedPlan,
      _ => ())
    // Reference equality: the very same map instance (or null) must be back in place.
    assert(ExplainUtils.localIdMap.get() eq prev,
      "processPlan should restore the thread-local localIdMap on completion")
  }

  test("processPlan includes subquery output in the explain string") {
    withSQLConf("spark.sql.adaptive.enabled" -> "false") {
      val df = spark.range(100).filter("id > 0")
        .filter("id < (SELECT max(id) FROM range(5))")
      val output = explainOutput(df.queryExecution.executedPlan)
      assert(output.contains("Subqueries"), "processPlan should emit a Subqueries section")
    }
  }
}