Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Description
This sub-task follows after FLINK-6190.
Non-primitive type serializers that Flink creates internally need to be reconfigurable whenever we detect a change between the previous and the current serializer configuration.
Most notably, this is relevant for the KryoSerializer and PojoSerializer which are affected by the configuration, as well as composite types which can potentially have nested serializers (e.g. GenericArraySerializer).
Since not all serializers require reconfiguration, we propose an extended abstract base class for those that do:
@Internal
public abstract class ReconfigurableTypeSerializer<T> extends TypeSerializer<T> {
    abstract void reconfigure(SerializersConfig serializersConfig);
}
This class also serves as a tag: when a serializer configuration change is detected, it is used to check whether a serializer needs to be reconfigured.
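As a rough illustration of the tag check described above, here is a minimal, self-contained sketch. The classes are simplified stand-ins defined only for this example and are not Flink's actual TypeSerializer or SerializersConfig; the helper ReconfigurationCheck.reconfigureIfNeeded, the IntSerializer and RecordingSerializer classes, and the registeredTypes field are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the serializer configuration; not Flink's real class. */
final class SerializersConfig {
    final List<String> registeredTypes;

    SerializersConfig(List<String> registeredTypes) {
        this.registeredTypes = new ArrayList<>(registeredTypes);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SerializersConfig
                && registeredTypes.equals(((SerializersConfig) o).registeredTypes);
    }

    @Override
    public int hashCode() {
        return Objects.hash(registeredTypes);
    }
}

/** Simplified stand-in for Flink's TypeSerializer (no serialization methods here). */
abstract class TypeSerializer<T> { }

/** The proposed tag class, reduced to the one method this ticket describes. */
abstract class ReconfigurableTypeSerializer<T> extends TypeSerializer<T> {
    abstract void reconfigure(SerializersConfig serializersConfig);
}

/** Example of a serializer that never needs reconfiguration. */
final class IntSerializer extends TypeSerializer<Integer> { }

/** Example of a reconfigurable serializer; it only records the config it received. */
final class RecordingSerializer extends ReconfigurableTypeSerializer<Object> {
    SerializersConfig lastConfig;

    @Override
    void reconfigure(SerializersConfig serializersConfig) {
        lastConfig = serializersConfig;
    }
}

final class ReconfigurationCheck {
    /**
     * Reconfigures the serializer only if the configuration changed AND the
     * serializer carries the ReconfigurableTypeSerializer tag.
     * Returns true if reconfigure was actually called.
     */
    static boolean reconfigureIfNeeded(
            TypeSerializer<?> serializer,
            SerializersConfig previous,
            SerializersConfig current) {
        if (previous.equals(current)) {
            return false; // no configuration change detected
        }
        if (serializer instanceof ReconfigurableTypeSerializer) {
            ((ReconfigurableTypeSerializer<?>) serializer).reconfigure(current);
            return true;
        }
        return false; // untagged serializers are left untouched
    }
}
```

In this sketch, an untagged serializer such as IntSerializer is skipped even when the configuration changes, which is the point of using the base class as a marker.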
Note that type serializer reconfiguration is a mechanism internal to Flink only. Custom user serializers cannot rely on reconfiguration to bridge upgrades; they are responsible for ensuring that their deserialize method can read old state.
For the KryoSerializer, reconfiguration means making sure that all previous registrations are still present in exactly the same order, and that new registrations are only appended after them. This allows the reconfigured serializer to read old state.
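The ordering rule above can be sketched as a small merge function. This is only an illustration under assumptions: registrations are modeled as plain class-name strings, and RegistrationMerge.mergeRegistrations is a hypothetical helper, not part of Flink or Kryo. The order matters because, when no explicit ID is given, Kryo typically assigns registration IDs in the order classes are registered.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

final class RegistrationMerge {
    /**
     * Returns the registration order for the reconfigured serializer:
     * every previous registration first, in its original order, followed by
     * registrations from the current configuration that were not seen before.
     */
    static List<String> mergeRegistrations(List<String> previous, List<String> current) {
        // LinkedHashSet keeps insertion order and ignores re-added elements,
        // so previously registered names keep their original positions.
        LinkedHashSet<String> merged = new LinkedHashSet<>(previous);
        merged.addAll(current); // only names not already present are appended
        return new ArrayList<>(merged);
    }
}
```

For example, merging previous registrations [A, B] with a current configuration [B, C, A] keeps A and B in their old positions and appends only C. A previous registration that is missing from the current configuration is still kept, since old state may refer to it.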
For the PojoSerializer and other serializers that may have nested serializers, reconfiguration is a reconfigure call on the top-level serializer that traverses all nested serializers and reconfigures each one, until no reconfigurable serializers remain.
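The traversal could look roughly like the following sketch. All class names here (SketchSerializer, ReconfigurableSketchSerializer, LeafSerializer, KryoLikeSerializer, CompositeSerializer) are hypothetical stand-ins, and the configuration is reduced to a plain string for brevity. One simplifying assumption: the recursion only descends through reconfigurable serializers, so a reconfigurable serializer nested inside a non-reconfigurable composite would not be reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a serializer that may contain nested serializers. */
abstract class SketchSerializer {
    List<SketchSerializer> nestedSerializers() {
        return Collections.emptyList(); // leaf serializers have no children
    }
}

/** Stand-in for the proposed reconfigurable base class. */
abstract class ReconfigurableSketchSerializer extends SketchSerializer {
    /** Applies the new configuration to this serializer only. */
    abstract void reconfigureSelf(String config);

    /** Reconfigures this serializer, then every reconfigurable nested one. */
    final void reconfigure(String config) {
        reconfigureSelf(config);
        for (SketchSerializer nested : nestedSerializers()) {
            if (nested instanceof ReconfigurableSketchSerializer) {
                ((ReconfigurableSketchSerializer) nested).reconfigure(config);
            }
        }
    }
}

/** A serializer unaffected by configuration, e.g. for a primitive type. */
final class LeafSerializer extends SketchSerializer { }

/** A configuration-dependent leaf; it only records the config it received. */
final class KryoLikeSerializer extends ReconfigurableSketchSerializer {
    String config = "initial";

    @Override
    void reconfigureSelf(String config) {
        this.config = config;
    }
}

/** A composite with nested serializers, loosely modeled on a POJO serializer. */
final class CompositeSerializer extends ReconfigurableSketchSerializer {
    private final List<SketchSerializer> nested;
    String config = "initial";

    CompositeSerializer(SketchSerializer... nested) {
        this.nested = new ArrayList<>(Arrays.asList(nested));
    }

    @Override
    List<SketchSerializer> nestedSerializers() {
        return nested;
    }

    @Override
    void reconfigureSelf(String config) {
        this.config = config;
    }
}
```

A single call such as top.reconfigure("v2") on the outermost CompositeSerializer then reaches every reconfigurable serializer below it, while LeafSerializer instances are skipped.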