Spark / SPARK-27666

Do not release lock while TaskContext already completed

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels:
      None

      Description

      Exception in thread "Thread-14" java.lang.AssertionError: assertion failed: Block rdd_0_0 is not locked for reading
      	at scala.Predef$.assert(Predef.scala:223)
      	at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
      	at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:1000)
      	at org.apache.spark.storage.BlockManager.$anonfun$getLocalValues$5(BlockManager.scala:746)
      	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      	at org.apache.spark.rdd.RDDSuite.$anonfun$new$265(RDDSuite.scala:1185)
      	at java.lang.Thread.run(Thread.java:748)
      

      We are hitting the issue reported in SPARK-18406 and SPARK-25139. https://github.com/apache/spark/pull/24542 worked around it by catching the assertion error so that the executor does not fail. However, outside PySpark the issue still occurs when a user implements a custom RDD (https://issues.apache.org/jira/browse/SPARK-18406?focusedCommentId=15969384&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15969384) or a task (see the demo below) that spawns a separate thread to consume the iterator of a cached parent RDD.

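      val rdd0 = sc.parallelize(Range(0, 10), 1).cache()
      // Materialize the cache so the next job reads block rdd_0_0 from the block manager.
      rdd0.collect()
      rdd0.mapPartitions { iter =>
        // Consume the cached parent's iterator on a separate thread that outlives the task.
        val t = new Thread(new Runnable {
          override def run(): Unit = {
            while (iter.hasNext) {
              println(iter.next())
              Thread.sleep(1000)
            }
          }
        })
        t.setDaemon(false)
        t.start()
        // The task itself returns immediately and completes before the thread finishes.
        Iterator(0)
      }.collect()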
      

      The issue is easy to reproduce with the demo above.

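      When a task completes, all block locks it holds are released, so a later unlock from the separate thread fails the assertion shown in the stack trace.
      If we prevent the separate thread from releasing the lock on the block once the TaskContext has already completed, we will not hit this issue again.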
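
      The proposed guard can be sketched with a small model that does not depend on Spark. Everything here is a hypothetical stand-in: ToyTaskContext, ToyLockManager, and releaseLockIfTaskRunning are illustrative names, not Spark's TaskContext or BlockInfoManager, and the actual change in Spark may be structured differently. The sketch only shows the idea of skipping the release once the task is marked completed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task context (NOT Spark's TaskContext).
final class ToyTaskContext(val taskAttemptId: Long) {
  @volatile private var completed = false
  def isCompleted: Boolean = completed
  def markCompleted(): Unit = { completed = true }
}

// Hypothetical stand-in for a block lock table: read-lock counts per (task, block).
final class ToyLockManager {
  private val readLocks =
    mutable.Map.empty[(Long, String), Int].withDefaultValue(0)

  def lockForReading(taskId: Long, blockId: String): Unit = synchronized {
    readLocks((taskId, blockId)) += 1
  }

  // Unguarded release: fails the assertion if the lock is no longer held.
  def unlock(taskId: Long, blockId: String): Unit = synchronized {
    assert(readLocks((taskId, blockId)) > 0,
      s"Block $blockId is not locked for reading")
    readLocks((taskId, blockId)) -= 1
  }

  // Models task completion, which drops every lock the task still holds.
  def releaseAllLocksForTask(taskId: Long): Unit = synchronized {
    readLocks.keys.filter(_._1 == taskId).toList.foreach(k => readLocks.remove(k))
  }

  // Guarded release: do nothing if the task has already completed.
  // Returns true only when a lock was actually released here.
  def releaseLockIfTaskRunning(ctx: ToyTaskContext, blockId: String): Boolean =
    synchronized {
      if (ctx.isCompleted) false
      else { unlock(ctx.taskAttemptId, blockId); true }
    }
}

object GuardedReleaseDemo {
  def main(args: Array[String]): Unit = {
    val locks = new ToyLockManager
    val ctx = new ToyTaskContext(taskAttemptId = 0L)
    locks.lockForReading(ctx.taskAttemptId, "rdd_0_0")

    // The task finishes first: its locks are dropped and it is marked completed.
    locks.releaseAllLocksForTask(ctx.taskAttemptId)
    ctx.markCompleted()

    // A late consumer reaches the end of the iterator and tries to release.
    val released = locks.releaseLockIfTaskRunning(ctx, "rdd_0_0")
    println(s"released by late thread: $released") // prints "released by late thread: false"
  }
}
```

      In this model, calling unlock directly in place of the guarded method after the task has completed reproduces the "is not locked for reading" assertion. The sketch marks the task completed outside the lock manager's monitor, so it does not address a completion that races with the check.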

              People

              • Assignee: Ngone51 (wuyi)
              • Reporter: cloud_fan (Wenchen Fan)