Spark / SPARK-27112

Spark Scheduler encounters two independent Deadlocks when trying to kill executors either due to dynamic allocation or blacklisting


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 3.0.0
    • Fix Version/s: 2.3.4, 2.4.1, 3.0.0
    • Component/s: Scheduler, Spark Core
    • Labels: None

    Description

      Recently, a few Spark users in our organization reported that their jobs were getting stuck. On further analysis, we found two independent deadlocks, each of which occurs under different circumstances. Screenshots of both deadlocks are attached to this issue.

      We were able to reproduce the deadlocks with the following piece of code:

       

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}

      import org.apache.spark._
      import org.apache.spark.TaskContext

      // Simple example of Word Count in Scala
      object ScalaWordCount {
        def main(args: Array[String]) {

          if (args.length < 2) {
            System.err.println("Usage: ScalaWordCount <inputFilesURI> <outputFilesUri>")
            System.exit(1)
          }

          val conf = new SparkConf().setAppName("Scala Word Count")
          val sc = new SparkContext(conf)

          // get the input file uri
          val inputFilesUri = args(0)

          // get the output file uri
          val outputFilesUri = args(1)

          // Run the word count in an endless loop so that executors keep getting
          // blacklisted and killed, which eventually triggers the deadlock.
          while (true) {
            val textFile = sc.textFile(inputFilesUri)
            val counts = textFile.flatMap(line => line.split(" "))
              // Fail the first attempt of partition 5 on purpose so that the
              // executor running it gets blacklisted (and then killed).
              .map(word => {
                if (TaskContext.get.partitionId == 5 && TaskContext.get.attemptNumber == 0) {
                  throw new Exception("Fail for blacklisting")
                } else {
                  (word, 1)
                }
              })
              .reduceByKey(_ + _)
            counts.saveAsTextFile(outputFilesUri)

            // Delete the output directory so the next iteration can write to it again
            val conf: Configuration = new Configuration()
            val path: Path = new Path(outputFilesUri)
            val hdfs: FileSystem = FileSystem.get(conf)
            hdfs.delete(path, true)
          }

          sc.stop()
        }
      }

       

      Additionally, to ensure that the deadlock surfaces soon enough, I also added a small delay in the Spark code here:

      https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256

       

      executorIdToFailureList.remove(exec)
      updateNextExpiryTime()
      // Artificial delay, added only for reproduction: it widens the window before
      // killBlacklistedExecutor(exec) so that the deadlock surfaces reliably.
      Thread.sleep(2000)
      killBlacklistedExecutor(exec)
      

       

      Also, make sure that the following configs are set when launching the above Spark job (an equivalent programmatic sketch follows below):
      spark.blacklist.enabled=true
      spark.blacklist.killBlacklistedExecutors=true
      spark.blacklist.application.maxFailedTasksPerExecutor=1
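
      If it is more convenient, a minimal sketch of the same settings applied through SparkConf, instead of submit-time --conf options, could look like the following (the application name simply mirrors the word count example above):

      import org.apache.spark.{SparkConf, SparkContext}

      // Sketch only: the three blacklist settings listed above, set programmatically
      // on the SparkConf rather than passed on the command line at launch time.
      val conf = new SparkConf()
        .setAppName("Scala Word Count")
        .set("spark.blacklist.enabled", "true")
        .set("spark.blacklist.killBlacklistedExecutors", "true")
        .set("spark.blacklist.application.maxFailedTasksPerExecutor", "1")

      val sc = new SparkContext(conf)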

      Attachments

        Activity


          People

            Assignee: Parth Gandhi (pgandhi)
            Reporter: Parth Gandhi (pgandhi)
            Votes: 0
            Watchers: 4

