The HeapInternalTimerService still uses simple equals checks on restored / newly provided serializers for compatibility checks. This should be replaced with the TypeSerializer::ensureCompatibility checks instead, so that new serializers can be reconfigured.
This would entail that the TypeSerializerConfiguration of the key and namespace serializer in the HeapInternalTimerService also needs to be written to the raw state.
For Flink 1.4.0 release and current master, this is a critical bug since the KryoSerializer has different default base registrations than before due to
FLINK-7420. i.e if the key of a window is serialized using the KryoSerializer in 1.3.x, the restore would never succeed in 1.4.0.
For 1.3.x, this fix would be an improvement, such that the HeapInternalTimerService restore will make use of serializer reconfiguration.
- We need to double check all operators that checkpoint / restore from *raw* state. Apparently, the serializer compatibility checks were only implemented for managed state.
- Migration ITCases apparently do not have enough coverage. A migration test job that uses a key type which required the KryoSerializer, and uses windows, would have caught this issue.