/**
 * Numbers every operator reachable from `plan` by recording generated IDs in `idMap`.
 *
 * IDs are handed out in three passes that share one running counter: first the main plan,
 * then the child of each subquery found by `getSubqueries`, and finally the child of any
 * ReusedExchange whose child is still missing from `idMap` after the first two passes
 * (the adaptively-optimized-out exchanges of SPARK-42753).
 *
 * @param plan  root of the plan whose operators should receive IDs
 * @param idMap map that receives the generated IDs; it is also consulted to detect
 *              exchanges that have not been numbered yet
 * @return the subqueries discovered under `plan`, paired with the exchanges that were only
 *         found through a ReusedExchange, so that [[processPlan]] can do its
 *         post-assignment work (text output) without rediscovering them
 */
private def assignOperatorIds(
    plan: QueryPlan[_],
    idMap: java.util.Map[QueryPlan[_], Int])
  : (ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)], ArrayBuffer[Exchange]) = {
  // Shared with every generateOperatorIDs call below; used afterwards to look for
  // adaptively-optimized-out exchanges (SPARK-42753).
  val seenReusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
  val discoveredSubqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
  val exchangesWithoutIds = ArrayBuffer.empty[Exchange]

  // Pass 1: the main plan, with the counter starting at 0.
  val idAfterMainPlan = generateOperatorIDs(plan, 0, idMap, seenReusedExchanges, true)

  // Pass 2: the child plan of each subquery, continuing from where the main plan stopped.
  getSubqueries(plan, discoveredSubqueries)
  var nextOperatorId = discoveredSubqueries.foldLeft(idAfterMainPlan) {
    case (runningId, (_, _, subquery)) =>
      generateOperatorIDs(subquery.child, runningId, idMap, seenReusedExchanges, true)
  }

  // Pass 3 (SPARK-42753): a ReusedExchange whose child has no entry in idMap points at a
  // subtree that the earlier passes never numbered, so number it now.
  // NOTE(review): the trailing `false` presumably stops generateOperatorIDs from recording
  // further ReusedExchanges into the buffer being iterated -- confirm against its definition.
  seenReusedExchanges.foreach { reusedExchange =>
    val target = reusedExchange.child
    val alreadyNumbered = idMap.containsKey(target)
    if (!alreadyNumbered) {
      exchangesWithoutIds += target
      nextOperatorId =
        generateOperatorIDs(target, nextOperatorId, idMap, seenReusedExchanges, false)
    }
  }

  (discoveredSubqueries, exchangesWithoutIds)
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.StringConcat
import org.apache.spark.sql.test.SharedSparkSession

/**
 * Tests for [[ExplainUtils.processPlan]]: operator ID assignment, WholeStageCodegen tag
 * propagation, thread-local lifecycle, and subquery handling.
 */
class ExplainUtilsSuite extends SharedSparkSession {

  /** Runs [[ExplainUtils.processPlan]] on `plan` and returns everything it appended. */
  private def explainOutput(plan: SparkPlan): String = {
    val concat = new StringConcat()
    ExplainUtils.processPlan(plan, concat.append)
    concat.toString
  }

  /**
   * Extracts the operator IDs printed in the plan tree at the top of `output`.
   *
   * Only the first contiguous block of non-blank lines is inspected, and only a "(N)" at
   * the very end of a line counts. Scanning the whole output instead would see every ID a
   * second time, because each operator is listed again as "(N) NodeName" in the
   * per-operator details that follow the tree.
   *
   * NOTE(review): relies on the tree being the first non-blank section that processPlan
   * emits -- confirm against processPlanSkippingSubqueries.
   */
  private def treeOperatorIds(output: String): Seq[Int] = {
    val trailingId = "\\((\\d+)\\)\\s*$".r
    output.split("\n").toSeq
      .dropWhile(_.trim.isEmpty)
      .takeWhile(_.trim.nonEmpty)
      .flatMap(line => trailingId.findFirstMatchIn(line).map(_.group(1).toInt))
  }

  test("processPlan assigns unique operator IDs to all visible plan nodes") {
    val df = spark.range(100).filter("id > 10").select("id")
    val output = explainOutput(df.queryExecution.executedPlan)
    // Each visible operator is tagged "(N)" in the tree header; the detail section repeats
    // those IDs, so it is deliberately excluded from the uniqueness check.
    val ids = treeOperatorIds(output)
    assert(ids.nonEmpty, "processPlan should assign at least one operator ID")
    assert(ids == ids.distinct, s"processPlan operator IDs should be unique: $ids")
  }

  test("processPlan sets WholeStageCodegen tags on plan nodes") {
    withSQLConf("spark.sql.codegen.wholeStage" -> "true") {
      val df = spark.range(10).filter("id > 3")
      val plan = df.queryExecution.executedPlan
      // The output text is irrelevant here; only the tags left on the plan are checked.
      ExplainUtils.processPlan(plan, _ => ())
      val codegenNodes = plan.collect {
        case p if p.getTagValue(QueryPlan.CODEGEN_ID_TAG).isDefined => p
      }
      assert(codegenNodes.nonEmpty,
        "processPlan should set CODEGEN_ID_TAG on nodes inside WholeStageCodegenExec")
    }
  }

  test("processPlan restores localIdMap to its prior value after completion") {
    val prev = ExplainUtils.localIdMap.get()
    ExplainUtils.processPlan(spark.range(10).filter("id > 3").queryExecution.executedPlan,
      _ => ())
    // Reference equality: the very same map instance (or null) must be back in place.
    assert(ExplainUtils.localIdMap.get() eq prev,
      "processPlan should restore the thread-local localIdMap on completion")
  }

  test("processPlan includes subquery output in the explain string") {
    withSQLConf("spark.sql.adaptive.enabled" -> "false") {
      val df = spark.range(100).filter("id > 0")
        .filter("id < (SELECT max(id) FROM range(5))")
      val output = explainOutput(df.queryExecution.executedPlan)
      assert(output.contains("Subqueries"), "processPlan should emit a Subqueries section")
    }
  }
}