In order for serializers to be able to be reconfigured on restore, we need knowledge of the previous serializer configurations, e.g. what types were registered, with which specific / default serializers, and especially for Kryo, the order they were registered.
For this, we will need serializer configuration metainfo to be self-contained within the written state.
For the implementation, we propose to include the following changes:
- Have a new separate SerializersConfig class that is extracted from ExecutionConfig. This new class should contain only the serializer-related configurations (e.g., registeredKryoTypes, registeredPojoTypes, etc.). The SerializersConfig class should only be internally used, and therefore annotated with Internal. Users should still use the ExecutionConfig to configure serializers.
- For serializers that previously require a ExecutionConfig in constructors, try changing them to take a SerializersConfig instead.
- Introduce SerializersConfigSerializationProxy, which is in charge of serializing the current SerializersConfig when writing state to streams. This proxy defines the the serialized format of serializer configurations, therefore should be a VersionedIOReadableWritable as we may change the format / information written in the future.
- Add SerializersConfigSerializationProxy into state backends serialization proxies (e.g. KeyedBackendSerializationProxy) so that the serializer configuration is written into state. Need to additionally make sure backwards compatibility of previous-version backend serialization proxies.
For the initial version, we propose to include the following within the written serialization config metadata (ordered):
2. Throwable.class --> org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer default Flink-specific registry for serializing throwables.
5. Kryo registrations for all primitive types (and boxed versions). This is to allow compatibility in case the built-in registrations for the primitive types change in Kryo in the future.