SPARK-43157: TreeNode tags can become corrupted and hang driver when the dataset is cached



    Description

      If a cached dataset is used by multiple other datasets that are materialized in separate threads, the TreeNode.tags map in any of the cached plan's nodes can become corrupted, hanging the driver forever. This happens because TreeNode.tags is not thread-safe. How this happens:

      1. Multiple datasets that reference the same cached dataset are materialized at the same time in different threads
      2. AdaptiveSparkPlanExec.onUpdatePlan calls ExplainMode.fromString
      3. ExplainUtils uses the TreeNode.tags map to store the operator Id for every node in the plan. This is usually okay because the plan is cloned. However, when the plan contains an InMemoryTableScanExec, the InMemoryRelation.cachedPlan is not cloned, so multiple threads can set the operator Id on the same shared nodes (see the sketch after this list).
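
      The race itself can be reproduced outside of Spark. Below is a minimal standalone sketch (illustrative code, not Spark internals): two threads writing to an unsynchronized scala.collection.mutable.HashMap, the structure backing TreeNode.tags. Whether a given run corrupts the table, and whether the corruption surfaces as the HashTable.resize infinite loop in the stack trace below, is timing- and Scala-version-dependent.

      import scala.collection.mutable

      // Stand-in for TreeNode.tags: a plain mutable.HashMap with no
      // synchronization, shared by two threads the same way a cachedPlan's
      // nodes are shared by two materializing datasets.
      val tags = mutable.HashMap.empty[String, Int]

      val threads = (1 to 2).map { t =>
        new Thread(() => {
          var i = 0
          // Concurrent puts can interleave inside the map's internal insert
          // and resize logic and corrupt its state.
          while (i < 1000000) { tags.put(s"op-$i", t); i += 1 }
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())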

      Making the TreeNode.tags field thread-safe does not solve this problem because there is still a correctness issue: the threads may overwrite each other's operator Ids, which can differ between plans.
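
      To make the correctness issue concrete, here is a sketch with made-up operator Ids, using a ConcurrentHashMap only to take thread-safety itself out of the picture: each explain pass numbers operators independently, so whichever thread writes last wins and the other pass renders the wrong Id.

      import java.util.concurrent.ConcurrentHashMap

      // Thread-safe stand-in for a hypothetical thread-safe TreeNode.tags
      val opIds = new ConcurrentHashMap[String, Int]()

      opIds.put("sharedCachedNode", 3) // thread A, explaining df1's plan
      opIds.put("sharedCachedNode", 7) // thread B, explaining df2's plan

      // Thread A now reads back 7, not the 3 it wrote, and renders the
      // wrong operator Id in its explain output.
      println(opIds.get("sharedCachedNode"))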

      Example stack trace of the infinite loop:

      scala.collection.mutable.HashTable.resize(HashTable.scala:265)
      scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
      scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
      scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
      scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
      scala.collection.mutable.HashMap.put(HashMap.scala:126)
      scala.collection.mutable.HashMap.update(HashMap.scala:131)
      org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
      org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
      …
      org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662)

      Example showing that the cachedPlan object is not cloned:

      import org.apache.spark.sql.execution.SparkPlan
      import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
      import spark.implicits._
      
      // Recursively search a physical plan (including its subqueries) for
      // the first InMemoryTableScanExec node
      def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
        if (plan.isInstanceOf[InMemoryTableScanExec]) {
          Some(plan.asInstanceOf[InMemoryTableScanExec])
        } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
          None
        } else {
          (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
            plan.children.flatMap(findCacheOperator)).headOption
        }
      }
      
      val df = spark.range(10).filter($"id" < 100).cache()
      val df1 = df.limit(1)
      val df2 = df.limit(1)
      
      // Get the cache operator (InMemoryTableScanExec) in each plan
      val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
      val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get
      
      // Check if InMemoryTableScanExec references point to the same object
      println(plan1.eq(plan2))
      // returns false
      
      // Check if InMemoryRelation references point to the same object
      println(plan1.relation.eq(plan2.relation))
      // returns false
      
      // Check if the cached SparkPlan references point to the same object
      println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan))
      // returns true
      // This shows that plan2, although cloned, still references the same cachedPlan object as plan1
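
      For completeness, the hang can then be triggered by materializing the two datasets concurrently (a timing-dependent sketch, so it may take many iterations; collect() is just an example action):

      // Because df1 and df2 share the same cachedPlan object, the two
      // threads can race on its TreeNode.tags while AQE builds explain
      // strings, occasionally corrupting the map and hanging the driver
      // as in the stack trace above.
      val t1 = new Thread(() => df1.collect())
      val t2 = new Thread(() => df2.collect())
      t1.start(); t2.start()
      t1.join(); t2.join()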


    People

      Assignee: Rob Reeves
      Reporter: Rob Reeves