Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-19371

Cannot spread cached partitions evenly across executors

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • 1.6.1
    • None
    • None

    Description

      Before running an intensive iterative job (in this case a distributed topic model training), we need to load a dataset and persist it across executors.

      After loading from HDFS and persisting, the partitions are spread unevenly across executors (based on the initial scheduling of the reads which are not data locale sensitive). The partition sizes are even, just not their distribution over executors. We currently have no way to force the partitions to spread evenly, and as the iterative algorithm begins, tasks are distributed to executors based on this initial load, forcing some very unbalanced work.

      This has been mentioned a number of times in various user/dev group threads.

      None of the discussions I could find had solutions that worked for me. Here are examples of things I have tried. All resulted in partitions in memory that were NOT evenly distributed to executors, causing future tasks to be imbalanced across executors as well.

      Reduce Locality

      spark.shuffle.reduceLocality.enabled=false/true

      "Legacy" memory mode

      spark.memory.useLegacyMode = true/false

      Basic load and repartition

      val numPartitions = 48*16
      val df = sqlContext.read.
          parquet("/data/folder_to_load").
          repartition(numPartitions).
          persist
      df.count
      

      Load and repartition to 2x partitions, then shuffle repartition down to desired partitions

      val numPartitions = 48*16
      val df2 = sqlContext.read.
          parquet("/data/folder_to_load").
          repartition(numPartitions*2)
      val df = df2.repartition(numPartitions).
          persist
      df.count
      

      It would be great if when persisting an RDD/DataFrame, if we could request that those partitions be stored evenly across executors in preparation for future tasks.

      I'm not sure if this is a more general issue (I.E. not just involving persisting RDDs), but for the persisted in-memory case, it can make a HUGE difference in the over-all running time of the remaining work.

      Attachments

        1. execution timeline.png
          31 kB
          Thunder Stumpges
        2. RDD Block Distribution on two executors.png
          56 kB
          Thunder Stumpges
        3. Unbalanced RDD Blocks, and resulting task imbalance.png
          59 kB
          Thunder Stumpges
        4. Unbalanced RDD Blocks, and resulting task imbalance.png
          78 kB
          Thunder Stumpges

        Activity

          People

            Unassigned Unassigned
            thunderstumpges Thunder Stumpges
            Votes:
            14 Vote for this issue
            Watchers:
            14 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: