The current state assignment strategy for operator state is as follows:
- When the parallelism is unchanged:
  - If there is only even-split redistributed state, the state assignment tries to stay the same as before (although in practice it is not always the same).
  - If there is union redistributed state, all operator state is redistributed into a new state assignment.
- When the parallelism has changed:
  - All operator state is redistributed into a new state assignment.
There are two problems when the parallelism is unchanged:
- If there is only even-split redistributed state, the current implementation cannot actually guarantee that the state assignment stays the same as before. This is because StateAssignmentOperation#collectPartitionableStates repartitions managedOperatorStates without subtask-index information. For example, suppose an operator has parallelism 2, where subtask-0's managed operator state is empty and subtask-1's is not. Even though the new parallelism is still 2, after StateAssignmentOperation#collectPartitionableStates and the subsequent state assignment, subtask-0 is assigned the managed operator state while subtask-1 gets none.
- Only union state should be redistributed; even-split state should be left untouched. Redistributing even-split state would cause unexpected behavior once RestartPipelinedRegionStrategy supports restoring state.
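
The first problem can be reproduced with a deliberately simplified model. The sketch below is not Flink code: the class and method names (OperatorStateReassignmentSketch, collectWithoutSubtaskIndex, redistributeEvenly, reassignEvenSplit) are hypothetical, state partitions are modeled as plain strings, and round-robin starting at subtask-0 is an assumption chosen to match the example above. It only illustrates why dropping the subtask index moves state between subtasks, and what "not touching" even-split state could look like when the parallelism is unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OperatorStateReassignmentSketch {

    // Hypothetical stand-in for the collection step: flattens the per-subtask
    // state into one list, which discards the owning subtask index.
    static List<String> collectWithoutSubtaskIndex(List<List<String>> perSubtaskState) {
        List<String> all = new ArrayList<>();
        for (List<String> partitions : perSubtaskState) {
            all.addAll(partitions);
        }
        return all;
    }

    // Assumed round-robin redistribution, starting at subtask-0.
    static List<List<String>> redistributeEvenly(List<String> partitions, int parallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(i % parallelism).add(partitions.get(i));
        }
        return result;
    }

    // Sketch of the proposed behavior: with unchanged parallelism, even-split
    // state keeps its previous per-subtask assignment; otherwise redistribute.
    static List<List<String>> reassignEvenSplit(List<List<String>> perSubtaskState, int newParallelism) {
        if (perSubtaskState.size() == newParallelism) {
            return perSubtaskState;
        }
        return redistributeEvenly(collectWithoutSubtaskIndex(perSubtaskState), newParallelism);
    }

    public static void main(String[] args) {
        // subtask-0 has no state, subtask-1 owns one partition.
        List<List<String>> before = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.singletonList("partition-a"));

        List<List<String>> shifted =
                redistributeEvenly(collectWithoutSubtaskIndex(before), 2);
        System.out.println("before:  " + before);   // [[], [partition-a]]
        System.out.println("shifted: " + shifted);  // [[partition-a], []]
        System.out.println("kept:    " + reassignEvenSplit(before, 2)); // [[], [partition-a]]
    }
}
```

In this model the partition moves from subtask-1 to subtask-0 even though the parallelism stays at 2, while the index-preserving path returns the original assignment.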