The current calculation for the number of Key Groups (max parallelism) leads in many cases to data skew and to confusion among users.
Specifically, for max parallelisms above 128, the default value is set to roundToPowerOfTwo(1.5 x parallelism). As a result, it frequently happens that half of the tasks get one key group and the other half get two key groups, which is very skewed.
See section (1) in this "lessons learned" blog post: https://engineering.contentsquare.com/2021/ten-flink-gotchas/
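To make the skew concrete, here is a minimal Python sketch of the derivation described above. It is not Flink code. The bounds 128 and 32768 and the key group range formula are assumptions based on my reading of KeyGroupRangeAssignment, and all function names are made up for illustration.

```python
from collections import Counter

# Assumed bounds of the derived default (not taken from this ticket).
LOWER_BOUND = 128
UPPER_BOUND = 32768


def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two that is >= x, for x >= 1."""
    return 1 << (x - 1).bit_length()


def default_max_parallelism(parallelism: int) -> int:
    """Current default: roundToPowerOfTwo(1.5 x parallelism), clamped to the bounds."""
    candidate = round_up_to_power_of_two(parallelism + parallelism // 2)
    return min(max(candidate, LOWER_BOUND), UPPER_BOUND)


def key_groups_per_subtask(max_parallelism: int, parallelism: int) -> list:
    """Number of key groups owned by each subtask (contiguous ranges)."""
    counts = []
    for i in range(parallelism):
        start = (i * max_parallelism + parallelism - 1) // parallelism
        end = ((i + 1) * max_parallelism - 1) // parallelism
        counts.append(end - start + 1)
    return counts


parallelism = 170
max_parallelism = default_max_parallelism(parallelism)
histogram = Counter(key_groups_per_subtask(max_parallelism, parallelism))
print(max_parallelism, dict(histogram))
```

For a parallelism of 170, the derived max parallelism is 256, so 86 subtasks own two key groups and 84 own one. With evenly distributed keys, roughly half of the subtasks then carry twice the load of the other half.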
We can fix this by
- either setting the default maxParallelism to something pretty high (2048 for example). The cost is that the default key group overhead per state entry grows from one byte to two bytes.
- or staying with similar logic, but replacing 1.5 x operatorParallelism with a higher multiplier, like 4 x operatorParallelism. The price is again that we more quickly reach the point where we have two bytes of key group encoding overhead instead of one.
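The trade-off between the two options can be sketched as follows. This is an illustration only, not a proposal for the actual implementation. It assumes the same 128 / 32768 bounds as today, assumes the key group prefix takes one byte up to 128 key groups and two bytes above that, and assumes parallelism does not exceed 2048 for the fixed option. All names are hypothetical.

```python
def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two that is >= x, for x >= 1."""
    return 1 << (x - 1).bit_length()


def clamp(candidate: int) -> int:
    """Apply the assumed lower and upper bounds of the derived default."""
    return min(max(candidate, 128), 32768)


def min_max_key_groups(max_parallelism: int, parallelism: int) -> tuple:
    """Smallest and largest number of key groups owned by any subtask."""
    counts = []
    for i in range(parallelism):
        start = (i * max_parallelism + parallelism - 1) // parallelism
        end = ((i + 1) * max_parallelism - 1) // parallelism
        counts.append(end - start + 1)
    return min(counts), max(counts)


def prefix_bytes(max_parallelism: int) -> int:
    """Assumed key group prefix size per state entry."""
    return 1 if max_parallelism <= 128 else 2


def compare(parallelism: int) -> dict:
    """Per option: (maxParallelism, min groups, max groups, prefix bytes)."""
    options = {
        "current 1.5x": clamp(round_up_to_power_of_two(parallelism + parallelism // 2)),
        "multiplier 4x": clamp(round_up_to_power_of_two(4 * parallelism)),
        "fixed 2048": 2048,
    }
    result = {}
    for name, mp in options.items():
        low, high = min_max_key_groups(mp, parallelism)
        result[name] = (mp, low, high, prefix_bytes(mp))
    return result


for p in (20, 170):
    print(p, compare(p))
```

At parallelism 170, the busiest subtask owns 2x as many key groups as the least busy one under the current logic (2 vs 1), about 1.17x with the 4x multiplier (7 vs 6), and about 1.08x with a fixed 2048 (13 vs 12). At parallelism 20, both multiplier-based variants stay at 128 key groups and one prefix byte, while the fixed 2048 default already pays two bytes.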
Implementation-wise, there is the unfortunate situation that the maxParallelism, if not configured, is not stored anywhere in the job graph, but is re-derived on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) that does not have a maxParallelism configured. This relies on the implicit contract that this logic never changes.
Changing this logic will instantly break all jobs that have not explicitly configured the max parallelism. That seems like a pretty heavy design shortcoming, unfortunately.
A way to partially work around that is by moving the logic that derives the maximum parallelism to the StreamGraphGenerator, so we never create JobGraphs where vertices have no configured Max Parallelism (and we keep the re-derivation logic for backwards compatibility for persisted JobGraphs).
The StreamExecutionEnvironment will need a flag to use the "old mode" to give existing un-configured applications a way to keep restoring from old savepoints.
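The resolution order this implies at graph generation time could look roughly like the sketch below. It is written in Python for brevity and is not Flink code. The function names, the use_legacy_default flag, and the choice of the 4x multiplier as the new default are all hypothetical, and the 128 / 32768 bounds are assumed.

```python
from typing import Optional


def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two that is >= x, for x >= 1."""
    return 1 << (x - 1).bit_length()


def legacy_default(parallelism: int) -> int:
    """Old derivation (1.5x), kept so existing savepoints remain restorable."""
    candidate = round_up_to_power_of_two(parallelism + parallelism // 2)
    return min(max(candidate, 128), 32768)


def new_default(parallelism: int) -> int:
    """Hypothetical new derivation, here the 4x multiplier option."""
    candidate = round_up_to_power_of_two(4 * parallelism)
    return min(max(candidate, 128), 32768)


def resolve_max_parallelism(
    configured: Optional[int],
    parallelism: int,
    use_legacy_default: bool,
) -> int:
    """Pick the value that gets written into every generated vertex."""
    if configured is not None:
        # An explicit user setting always wins.
        return configured
    if use_legacy_default:
        # "Old mode" flag for un-configured applications with old savepoints.
        return legacy_default(parallelism)
    return new_default(parallelism)
```

Because the resolved value is always written into the generated vertices, the JobManager-side re-derivation would only be hit for previously persisted JobGraphs.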