Details
-
New Feature
-
Status: Closed
-
Critical
-
Resolution: Fixed
-
None
-
None
Description
Currently, users are locked in with the serializer implementation used to write their state.
This is suboptimal, as generally for users, it could easily be possible that they wish to change their serialization formats / state schemas and types in the future.
This is an umbrella JIRA for the required tasks to make this possible.
Here's an overview description of what to expect for the overall outcome of this JIRA (the specific details are outlined in their respective subtasks):
Ideally, the main user-facing change this would result in is that users implementing their custom TypeSerializer s will also need to implement hook methods that identify whether or not there is a change to the serialized format or even a change to the serialized data type. It would be the user's responsibility that the deserialize method can bridge the change between the old / new formats.
For Flink's built-in serializers that are automatically built using the user's configuration (most notably the more complex KryoSerializer and GenericArraySerializer), Flink should be able to automatically "reconfigure" them using the new configuration, so that the reconfigured versions can be used to de- / serialize previous state. This would require knowledge of the previous configuration of the serializer, therefore "serializer configuration metadata" will be added to savepoints.
Note that for the first version of this, although additional infrastructure (e.g. serializer reconfigure hooks, serializer configuration metadata in savepoints) will be added to potentially allow Kryo version upgrade, this JIRA will not cover this. Kryo has breaking binary formats across major versions, and will most likely need some further changes. Therefore, for the KryoSerializer, "upgrading" it simply means changes in the registration of specific / default serializers, at least for now.
Finally, we would need to add a "convertState" phase to the task lifecycle, that takes place after the "open" phase and before checkpointing starts / the task starts running. It can only happen after "open", because only then can we be certain if any reconfiguration of state serialization has occurred, and state needs to be converted. Ideally, the code for the "convertState" is designed so that it can be easily exposed as an offline tool in the future.
For this JIRA, we should simply assume that after open(), we have all the required information and serializers are appropriately reconfigured. srichter is currently planning to deprecate RuntimeContext state registration methods in favor of a new interface that enforces eager state registration, so that we may have all the info after open()