Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Later
Description
The implementation of HashPartitioning.compatibleWith has been suboptimal for a while. Consider the following case:
If table_a is hash partitioned by int column `i`, and table_b is also hash partitioned by int column `i`, these two partitionings are logically compatible. However, HashPartitioning.compatibleWith returns false for this case, because the AttributeReferences of column `i` in the two tables have different expression IDs.
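Here is a minimal sketch of the mismatch, assuming the Spark 1.6-era Catalyst API (the attribute names and partition count are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

// Each AttributeReference() call allocates a fresh expression ID, so the
// two attributes below model the same logical column `i` read from two tables.
val iFromTableA = AttributeReference("i", IntegerType)()
val iFromTableB = AttributeReference("i", IntegerType)()

val partitioningA = HashPartitioning(Seq(iFromTableA), 200)
val partitioningB = HashPartitioning(Seq(iFromTableB), 200)

// Returns false: semanticEquals compares attributes by expression ID, which
// differs between the two tables even though both hash the same logical column.
partitioningA.compatibleWith(partitioningB)
```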
Because of this wrong result from HashPartitioning.compatibleWith, we go into this branch and may add an unnecessary shuffle.
This won't impact correctness if the join keys are exactly the same as the hash partitioning keys, as there is still an opportunity in that branch to avoid partitioning that child: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L428
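Continuing the sketch above: the pairwise compatibility check guarding that branch fails for these two partitionings (this assumes the `Partitioning.allCompatible` helper as it existed in that era's Exchange code):

```scala
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

// false: because compatibleWith returned false above, the branch falls back
// to repartitioning the children even though they are already co-partitioned.
Partitioning.allCompatible(Seq(partitioningA, partitioningB))
```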
However, if the join keys are a superset of the hash partitioning keys, for example, table_a and table_b are both hash partitioned by column `i` and we want to join them on columns `i, j`, then logically we don't need a shuffle, yet the two tables, which start out partitioned only by `i`, are redundantly repartitioned by `i, j`.
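To make "logically we don't need shuffle" concrete, here is a sketch (again assuming the 1.6-era API): a partitioning on `i` already satisfies the clustered distribution that a join on `i, j` requires, since rows that agree on `(i, j)` necessarily agree on `i` and are therefore already co-located.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val i = AttributeReference("i", IntegerType)()
val j = AttributeReference("j", IntegerType)()

// true: hash partitioning by `i` alone co-locates all rows that agree on
// (i, j), so the distribution required by the join is already satisfied.
HashPartitioning(Seq(i), 200).satisfies(ClusteredDistribution(Seq(i, j)))
```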
A quick fix is to set the expression ID of each AttributeReference to 0 before calling this.semanticEquals(o) in HashPartitioning.compatibleWith. For the long term, though, I think we need a better design than the `compatibleWith`, `guarantees`, and `satisfies` mechanism, as it is quite complex.
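A minimal sketch of what that quick fix could look like, assuming the 1.6-era API (`normalizeExprIds` is a hypothetical helper, not an existing Spark function):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, Expression}

// Hypothetical helper: rewrite every AttributeReference to carry a fixed
// expression ID, so attributes that differ only in exprId compare as equal.
def normalizeExprIds(e: Expression): Expression = e.transformUp {
  case a: AttributeReference =>
    AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(exprId = ExprId(0))
}

// With the normalization applied, the two partitionings from the first
// sketch compare as semantically equal:
normalizeExprIds(partitioningA).semanticEquals(normalizeExprIds(partitioningB))  // true
```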