SPARK-26114

Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.2, 2.3.2, 2.4.0
    • Fix Version/s: 2.4.1, 3.0.0
    • Component/s: Spark Core
    • Labels: None
    • Environment:

      Spark 3.0.0-SNAPSHOT (master branch)
      Scala 2.11
      Yarn 2.7

    Description

      Trying to use coalesce after shuffle-oriented transformations leads to OutOfMemoryErrors or to "Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead."
      Discussion is here.

      The error happens when specifying a fairly small number of partitions in the coalesce call.
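
      In its minimal form the failing pattern looks like this (just a sketch summarizing the full reproduction below; rdd stands for any key-value RDD with an ordering on the key):

        import org.apache.spark._

        rdd
          .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
          .coalesce(10, false) // small number of partitions, no additional shuffle
          .count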

      How to reproduce?

      1. Start spark-shell
        spark-shell \
          --num-executors=5 \
          --executor-cores=2 \
          --master=yarn \
          --deploy-mode=client \
          --conf spark.executor.memoryOverhead=512 \
          --conf spark.executor.memory=1g \
          --conf spark.dynamicAllocation.enabled=false \
          --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
        

        Please note the -Dio.netty.noUnsafe=true property. For now, preventing off-heap memory usage seems to be the only way to control the amount of memory used for transferring shuffle data.
        Also note that the total number of cores allocated to the job is 5x2=10.

      2. Then generate some test data
        import org.apache.hadoop.io._ 
        import org.apache.hadoop.io.compress._ 
        import org.apache.commons.lang._ 
        import org.apache.spark._ 
        
        // generate 100M records of sample data 
        sc.makeRDD(1 to 1000, 1000) 
          .flatMap(item => (1 to 100000) 
            .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024)))) 
          .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
        
      3. Run the sample job
        import org.apache.hadoop.io._
        import org.apache.spark._
        import org.apache.spark.storage._
        
        val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
        rdd 
          .map(item => item._1.toString -> item._2.toString) 
          .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
          .coalesce(10, false) 
          .count 
        

        Note that the number of partitions in the coalesce call is equal to the total number of cores allocated to the job.
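
      For context, here is a quick way to see the fan-in this creates (a sketch reusing the rdd and imports from step 3): coalesce with shuffle = false merges parent partitions into fewer tasks instead of adding another shuffle stage, so each of the 10 tasks has to consume roughly 100 of the 1000 shuffle partitions.

        val shuffled = rdd
          .map(item => item._1.toString -> item._2.toString)
          .repartitionAndSortWithinPartitions(new HashPartitioner(1000))

        shuffled.getNumPartitions                      // 1000 shuffle partitions
        shuffled.coalesce(10, false).getNumPartitions  // 10 tasks, ~100 parent partitions each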

      Here is the dominator tree from the heap dump:

      There are 4 instances of ExternalSorter, although there are only 2 concurrently running tasks per executor.

      And here are the paths to GC roots of an already stopped ExternalSorter.
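
      This is consistent with the lifetime of task-scoped resources. Below is a purely illustrative sketch of the general pattern (not Spark's actual shuffle-read code; BufferBackedIterator is a made-up name): when the release of a per-partition read buffer is tied to task completion via TaskContext.addTaskCompletionListener, the buffer stays reachable even after the iterator it backs has been exhausted, so a single coalesced task that consumes ~100 parent shuffle partitions one after another keeps ~100 such buffers alive until the task finishes.

        import org.apache.spark.TaskContext

        // Illustrative only: a per-partition read buffer freed at *task* completion.
        // Must be instantiated inside a running task (TaskContext.get() is null on the driver).
        class BufferBackedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
          // stands in for the PartitionedPairBuffer held by an ExternalSorter
          private var buffer: Array[AnyRef] = new Array[AnyRef](1 << 20)

          // released only when the whole task ends, not when this iterator is exhausted
          TaskContext.get().addTaskCompletionListener[Unit] { _ => buffer = null }

          override def hasNext: Boolean = underlying.hasNext
          override def next(): T = underlying.next()
        }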

    Attachments

    1. run1-noparams-dominator-tree-externalsorter-gc-root.png (121 kB, Sergey Zhemzhitsky)
    2. run1-noparams-dominator-tree-externalsorter.png (66 kB, Sergey Zhemzhitsky)
    3. run1-noparams-dominator-tree.png (141 kB, Sergey Zhemzhitsky)


    People

    • Assignee: szhemzhitsky (Sergey Zhemzhitsky)
    • Reporter: szhemzhitsky (Sergey Zhemzhitsky)
    • Votes: 1
    • Watchers: 5
