On restoring, splits are add back directly to SourceReader in SourceOperator. In no rescaled restoring, bindings between splits and subtasks are preserved due to no repartition in RoundRobinOperatorStateRepartitioner. But in rescaled restoring, these operator states will be redistributed cross existing subtasks. This might break possible assignment from SourceEnumerator.
Given KafkaSource as an example, the partition to subtask assignment is decided by KafkaSourceEnumerator.getSplitOwner. The mappings will break after rescaling.
I pushed a test case using KafkaSource for evaluation.
I think it requires api addition to solve in generic and configurable way.
Is it a valid issue ? I am not that sure.