Spark / SPARK-40407

Repartition of DataFrame can result in severe data skew in some special cases


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.1, 3.1.1, 3.1.2, 3.1.3, 3.2.1, 3.3.0, 3.2.2
    • Fix Version/s: 3.3.1, 3.2.3, 3.4.0
    • Component/s: SQL
    • Labels: None

    Description

      val df = spark.range(0, 100, 1, 50).repartition(4)
      val v = df.rdd.mapPartitions { iter =>
        Iterator.single(iter.length)
      }.collect()
      println(v.mkString(","))
      

      The above simple code outputs `50,0,0,50`, which means output partitions 1 and 2 receive no data at all.

      I debugged it and found that the round robin only distributes records evenly *within the same input partition*; it does not guarantee an even distribution across input partitions.

      Below is the code that generates the partition key:

            case RoundRobinPartitioning(numPartitions) =>
              // Distributes elements evenly across output partitions, starting from a random partition.
              var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)  
              (row: InternalRow) => {
                // The HashPartitioner will handle the `mod` by the number of partitions
                position += 1
                position
              }
      

      In this case there are 50 input partitions, so each partition computes only 2 elements. The issue for the round robin here is that it always starts from position = 2, because `new Random(partitionId).nextInt(4)` returns 2 for every partition id.

      See the output of Random

      scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2.
      2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
      

      Similarly, each of the Random calls below also prints a single repeated value for all 200 seeds:

      (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
      
      (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
      
      (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
      
      (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
      
      (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
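The repetition can be checked directly against `java.util.Random`, which `scala.util.Random` wraps. Below is a small sketch (the class and method names are my own) that collects the distinct first values produced by seeds 1 through 200 for a given bound:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class SeedCorrelation {
    // Collect the first value of nextInt(bound) for seeds 1..200.
    // Per the observation above, this set contains a single element for
    // small power-of-two bounds: nearby seeds are too correlated.
    static Set<Integer> firstValues(int bound) {
        Set<Integer> values = new HashSet<>();
        for (int seed = 1; seed <= 200; seed++) {
            values.add(new Random(seed).nextInt(bound));
        }
        return values;
    }

    public static void main(String[] args) {
        for (int bound : new int[] {2, 4, 8, 16, 32}) {
            System.out.println(bound + " -> " + firstValues(bound));
        }
    }
}
```

The underlying reason is that `java.util.Random` is a linear congruential generator and, for a power-of-two bound, `nextInt` takes its result from the high bits of the 48-bit state; a single scrambling step is roughly not enough to spread the small differences between consecutive seeds up into those high bits.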
      

      Let's go back to this case.

      Consider input partition 0, whose elements are [0, 1]. At shuffle write time, element 0 gets key (position + 1) = (2 + 1) = 3, and 3 % 4 = 3; element 1 gets key (3 + 1) = 4, and 4 % 4 = 0.
      Consider input partition 1, whose elements are [2, 3]. Likewise, element 2 gets key 3, and 3 % 4 = 3; element 3 gets key 4, and 4 % 4 = 0.

       

      The same calculation applies to all the remaining input partitions, since the starting position is always 2 in this case.

      So, as you can see, every input partition writes its elements only to output partitions 0 and 3, which leaves output partitions 1 and 2 without any data.
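The whole walkthrough can be replayed outside Spark with plain `java.util.Random`; a minimal sketch (the class, method, and parameter names are mine; the partition and row counts mirror this repro):

```java
import java.util.Arrays;
import java.util.Random;

public class RoundRobinSkew {
    // Replay the key generation above: every map task seeds a Random with
    // its partition id, takes nextInt(numPartitions) as the starting
    // position, and then hands out keys round-robin; the mod is what the
    // HashPartitioner applies afterwards.
    static int[] simulate(int inputPartitions, int rowsPerPartition, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int partitionId = 0; partitionId < inputPartitions; partitionId++) {
            int position = new Random(partitionId).nextInt(numPartitions);
            for (int row = 0; row < rowsPerPartition; row++) {
                position += 1;
                counts[position % numPartitions]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 50 input partitions of 2 rows each, repartitioned to 4:
        // matches the `50,0,0,50` reported at the top of this issue.
        System.out.println(Arrays.toString(simulate(50, 2, 4)));
    }
}
```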

       

      I will try to provide a patch to fix this issue.
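One possible direction for such a patch (an illustration only, not necessarily the actual fix; the mixer and helper below are hypothetical) is to decorrelate the starting position from the partition id by running the id through a strong bit mixer instead of seeding `java.util.Random` directly:

```java
import java.util.HashSet;
import java.util.Set;

public class StartPositions {
    // A MurmurHash3-style 64-bit finalizer: small input deltas flip high
    // and low output bits alike, unlike one java.util.Random scramble step.
    static long mix(long z) {
        z = (z ^ (z >>> 33)) * 0xff51afd7ed558ccdL;
        z = (z ^ (z >>> 33)) * 0xc4ceb9fe1a85ec53L;
        return z ^ (z >>> 33);
    }

    // Hypothetical replacement for `new Random(partitionId).nextInt(n)`.
    static int startPosition(int partitionId, int numPartitions) {
        // floorMod keeps the result non-negative for negative hashes.
        return (int) Math.floorMod(mix(partitionId), (long) numPartitions);
    }

    public static void main(String[] args) {
        Set<Integer> starts = new HashSet<>();
        for (int partitionId = 0; partitionId < 50; partitionId++) {
            starts.add(startPosition(partitionId, 4));
        }
        // With decorrelated seeds the starting positions vary across
        // partitions instead of collapsing to the single value 2.
        System.out.println(starts);
    }
}
```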

          People

            Assignee: wbo4958 Bobby Wang
            Reporter: wbo4958 Bobby Wang
            Votes: 0
            Watchers: 4
