SPARK-21539

Job should not be aborted when dynamic allocation is enabled or spark.executor.instances is larger than the number of executors currently allocated by YARN


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.1, 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      For Spark on YARN.
      Right now, a job is aborted as soon as a TaskSet cannot run on any node or host, i.e. when blacklistedEverywhere is true in TaskSetManager#abortIfCompleteBlacklisted.
      However, if dynamic allocation is enabled, we should instead wait for YARN to allocate a new NodeManager so that the job can still finish successfully.
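      A minimal, self-contained sketch of the intended check (all names below are illustrative stand-ins, not actual Spark internals): abort only when dynamic allocation is off and YARN has already delivered every executor requested via spark.executor.instances.

      object BlacklistAbortSketch {
        // Assumed scheduler-side state, normally derived from SparkConf and the
        // cluster manager; these field names are hypothetical.
        case class SchedulerState(
            dynamicAllocationEnabled: Boolean,
            runningExecutors: Int,
            targetExecutors: Int) // targetExecutors ~ spark.executor.instances

        /** Aborting is justified only when no new executor can ever arrive. */
        def shouldAbort(blacklistedEverywhere: Boolean, state: SchedulerState): Boolean = {
          if (!blacklistedEverywhere) {
            false
          } else if (state.dynamicAllocationEnabled ||
              state.runningExecutors < state.targetExecutors) {
            // YARN may still hand us a container on a non-blacklisted node,
            // so keep the TaskSet pending instead of failing the whole job.
            false
          } else {
            true
          }
        }

        def main(args: Array[String]): Unit = {
          // Dynamic allocation on: a completely blacklisted TaskSet should wait.
          println(shouldAbort(blacklistedEverywhere = true,
            SchedulerState(dynamicAllocationEnabled = true, runningExecutors = 2, targetExecutors = 2)))  // false
          // Static allocation with all requested executors running: abort.
          println(shouldAbort(blacklistedEverywhere = true,
            SchedulerState(dynamicAllocationEnabled = false, runningExecutors = 5, targetExecutors = 5))) // true
        }
      }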
      How to reproduce?
      1. Set up a YARN cluster with 5 nodes and give one node (node1) far more CPU cores and memory than the others, so that YARN keeps launching containers on node1 even after the TaskScheduler has blacklisted it.
      2. Modify BlockManager#registerWithExternalShuffleServer as below:

      logInfo("Registering executor with local external shuffle service.")
          val shuffleConfig = new ExecutorShuffleInfo(
            diskBlockManager.localDirs.map(_.toString),
            diskBlockManager.subDirsPerLocalDir,
            shuffleManager.getClass.getName)
      
          val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
          val SLEEP_TIME_SECS = 5
      
          for (i <- 1 to MAX_ATTEMPTS) {
            try {
              {color:red}if (shuffleId.host.equals("node1's address")) {
                   throw new Exception
              }{color}
              // Synchronous and will throw an exception if we cannot connect.
              shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
                shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
              return
            } catch {
              case e: Exception if i < MAX_ATTEMPTS =>
                logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
                  + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
                Thread.sleep(SLEEP_TIME_SECS * 1000)
              case NonFatal(e) =>
                throw new SparkException("Unable to register with external shuffle server due to : " +
                  e.getMessage, e)
            }
          }
      

      Add the logic marked "Injected failure" above.
      3. Set spark.shuffle.service.enabled to true and enable the external shuffle service on the YARN NodeManagers.
      YARN will then keep launching executors on node1, but each of them fails because it can never register with the shuffle service, so the job is eventually aborted.
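      For completeness, a sketch of the driver-side configuration used in step 3; the property keys are standard Spark ones, but the exact values and the aux-service setup in yarn-site.xml on the NodeManagers are assumptions about the test cluster.

      import org.apache.spark.SparkConf

      // Illustrative configuration for step 3 (could equally go in spark-defaults.conf).
      val conf = new SparkConf()
        .set("spark.shuffle.service.enabled", "true")   // register executors with the external shuffle service
        .set("spark.dynamicAllocation.enabled", "true") // let YARN grow and shrink the executor set
        .set("spark.blacklist.enabled", "true")         // Spark 2.1+: enable task/executor blacklisting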

      People

        Assignee: Unassigned
        Reporter: cane zhoukang
        Votes: 0
        Watchers: 2
