Description
We are joining two large DataFrames where the left side has considerable skew on one specific key (skew ratio > 2000).

- left side: 10335 partitions, 20181947343 rows
- right side: 1241 partitions, 107462219 rows
Since we have `spark.sql.adaptive.enabled` set, we expect AQE to act during the join and deal with the skewed partition automatically.
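For reference, these are the AQE skew-join settings in play. The snippet below is only a minimal sketch: the session setup is illustrative and the values shown are the documented defaults.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the relevant AQE settings (illustrative session setup).
val spark = SparkSession.builder()
  .appName("skewed-join")
  .config("spark.sql.adaptive.enabled", "true")
  // Skew-join optimization is on by default; shown here for completeness.
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  // A partition is considered skewed when it is larger than
  // skewedPartitionFactor * median partition size AND larger than this byte threshold.
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
  .getOrCreate()
```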
During the join, we see the following log, which indicates that the skew was not detected because the statistics are suspiciously equal across min/median/max sizes:
OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
OptimizeSkewedJoin: Optimizing skewed join.
Left side partitions size info: median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
Right side partitions size info: median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
Looking at this log line and the available Spark configuration options, our two main hypotheses for working around this behavior and getting the skew detected correctly were:
- Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark does not convert the statistics into a `HighlyCompressedMapStatus` and is therefore able to identify the skewed partition.
- Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other configurations such as `spark.shuffle.accurateBlockThreshold` and `spark.shuffle.accurateBlockSkewedFactor` so that the sizes of the skewed partitions/blocks are still registered accurately and consequently used in the optimization (see the sketch after this list).
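For concreteness, this is roughly how we set the configurations for the two hypotheses. It is a minimal sketch with illustrative values, not a recommendation, and `spark.shuffle.minNumPartitionsToHighlyCompress` is an internal configuration:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch; the two hypotheses were tested separately, shown together here for brevity.
val spark = SparkSession.builder()
  // Hypothesis 1: raise the (internal) threshold above our shuffle partition count so the
  // map output statistics stay in a CompressedMapStatus, which keeps per-block sizes.
  .config("spark.shuffle.minNumPartitionsToHighlyCompress", "20000")
  // Hypothesis 2: keep HighlyCompressedMapStatus but record accurate sizes for blocks that
  // are large in absolute terms or large relative to the median block size.
  .config("spark.shuffle.accurateBlockThreshold", "100m")
  .config("spark.shuffle.accurateBlockSkewedFactor", "5")
  .getOrCreate()
```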
We tried different values for `spark.shuffle.accurateBlockThreshold` (even absurd ones like 1MB) and nothing seemed to work. The statistics still indicate that min/median/max are somehow the same, and thus the skew is not detected.
However, when we forcibly reduce `spark.sql.shuffle.partitions` to fewer than 2000, the statistics look correct and the skewed join is optimized as it should be (sketched after the log below):
OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
OptimizeSkewedJoin: Optimizing skewed join.
Left side partitions size info: median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
Right side partitions size info: median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
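For completeness, the workaround looks roughly like this; the table names, join key, and exact partition count are placeholders for our actual ETL:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Staying under 2000 shuffle partitions keeps the map statistics out of
// HighlyCompressedMapStatus, and the skew is then detected as in the log above.
spark.conf.set("spark.sql.shuffle.partitions", "1999")

// Hypothetical table names and join key, standing in for the real ETL inputs.
val leftDf  = spark.table("etl.left_table")
val rightDf = spark.table("etl.right_table")
val joined  = leftDf.join(rightDf, Seq("join_key"), "inner")
```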
Should we assume that the statistics become corrupted when Spark uses `HighlyCompressedMapStatus`? Is there another configuration property we could use to work around this problem? (Fine-tuning every DataFrame involved in a skewed join in our ETL to have fewer than 2000 partitions is not an option.)