Spark / SPARK-48290

AQE not working when joining dataframes with more than 2000 partitions


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.2, 3.5.1
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None
    • Environment: spark-standalone, spark 3.5.1

    Description

      We are joining two large dataframes with considerable skew on the left side on one specific key (>2000 skew ratio).

      left side num partitions: 10335
      right side num partitions: 1241
      
      left side num rows: 20181947343
      right side num rows: 107462219 

      Since we have `spark.sql.adaptive.enabled` set to true, we expect AQE to act during the join and deal with the skewed partition automatically.
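
      For reference, a rough sketch of the session configuration we assume to be in effect (the builder call and the app name are illustrative, not our actual job; the skew-join values shown are the defaults):

      import org.apache.spark.sql.SparkSession

      // AQE and its skew-join rule enabled, with default skew detection thresholds:
      // a partition is considered skewed when it is larger than skewedPartitionFactor times
      // the median partition size and also larger than skewedPartitionThresholdInBytes.
      val spark = SparkSession.builder()
        .appName("skewed-join-repro")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
        .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
        .getOrCreate()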

      During the join, we can see the following log indicating that the skew was not detected, since the statistics look suspiciously equal across the min/median/max sizes:

      OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
       OptimizeSkewedJoin: 
      Optimizing skewed join.
      Left side partitions size info:
      median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
      Right side partitions size info:
      median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
             

      Looking at this log and the available Spark configuration options, our two main hypotheses for working around this behavior and correctly detecting the skew were (see the configuration sketch after this list):

      1. Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark doesn't convert the statistics into a `HighlyCompressedMapStatus` and is therefore able to identify the skewed partition.
      2. Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other configurations such as `spark.shuffle.accurateBlockThreshold` and `spark.shuffle.accurateBlockSkewedFactor` so that even then the sizes of the skewed partitions/blocks are accurately recorded and consequently used in the optimization.
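
      A minimal sketch of these two attempts (shown together for brevity, although in practice we tried them separately; the concrete values are illustrative, and these are core shuffle configs, so they have to be set before the context starts):

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        // Hypothesis 1: raise the partition-count threshold (default 2000) at which Spark
        // switches from CompressedMapStatus to HighlyCompressedMapStatus, so exact
        // per-block sizes are kept even for our ~10k-partition shuffle.
        .config("spark.shuffle.minNumPartitionsToHighlyCompress", "20000")
        // Hypothesis 2: keep HighlyCompressedMapStatus, but record exact sizes for any
        // block above 1 MB (default 100m) and/or for blocks larger than this factor
        // times the median block size (disabled by default).
        .config("spark.shuffle.accurateBlockThreshold", "1m")
        .config("spark.shuffle.accurateBlockSkewedFactor", "1.0")
        .getOrCreate()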

      We tried different values for `spark.shuffle.accurateBlockThreshold` (even absurdly low ones like 1MB) and nothing seemed to work. The statistics still indicate that the min, median and max are somehow the same, and thus the skew is not detected.

      However, when forcibly reducing `spark.sql.shuffle.partitions` to fewer than 2000 partitions, the statistics look correct and the skewed join optimization acts as it should (a short workaround sketch follows the log below):

      OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
      OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
      OptimizeSkewedJoin: 
      Optimizing skewed join.
      Left side partitions size info:
      median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
      Right side partitions size info:
      median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766 
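
      For completeness, the workaround sketch referenced above; the only point is to keep the shuffle partition count below the 2000-partition threshold, and 1999 is an arbitrary value:

      // Workaround, not a fix: with fewer than 2000 shuffle partitions the map output
      // statistics keep exact per-block sizes, so OptimizeSkewedJoin sees the real max.
      spark.conf.set("spark.sql.shuffle.partitions", "1999")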

      Should we assume that the statistics are becoming corrupted when Spark uses `HighlyCompressedMapStatus`? Should we try another configuration property to work around this problem? (Assuming that fine-tuning every dataframe involved in a skewed join in our ETL to fewer than 2000 partitions is not an option.)
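
      For context on why we suspect the statistics themselves: our (simplified) reading of `HighlyCompressedMapStatus` is that it only stores exact sizes for blocks it classifies as huge and answers with the average size for every other block, which would produce exactly the flat min/median/max shown above. A rough sketch of that behavior, not the actual Spark source:

      // Simplified model (our assumption from reading the code, not a copy of it): a block
      // that was not tracked as "huge" is reported with the same average size as the rest.
      def reportedBlockSize(
          hugeBlockSizes: Map[Int, Long], // reduceId -> exact size, only for "huge" blocks
          avgSize: Long,                  // average size of the remaining non-empty blocks
          reduceId: Int): Long =
        hugeBlockSizes.getOrElse(reduceId, avgSize)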

       


          People

            Assignee: Unassigned
            Reporter: André F. (andre.amorimfonseca@gmail.com)
            Votes: 4
            Watchers: 10
