Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Versions: 2.3.0, 3.5.0
Description
If a cached dataset is used by multiple other datasets that are materialized in separate threads, the TreeNode.tags map of any of the cached plan's nodes can be corrupted, which can hang the driver forever. This happens because TreeNode.tags is not thread-safe. How this happens:
- Multiple datasets that reference the same cached dataset are materialized at the same time in different threads
- In each thread, AdaptiveSparkPlanExec.onUpdatePlan calls ExplainMode.fromString and then QueryExecution.explainString to build the plan description (see the stack trace below)
- ExplainUtils uses the TreeNode.tags map to store the operator Id for every node in the plan. This is usually safe because the plan is cloned. However, when the plan contains an InMemoryTableScanExec, the InMemoryRelation.cachedPlan it references is not cloned, so multiple threads can set operator Ids on the same shared nodes concurrently (see the sketch below)
Making the TreeNode.tags field thread-safe would not fully solve the problem, because a correctness issue remains: the threads can overwrite each other's operator Ids, which may differ between the plans being explained.
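As a concrete illustration, a minimal sketch of the triggering pattern (the dataset shapes, thread count, and use of scala.concurrent futures are illustrative assumptions; an actual hang depends on AQE being enabled and on thread timing):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import spark.implicits._

// One cached dataset shared by several downstream queries
val shared = spark.range(1000000).filter($"id" % 2 === 0).cache()
shared.count() // materialize the cache once

// Materialize dependent datasets concurrently; each thread's AQE plan
// update writes operator Ids into the same, non-cloned cachedPlan
val futures = (1 to 8).map { _ =>
  Future(shared.groupBy($"id" % 10).count().collect())
}
futures.foreach(f => Await.result(f, Duration.Inf))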
Example stack trace of the infinite loop:
scala.collection.mutable.HashTable.resize(HashTable.scala:265)
scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
scala.collection.mutable.HashMap.put(HashMap.scala:126)
scala.collection.mutable.HashMap.update(HashMap.scala:131)
org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
…
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662)
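The loop arises because scala.collection.mutable.HashMap, which backs TreeNode.tags, is not safe for concurrent writes: racing puts can corrupt the internal bucket chains, after which a resize may never terminate. A standalone sketch of the hazard, independent of Spark (the map, keys, and thread count are stand-ins; the corruption is timing-dependent and may not reproduce on every run):

import scala.collection.mutable

// Stand-in for TreeNode.tags: a plain mutable.HashMap, no synchronization
val tags = mutable.HashMap.empty[String, Int]

// Several threads writing concurrently, as TreeNode.setTagValue does when
// multiple threads explain plans that share a cachedPlan
val writers = (1 to 4).map { t =>
  new Thread(() => {
    for (i <- 0 until 100000) tags.put(s"op-$t-$i", i)
  })
}
writers.foreach(_.start())
writers.foreach(_.join()) // may never return if the table was corrupted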
Example to show the cachedPlan object is not cloned:
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
    Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
    None
  } else {
    (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
      plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if the InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2)) // returns false

// Check if the InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation)) // returns false

// Check if the cached SparkPlan references point to the same object
println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan)) // returns true
// This shows that the two plans, although cloned, still share references to
// the same cachedPlan object
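Continuing from plan1 and plan2 above, the shared mutable state can also be observed directly through the TreeNode tag API that appears in the stack trace (the tag name below is illustrative, not necessarily the exact tag ExplainUtils sets):

import org.apache.spark.sql.catalyst.trees.TreeNodeTag

// Illustrative tag; ExplainUtils maintains its own TreeNodeTag for operator Ids
val opId = TreeNodeTag[Int]("operatorId")
plan1.relation.cachedPlan.setTagValue(opId, 1)

// The write through plan1 is visible through plan2, because both reference
// the same cachedPlan object
println(plan2.relation.cachedPlan.getTagValue(opId)) // prints Some(1)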
Issue Links
- causes SPARK-47177: Cached SQL plan do not display final AQE plan in explain string (Resolved)