[SPARK-6056] Unlimited off-heap memory use causes the RM to kill the container


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.2.1
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core
    • Labels: None

    Description

      No matter whether `preferDirectBufs` is set or the number of threads is limited, Spark cannot bound its off-heap memory use.
      At line 269 of AbstractNioByteChannel in netty-4.0.23.Final, Netty allocates an off-heap buffer of the same size as the heap buffer.
      So however much data you want to transfer, the same amount of off-heap memory gets allocated.
      But once the allocated size reaches the overhead memory capacity configured for YARN, the executor's container is killed.
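
      As a rough standalone illustration of the mechanism (a sketch using plain NIO direct buffers, not Netty's actual allocation path; the object name DirectBufferGrowth is made up), each chunk of data pushed through such a channel ends up backed by a direct buffer of the same size, which never appears in the JVM heap but is still counted by YARN against the container's physical memory:

      // Illustration only: direct buffers live outside the JVM heap, so they do not
      // show up in heap usage, but YARN's monitor still counts them against the
      // container's physical memory limit.
      import java.lang.management.{BufferPoolMXBean, ManagementFactory}
      import java.nio.ByteBuffer
      import scala.collection.JavaConverters._

      object DirectBufferGrowth {
        def main(args: Array[String]): Unit = {
          // The JVM's "direct" buffer pool tracks memory from ByteBuffer.allocateDirect.
          val directPool = ManagementFactory
            .getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
            .find(_.getName == "direct").get

          // 40 x 10 MiB direct buffers: ~400 MiB off heap, mirroring a transfer of
          // ~400 MiB of block data through a channel that copies each heap buffer
          // into a direct buffer of the same size.
          val buffers = (1 to 40).map(_ => ByteBuffer.allocateDirect(10 * 1024 * 1024))

          println(s"direct memory used: ${directPool.getMemoryUsed / (1024 * 1024)} MiB")
          println(s"heap memory used:   ${(Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory) / (1024 * 1024)} MiB")
        }
      }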
      I wrote a simple script to test it:

      test.scala
      import org.apache.spark.storage._
      import org.apache.spark._

      // Persist eleven 10 MB byte arrays across 10 partitions (one cached block per partition).
      val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10 * 1024 * 1024)).persist
      bufferRdd.count

      val part = bufferRdd.partitions(0)
      val sparkEnv = SparkEnv.get
      val blockMgr = sparkEnv.blockManager

      // Fetch the cached block for partition 0 through the block manager and report its size.
      def test = {
        val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
        val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
        val len = resultIt.map(_.length).sum
        println(s"[${Thread.currentThread.getId}] get block length = $len")
      }

      // Run f `count` times on a pool of up to `parallel` threads.
      def test_driver(count: Int, parallel: Int)(f: => Unit) = {
        val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
        val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
        val parseq = (1 to count).par
        parseq.tasksupport = taskSupport
        parseq.foreach(x => f)

        tpool.shutdown
        tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
      }
      

      Steps to reproduce:
      1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
      2. :load test.scala in the spark-shell
      3. use the following command to watch the executor process on a slave node:

      pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid
      

      4. run test_driver(20, 100)(test) in the spark-shell
      5. watch the command's output on the slave node

      If multiple threads fetch the block at once, the executor's physical memory quickly exceeds the limit set by spark.yarn.executor.memoryOverhead and the container is killed.
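
      One way to keep such a workload inside the container limit (a sketch only; the values below are placeholders, not a recommendation recorded on this issue) is to cap the JVM's direct memory and budget for it in the YARN overhead:

      bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1 \
        --conf spark.yarn.executor.memoryOverhead=2048 \
        --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=1g" \
        --conf spark.shuffle.io.preferDirectBufs=false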

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: carlmartin (Zhaowei Huang)
            Votes: 0
            Watchers: 6

            Dates

              Created:
              Updated:
              Resolved: