Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- 3.3.0
- None
Description
SPARK-35703 removed HashClusteredDistribution and replaced its usages with ClusteredDistribution.
While this works well for non-stateful operators, stateful operators still need a separate distribution requirement: the requirement of ClusteredDistribution is too relaxed, whereas the physical partitioning that stateful operators require is quite strict.
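To make the "relaxed versus strict" distinction concrete, here is a toy model in Python. It is not Spark code: the class names mirror the ones in this ticket, but the `satisfied_by` method is a hypothetical name, expressions are plain strings instead of Catalyst expressions, and details such as semantic equality and required partition counts are left out.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class HashPartitioning:
    expressions: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class RangePartitioning:
    ordering: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class ClusteredDistribution:
    clustering: Tuple[str, ...]

    def satisfied_by(self, partitioning) -> bool:
        # Relaxed (toy rule): any hash or range partitioning whose keys are
        # a non-empty subset of the clustering keys is accepted.
        if isinstance(partitioning, HashPartitioning):
            keys = partitioning.expressions
        elif isinstance(partitioning, RangePartitioning):
            keys = partitioning.ordering
        else:
            return False
        return bool(keys) and all(k in self.clustering for k in keys)


@dataclass(frozen=True)
class HashClusteredDistribution:
    expressions: Tuple[str, ...]

    def satisfied_by(self, partitioning) -> bool:
        # Strict (toy rule): only hash partitioning on exactly the same
        # expressions, in the same order, is accepted.
        return (
            isinstance(partitioning, HashPartitioning)
            and partitioning.expressions == self.expressions
        )


required = ("user", "window")
subset_hash = HashPartitioning(("user",), 200)
exact_hash = HashPartitioning(("user", "window"), 200)
range_part = RangePartitioning(("user", "window"), 200)

relaxed = ClusteredDistribution(required)
strict = HashClusteredDistribution(required)
```

In this sketch, `relaxed` accepts all three partitionings, while `strict` accepts only `exact_hash`. A stateful operator that accepted `subset_hash` or `range_part` would see rows arrive in partitions that do not match where its state was written.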
In most cases, stateful operators must require HashClusteredDistribution as their child distribution, under the following major assumptions:
- HashClusteredDistribution creates HashPartitioning, and we will never change this in the future.
- We will never change the implementation of partitionIdExpression in HashPartitioning, so that the Partitioner behaves consistently across Spark versions.
- No partitioning other than HashPartitioning can satisfy HashClusteredDistribution.
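The reason these assumptions matter can be sketched with a small Python simulation. It assumes that state for a key lives in the partition where that key was first processed, and it uses `zlib.crc32` purely as a stand-in for a stable hash; this is not the hash Spark uses, and `partition_id` is a hypothetical helper.

```python
import zlib


def partition_id(key_values, num_partitions):
    # Stand-in for a stable partition-id function (NOT Spark's implementation).
    digest = zlib.crc32("\x00".join(key_values).encode("utf-8"))
    return digest % num_partitions


NUM_PARTITIONS = 8
rows = [("alice", "w0"), ("alice", "w1"), ("bob", "w0")]

# First run: the child is hash-partitioned on the full grouping key,
# so state for each (user, window) is written to this partition.
state_location = {row: partition_id(row, NUM_PARTITIONS) for row in rows}

# Later run: the child is hash-partitioned on a subset of the key (user only),
# which a relaxed requirement would accept.
arrival = {row: partition_id(row[:1], NUM_PARTITIONS) for row in rows}

# Rows that now arrive at a partition that does not hold their state.
misplaced = [row for row in rows if state_location[row] != arrival[row]]
```

Any row in `misplaced` would be processed against an empty or unrelated state store partition. Changing the partition-id function itself between versions would have the same effect, even with identical partitioning expressions.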
We should revive HashClusteredDistribution (probably renaming it to something specific to stateful operators) and apply the distribution to all stateful operators.
SPARK-35703 only touched stream-stream join, which means stream-stream join has not been broken in any actual release. Let's aim for a partial revert of SPARK-35703 in this ticket, and open another ticket to deal with the other stateful operators, which have been broken since their introduction (2.2+).