Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-47458

Incorrect to calculate the concurrent task number




      The below test case failed,


      test("problem of calculating the maximum concurrent task") {
        withTempDir { dir =>
          val discoveryScript = createTempScriptWithExpectedOutput(
            dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
          val conf = new SparkConf()
            // Setup a local cluster which would only has one executor with 2 CPUs and 1 GPU.
            .setMaster("local-cluster[1, 6, 1024]")
            .set(WORKER_GPU_ID.amountConf, "4")
            .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
            .set(EXECUTOR_GPU_ID.amountConf, "4")
            .set(TASK_GPU_ID.amountConf, "2")
            // disable barrier stage retry to fail the application as soon as possible
          sc = new SparkContext(conf)
          TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
          // Setup a barrier stage which contains 2 tasks and each task requires 1 CPU and 1 GPU.
          // Therefore, the total resources requirement (2 CPUs and 2 GPUs) of this barrier stage
          // can not be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
          assert(sc.parallelize(Range(1, 10), 2)
            .mapPartitions { iter => iter }
            .collect() sameElements Range(1, 10).toArray[Int])

      The error log

      SPARK-24819: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
      org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: SPARK-24819: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
      at org.apache.spark.errors.SparkCoreErrors$.numPartitionsGreaterThanMaxNumConcurrentTasksError(SparkCoreErrors.scala:241)
      at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:576)
      at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:654)
      at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1321)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3055)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3046)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3035)


        Issue Links



              wbo4958 Bobby Wang
              wbo4958 Bobby Wang
              0 Vote for this issue
              1 Start watching this issue

