Flink / FLINK-11073

Make serializers immutable / provide option TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer

    Details

    • Release Note:
      The {{CompositeSerializerSnapshot}} utility class has been removed. You should now use {{CompositeTypeSerializerSnapshot}} instead, for snapshots of composite serializers that delegate serialization to multiple nested serializers. Please see https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html#implementing-a-compositetypeserializersnapshot for instructions on using {{CompositeTypeSerializerSnapshot}}.

      Description

      Motivation

      Right now, when a new serializer is passed to the old serializer (or, more specifically, to the old serializer's snapshot) for a state schema compatibility check, and the new serializer can be reconfigured to become compatible, the only way to do so is to reconfigure the new serializer in place and return TypeSerializerSchemaCompatibility.compatibleAsIs() as the result of the check.

      One concrete example is the KryoSerializer. Its configuration includes a map from serialized classes to their registered ids. This mapping may change between executions, so on restore the new KryoSerializer must reconfigure the mapping to match the previous execution before it can be used for state access.
      Right now, this is done by directly mutating the map in the new serializer instance.

      This mutative behaviour is fragile, especially in scale-down / scale-up scenarios, which could easily result in mismatching state serializer configurations across TaskManagers (TMs).
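
A minimal sketch of the in-place reconfiguration described above. It does not use Flink's or Kryo's classes: `MutableRegistrySerializer`, `MutatingRegistrySnapshot`, `resolveByMutation` and the string result are hypothetical stand-ins, chosen only to show that the caller's instance is changed behind its back while the check reports "compatible as is".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Kryo-like serializer; NOT Flink's KryoSerializer.
final class MutableRegistrySerializer {
    // class name -> registration id; mutable, which is the problem being illustrated
    final Map<String, Integer> registrations = new LinkedHashMap<>();

    MutableRegistrySerializer(Map<String, Integer> registrations) {
        this.registrations.putAll(registrations);
    }
}

// Hypothetical snapshot holding the registrations written by the previous execution.
final class MutatingRegistrySnapshot {
    private final Map<String, Integer> previous;

    MutatingRegistrySnapshot(Map<String, Integer> previous) {
        this.previous = new LinkedHashMap<>(previous);
    }

    // Overwrites the new serializer's map in place, then reports "compatible as is".
    String resolveByMutation(MutableRegistrySerializer newSerializer) {
        newSerializer.registrations.clear();
        newSerializer.registrations.putAll(previous);
        return "COMPATIBLE_AS_IS";
    }
}

class MutationDemo {
    public static void main(String[] args) {
        Map<String, Integer> old = new LinkedHashMap<>();
        old.put("A", 10);
        old.put("B", 11);

        Map<String, Integer> fresh = new LinkedHashMap<>();
        fresh.put("B", 10);
        fresh.put("A", 11);

        MutableRegistrySerializer serializer = new MutableRegistrySerializer(fresh);
        String result = new MutatingRegistrySnapshot(old).resolveByMutation(serializer);

        // The result does not reveal that the instance was changed.
        System.out.println(result + " " + serializer.registrations);
    }
}
```

Any other holder of the same serializer instance now sees the changed mapping, and an instance that never went through the check keeps the old one, which is the kind of mismatch the paragraph above warns about.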

      Proposed Approach

      1. The TypeSerializerSchemaCompatibility result class should be extended to contain an option compatibleWithReconfiguredSerializer(TypeSerializer), which would wrap a new instance of a reconfigured version of the new serializer.
      2. Callers of the compatibility check need to be aware of this case and respect it, using the reconfigured serializer instance when one is provided. In Flink, compatibility checks on serializers are performed in two places: 1) in composite serializers, which contain nested serializers and therefore need to check the compatibility of those nested serializers, and 2) in state backends, which check the compatibility of the new serializer with the old serializer.
      3. Introduce CompositeTypeSerializerSnapshot to encapsulate the logic for handling reconfiguration of nested serializers: if a composite serializer has a nested serializer that returns a new reconfigured instance of itself, then the result of the compatibility check on the composite serializer should also wrap a reconfigured version of the composite serializer that holds the reconfigured nested serializer. This logic should be captured in an abstract base class, say CompositeTypeSerializerSnapshot, so that it can be shared by many of Flink's composite serializers.
      4. For composite serializers that still use the legacy, less powerful TypeSerializerConfigSnapshot and CompatibilityResult abstractions while a nested serializer signals that it has been reconfigured, this should be detected and an error thrown stating that the outer composite serializer needs to be upgraded to the new serializer snapshot and compatibility abstractions. This follows the same approach used to bridge the new TypeSerializerSchemaCompatibility and the old CompatibilityResult class in Flink 1.7.
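
Points 1 to 3 above can be sketched as follows. This is a simplified model under stated assumptions, not Flink's implementation: `Serializer`, `Compatibility`, `RegistrySerializer`, `RegistrySnapshot`, `WrapperSerializer`, `WrapperSnapshot` and `StateAccess` are hypothetical names, and only the factory method names `compatibleAsIs` and `compatibleWithReconfiguredSerializer` are taken from this issue. The legacy bridging in point 4 is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical marker type standing in for a serializer.
interface Serializer {}

// Hypothetical result type modelled on the proposed compatibility options.
final class Compatibility {
    enum Kind { AS_IS, RECONFIGURED, INCOMPATIBLE }

    final Kind kind;
    private final Serializer reconfigured; // non-null only when kind == RECONFIGURED

    private Compatibility(Kind kind, Serializer reconfigured) {
        this.kind = kind;
        this.reconfigured = reconfigured;
    }

    static Compatibility compatibleAsIs() { return new Compatibility(Kind.AS_IS, null); }

    static Compatibility incompatible() { return new Compatibility(Kind.INCOMPATIBLE, null); }

    static Compatibility compatibleWithReconfiguredSerializer(Serializer s) {
        if (s == null) throw new NullPointerException("reconfigured serializer");
        return new Compatibility(Kind.RECONFIGURED, s);
    }

    Serializer reconfiguredSerializer() {
        if (kind != Kind.RECONFIGURED) throw new IllegalStateException("no reconfigured serializer");
        return reconfigured;
    }
}

// Immutable Kryo-like serializer: its registration order cannot change after construction.
final class RegistrySerializer implements Serializer {
    final List<String> registeredClasses;

    RegistrySerializer(List<String> registeredClasses) {
        this.registeredClasses = Collections.unmodifiableList(new ArrayList<>(registeredClasses));
    }
}

final class RegistrySnapshot {
    private final List<String> previous;

    RegistrySnapshot(List<String> previous) { this.previous = new ArrayList<>(previous); }

    // Point 1: return a NEW reconfigured instance instead of mutating the argument.
    Compatibility resolve(RegistrySerializer newSerializer) {
        if (newSerializer.registeredClasses.equals(previous)) return Compatibility.compatibleAsIs();
        return Compatibility.compatibleWithReconfiguredSerializer(new RegistrySerializer(previous));
    }
}

// Composite serializer with a single nested serializer.
final class WrapperSerializer implements Serializer {
    final RegistrySerializer nested;

    WrapperSerializer(RegistrySerializer nested) { this.nested = nested; }
}

final class WrapperSnapshot {
    private final RegistrySnapshot nestedSnapshot;

    WrapperSnapshot(RegistrySnapshot nestedSnapshot) { this.nestedSnapshot = nestedSnapshot; }

    // Point 3: a reconfigured nested serializer yields a reconfigured composite.
    Compatibility resolve(WrapperSerializer newSerializer) {
        Compatibility nested = nestedSnapshot.resolve(newSerializer.nested);
        switch (nested.kind) {
            case AS_IS:
                return Compatibility.compatibleAsIs();
            case RECONFIGURED:
                RegistrySerializer r = (RegistrySerializer) nested.reconfiguredSerializer();
                return Compatibility.compatibleWithReconfiguredSerializer(new WrapperSerializer(r));
            default:
                return Compatibility.incompatible();
        }
    }
}

// Point 2: a caller (for example a state backend) must use the reconfigured instance.
final class StateAccess {
    static Serializer serializerToUse(Compatibility result, Serializer newSerializer) {
        switch (result.kind) {
            case AS_IS:
                return newSerializer;
            case RECONFIGURED:
                return result.reconfiguredSerializer();
            default:
                throw new IllegalStateException("new serializer is incompatible");
        }
    }
}
```

In this model the serializer passed into the check is never modified; a caller that ignores the reconfigured result would keep using a serializer whose registrations do not match the previous execution, which is why point 2 requires every caller to handle this case.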

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
                Time Tracking

                • Original Estimate: Not Specified
                • Remaining Estimate: 0h
                • Time Spent: 50m