  Spark / SPARK-27734

Add memory based thresholds for shuffle spill


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None

    Description

      When running large shuffles (700 TB input data, 200k map tasks, 50k reducers on a 300-node cluster) the job regularly OOMs in both the map and reduce phases.

      IIUC ShuffleExternalSorter (map side) and ExternalAppendOnlyMap / ExternalSorter (reduce side) try to max out the available execution memory. This in turn doesn't play nicely with the garbage collector, and executors fail with OutOfMemoryError when the memory allocated by these in-memory structures maxes out the available heap size (in our case we run with 9 cores/executor, 32 GB per executor).

      To mitigate this, I set spark.shuffle.spill.numElementsForceSpillThreshold to force spills to disk. While this config works, it is not flexible enough: it is expressed in number of elements, and in our case we run multiple shuffles in a single job where the element size differs from one stage to another.
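      A quick back-of-the-envelope illustration of why an element-count threshold does not transfer between stages. The record sizes below are made-up examples, not measurements from the job above:

```scala
// With the same element-count threshold, the memory held before a forced
// spill varies wildly with per-record size (sizes here are illustrative).
val threshold = 1000000L                   // 1M elements before forced spill
val smallStageBytes = threshold * 100L     // 100-byte records -> ~100 MB held
val largeStageBytes = threshold * 10000L   // 10 KB records    -> ~10 GB held
```

A threshold tuned for the small-record stage leaves the large-record stage holding two orders of magnitude more memory before spilling, which is exactly the inflexibility described above.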

      We have an internal patch that extends this behaviour, adding two new parameters to control spilling based on memory usage:

      - spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
      - spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold
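      Under these proposed parameters, the spill decision could look roughly like the following pure-Scala sketch. All names here are hypothetical illustrations, not actual Spark API; the real change would live inside ShuffleExternalSorter and ExternalSorter:

```scala
// Sketch of a combined spill policy: spill when EITHER the existing
// element-count threshold OR the proposed memory-based threshold is
// exceeded. Names and signature are hypothetical, not Spark internals.
object SpillPolicy {
  def shouldSpill(
      numRecords: Long,                        // records buffered so far
      recordsSizeBytes: Long,                  // estimated bytes they occupy
      numElementsForceSpillThreshold: Long,    // existing element-count limit
      maxRecordsSizeForSpillThreshold: Long    // proposed memory limit
  ): Boolean = {
    numRecords >= numElementsForceSpillThreshold ||
      recordsSizeBytes >= maxRecordsSizeForSpillThreshold
  }
}
```

Because the memory-based check fires independently of record count, one byte-denominated setting can cover stages with very different element sizes.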

       


            People

              Assignee: Unassigned
              Reporter: Adrian Muraru (amuraru)
