[FLINK-9376] Allow upgrading to incompatible state serializers (state schema evolution)

Details

      Before Flink 1.7, serializer snapshots were implemented as a `TypeSerializerConfigSnapshot` (now deprecated, and to be removed eventually in favor of the `TypeSerializerSnapshot` interface introduced in 1.7).
      Moreover, the responsibility for serializer schema compatibility checks lived within the `TypeSerializer`,
      implemented in the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method.

      To be future-proof and to keep the flexibility to migrate your state serializers and schema, it is highly recommended to migrate away from the old abstractions. Details and migration guides can be found [here](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html).

    Description

      Currently, users can upgrade state serializers when restoring a stateful job, as long as the new serializer remains backwards compatible with all previously written data in the savepoint (i.e. it can read all previous and current schemas of the serialized state objects).

      What is still lacking is the ability to upgrade to incompatible serializers. When an incompatible serializer is registered for existing restored state, that state needs to go through the following process:
      1. reading the serialized state with the previous serializer,
      2. passing each deserialized state object through a “migration map function”, and
      3. writing the state back with the new serializer.
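
The three steps above can be sketched in plain Java. This is only an illustration under stated assumptions: `Codec`, `StateMigrationSketch`, `OLD`, `NEW` and `migrate` are hypothetical names invented for this example and are not part of Flink's API; a real implementation would work with Flink's `TypeSerializer` and the state backend's storage format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a state serializer; NOT Flink's TypeSerializer. */
interface Codec<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

final class StateMigrationSketch {

    /** Assumed "old" schema: the integer is stored as UTF-8 decimal text. */
    static final Codec<Integer> OLD = new Codec<Integer>() {
        @Override public byte[] serialize(Integer v) {
            return v.toString().getBytes(StandardCharsets.UTF_8);
        }
        @Override public Integer deserialize(byte[] b) {
            return Integer.valueOf(new String(b, StandardCharsets.UTF_8));
        }
    };

    /** Assumed "new", incompatible schema: the integer is stored as 4 big-endian bytes. */
    static final Codec<Integer> NEW = new Codec<Integer>() {
        @Override public byte[] serialize(Integer v) {
            return ByteBuffer.allocate(4).putInt(v).array();
        }
        @Override public Integer deserialize(byte[] b) {
            return ByteBuffer.wrap(b).getInt();
        }
    };

    /** Rewrites every stored entry: read with old codec, map, write with new codec. */
    static <A, B> List<byte[]> migrate(List<byte[]> stored,
                                       Codec<A> oldCodec,
                                       Function<A, B> migrationMap,
                                       Codec<B> newCodec) {
        List<byte[]> out = new ArrayList<>(stored.size());
        for (byte[] bytes : stored) {
            A oldValue = oldCodec.deserialize(bytes);   // step 1: read with the previous serializer
            B newValue = migrationMap.apply(oldValue);  // step 2: migration map function
            out.add(newCodec.serialize(newValue));      // step 3: write with the new serializer
        }
        return out;
    }
}
```

Passing `Function.identity()` as the migration map leaves the state type unchanged and only rewrites the bytes, so the sketch then reduces to steps 1 and 3.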

      This process should be available only for state registrations that occur before the actual processing begins (e.g. in the `open` or `initializeState` methods), so that we avoid performing these operations during processing.

      How this procedure actually takes place differs across the different types of state backends.
      For example, for state backends that eagerly deserialize / lazily serialize state (e.g. HeapStateBackend), the job execution itself can be seen as a "migration": everything is deserialized to state objects on restore, and is only serialized again, with the new serializer, on checkpoints.
      Therefore, for these state backends, the above process is unnecessary.

      On the other hand, for state backends that lazily deserialize / eagerly serialize state (e.g. RocksDBStateBackend), the state evolution process needs to happen for every state with a newly registered incompatible serializer.
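
The difference between the two kinds of backend can be illustrated with a toy model. Everything here is an assumption made for the example: `ValueCodec`, `EagerStore` and `LazyStore` are hypothetical classes that only mimic the eager and lazy deserialization behaviour described above; they are not the actual HeapStateBackend or RocksDBStateBackend.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical serializer for int values; not a Flink interface. */
interface ValueCodec {
    byte[] write(int value);
    int read(byte[] bytes);
}

final class BackendSketch {

    /** Assumed old format: UTF-8 decimal text. */
    static final ValueCodec TEXT = new ValueCodec() {
        @Override public byte[] write(int v) { return Integer.toString(v).getBytes(StandardCharsets.UTF_8); }
        @Override public int read(byte[] b) { return Integer.parseInt(new String(b, StandardCharsets.UTF_8)); }
    };

    /** Assumed new format: 4 big-endian bytes. */
    static final ValueCodec BINARY = new ValueCodec() {
        @Override public byte[] write(int v) { return ByteBuffer.allocate(4).putInt(v).array(); }
        @Override public int read(byte[] b) { return ByteBuffer.wrap(b).getInt(); }
    };

    /** Heap-like model: deserializes everything on restore, serializes only on checkpoint. */
    static final class EagerStore {
        private final Map<String, Integer> objects = new HashMap<>();

        void restore(Map<String, byte[]> snapshot, ValueCodec oldCodec) {
            snapshot.forEach((key, bytes) -> objects.put(key, oldCodec.read(bytes)));
        }

        /** The next checkpoint is written with whichever codec is current; no extra pass is needed. */
        Map<String, byte[]> checkpoint(ValueCodec currentCodec) {
            Map<String, byte[]> out = new HashMap<>();
            objects.forEach((key, value) -> out.put(key, currentCodec.write(value)));
            return out;
        }
    }

    /** RocksDB-like model: keeps serialized bytes and deserializes on each access. */
    static final class LazyStore {
        private final Map<String, byte[]> bytes = new HashMap<>();

        void restore(Map<String, byte[]> snapshot) {
            bytes.putAll(snapshot);
        }

        int get(String key, ValueCodec currentCodec) {
            return currentCodec.read(bytes.get(key));
        }

        /** Explicit migration pass: stored bytes must be rewritten before the new codec can read them. */
        void rewriteAll(ValueCodec oldCodec, ValueCodec newCodec) {
            bytes.replaceAll((key, b) -> newCodec.write(oldCodec.read(b)));
        }
    }
}
```

In this model, `EagerStore` picks up the new format as a side effect of its next checkpoint, while `LazyStore` needs `rewriteAll` to run before any read with the new codec.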

      Step 2 would even allow state type migrations, but that is out of scope for this JIRA.
      This ticket focuses only on steps 1 and 3, where we try to enable schema evolution without state type changes.

      This is an umbrella JIRA ticket that tracks this feature, including a few preliminary tasks that work towards enabling it.

            People

              Assignee: Unassigned
              Reporter: Tzu-Li (Gordon) Tai (tzulitai)
              Votes: 1
              Watchers: 12
